The Classic Producer – Consumer Problem

One of the main questions that my team ask potential candidates during technical interviews, is the Producer – Consumer Problem. We have found over the years that this problem is one of the main issues we have with potential new hires. Many a programmer has been tripped up by this problem, and the related questions that follow.

Producer – Consumer problems are a set of problems with the domain of Multi-Threading.

The concept is simple, we have one of more threads that are “Producers” of something, usually data (Events, Structures or Objects are data) in the case of computers and programming.

“Consumers” are other threads that receive and use the data produced by the “Producers”.

Many different threading concepts are used when creating a Producer-Consumer scenario and it is a great real-world mechanism that we use to solve multi-threaded problems on a day to day basis; and therefore it is an extremely useful to test potential new hires.

For illustration in this blog post, I have created three separate Java classes, which are below. First is the “Test Driver”, this is the main program used the test and potentially debug the Producer and Consumer threads. The second is the Producer. This of course is the Producer thread implementation. And finally the Consumer, which like it’s counterpart the Producer is the Consumer thread implementation.

In my example code below, the Producer and Consumer will run forever (until the user sends the interrupt signal ie. Cltr-C or otherwises kills the process). The Producer will create 10 random Long objects every time is obtains the lock on the “listLock” object monitor, and will than wait while the “sharedList” ArrayList object is NOT empty. The Producer will also Notify ALL other threads waiting on the listLock Object. There is an important difference between the notify() and notifyAll() methods on the java base Object. Both are used to notify waiting threads that are blocked on the same object monitor that it’s time to wake up, but the notify all will wake up ALL threads that are blocked on that object monitor, and based on the OS implementation that threads will compete for object monitor lock, before continuing execution. This is good for situations where you have multiple consumers blocked on the same Object Monitor lock and you want any of them to potentially being processing once that lock becomes available. The rest will block due to the synchronization of the critical section of code which they originally were in the waiting state for. This is of course assuming you wrote your code correctly.

Once the consumer wakes up from the waiting step and obtains the lock on the Object Monitor “listLock”, the example code I have written will consume all data within the ArrayList safely, before notifying the Producer (technically any other threads synchronized and waiting on the listLock) to wake up. In this case the Producer will wake from the waiting state, but it is still blocked because the Consumer has not yet released the lock on the listLock object. Once the Consumer’s loop enters the while sharedList isEmpty check, the Consumer will execute listLock.wait(), entering itself into the wait state, and releasing the lock on the listLock object monitor. This will now allow the Producer to finally being executing, and the process will repeat, with the Producer producing 10 random objects into the shared list. The process will run forever like this, passing data between Producer and Consumer safely, without either thread corrupting the shared memory space, in this case the sharedList ArrayList object. Please note that I did not use any Collection synchronization mechanisms, other than my own. This is perfect for the purpose of testing the Producer-Consumer problem. To use any of Java’s facilities to create thread-safe collections would be defeating the purpose of trying to demonstrate an understanding of multi-thread programming.

Test Driver:

/**
 * File: PCTester.java
 * Creation Date: June 24, 2013
 * Author: Robert C. Ilardi
 * 
 * Here we have a Test Driver for our Producer-Consumer example.
 * 
 */
package com.roguelogic.tests;

import java.util.ArrayList;

public class PCTester {
  public static void main(String[] args) {
    try {
      // This creates a Object Monitor or "lock"
      // that can be shared between threads.
      // Technically in Java, any Object
      // instance of any class
      // can be used as an Object Monitor.
      // As a matter of convenience, most
      // programmers,
      // simply use an instance of the
      // class Object to represent the
      // lock.
      Object listLock = new Object();

      // This ArrayList will become our
      // Shared resource that both the
      // "consumer" threads and the
      // "producer" threads will
      // use to pass data back and
      // forth. On specifically in
      // this case, it's one
      // directionally from the
      // Producer to the Consumer.
      ArrayList<Object> sharedList = new ArrayList<Object>();

      // Here we are creating the Producer
      // It is an object which I have extended
      // the Thread class from. This makes the
      // Producer itself a thread.
      Producer p = new Producer();

      // As you can see I'm passing the listLock object monitor.
      p.setListLock(listLock);

      // I'm also passing the Shared List ArrayList to this Producer.
      p.setSharedList(sharedList);

      // We are now going to create a Consumer Thread.
      // Like the Producer, it also extends from the Thread class
      // and I have also set it up to receive references of
      // both the listLock Object Monitor and the sharedList ArrayList.
      Consumer c = new Consumer();
      c.setListLock(listLock);
      c.setSharedList(sharedList);

      p.start(); // We are going to start the Producer Thread.
      c.start(); // We are going to start the Consumer Thread.

      // The next two lines of code
      // Simply ensures that the main method
      // waits on the producer and consumer thread
      // to exit before continuing.
      p.join();
      c.join();
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

syntax highlighted by Code2HTML, v. 0.9.1

Producer:

/**
 * File: Producer.java
 * Creation Date: June 24, 2013
 * Author: Robert C. Ilardi
 * 
 * This is the Producer example thread class.
 * 
 */
package com.roguelogic.tests;

import java.security.SecureRandom;
import java.util.ArrayList;

public class Producer extends Thread {

  private ArrayList<Object> sharedList;
  private Object listLock;

  public Producer() {
    super();
  }

  public void setListLock(Object listLock) {
    this.listLock = listLock;
  }

  public void setSharedList(ArrayList<Object> sharedList) {
    this.sharedList = sharedList;
  }

  public void run() {
    // We are using Secure Random, because the regular Random is boring.
    SecureRandom srnd = new SecureRandom();

    // Here we have the synchronized block
    // which is on the listLock Object monitor.
    // This creates the critical section of code
    // that marks the block of code within it
    // that the operations are atomic.
    // However listLock.wait will cause the thread
    // to stop processing and allow other threads
    // to obtain the lock on the Object Monitor.
    synchronized (listLock) {
      try {
        // Run forever
        while (true) {

          // Add 10 random numbers into the shared queue.
          for (int i = 1; i <= 10; i++) {
            Long nxtLng = srnd.nextLong();
            sharedList.add(nxtLng);
          }

          listLock.notifyAll(); // Notify ALL other threads to wake up

          // Wait while the Shared List had data in it
          while (!sharedList.isEmpty()) {
            listLock.wait();
          }
        }
      }
      catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

Consumer:

/**
 * File: Consumer.java
 * Creation Date: June 24, 2013
 * Author: Robert C. Ilardi
 * 
 * This is the Consumer example thread class.
 * 
 */
package com.roguelogic.tests;

import java.util.ArrayList;

public class Consumer extends Thread {

  private ArrayList<Object> sharedList;
  private Object listLock;

  public Consumer() {
    super();
  }

  public void setListLock(Object listLock) {
    this.listLock = listLock;
  }

  public void setSharedList(ArrayList<Object> sharedList) {
    this.sharedList = sharedList;
  }

  public void run() {
    int slSz;
    Object obj;

    // Just like the Producer class
    // the synchronized block creates
    // a block of atomic code also
    // know as the critical section
    // as it potentially interacts with
    // other threads.
    synchronized (listLock) {
      try {
        // Loop forever
        while (true) {

          // Wait while the list of empty.
          // Technically this means we can
          // wake up as soon as a single
          // object is populated in the shared list
          // however since our producer implementation
          // does not notify until 10 items are added
          // to the list, we won't really wake up
          // until all 10 are added.
          while (sharedList.isEmpty()) {
            listLock.wait();
          }

          // Note: even if the listLock.wait()
          // receives the notification, it doesn't
          // mean the code will start running,
          // because we are synchronized on the listLock
          // therefore until the lock is released by
          // the thread that has the lock, we will still wait.

          // Loop until the sharedList is empty.
          while (!sharedList.isEmpty()) {
            slSz = sharedList.size();
            obj = sharedList.remove(0);

            System.out.println("Consumer(" + super.getId() + ")>  Shared List Size = " + slSz + " ; Object = " + obj);
          }

          // When we are done make sure to notify other threads
          // This is extremely important if the Producer is in the waiting
          // state. However this thread still holds the lock
          // until the listLock.wait() is invoked in the next iteration
          // of the while loop. If we forget to notify, both threads
          // simply wait forever. Give it a try. Comment it out.
          listLock.notifyAll();
        }
      }
      catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

Although, technically I am giving away a partial question and definitely a topic that I have my teams’ test potential new hires on their technical interviews, I think it is important for students of computer science, and programming in general to have a solid understanding of the Producer-Consumer problem. I have found that in all complex systems, and definitely systems that are multi-tier, 3-tier or more distributed systems, a lack of understanding of the Producer-Consumer problem, severely curtails the ability of programmers to produce reliable systems. Again, I hope this post has been helpful!

Just Another Stream of Random Bits…
– Robert C. Ilardi
 
This entry was posted in Development and tagged , , . Bookmark the permalink.

One Response to The Classic Producer – Consumer Problem

  1. Mark Gokhale says:

    Nice article Robert! As we all know, in Computing, as in life, there are always a few different ways to solve a problem. ( When I work in NYC, my commute is either drive-in, take the bus, take the train, or take the ferry. If I ran a hedge fund, I guess a chopper would have been a real possibility too! Back to reality.) Choosing the appropriate solution, helps immensely to cut down the time spent to develop, test and maintain code.

    I have presented two solutions to a typical Producer Consumer problem. The first (let’s call it the Classic solution), is almost identical to the one presented in your article. It uses Java’s low level concurrency primitives wait(), notifyAll() and synchronized to solve the problem. What I have is a Stack which is shared by two threads – the Producer Thread and the Consumer Thread. But this Stack can only hold up to 5 strings at any time. The Producer pumps out strings P1 through P9 in rapid order and tries to push them on top of the Stack. The Consumer pops the top element in the Stack. The Producer has to block if the Stack is full and similarly the Consumer has to block if the Stack is empty. The differences between our examples are minor. The Producer and Consumer Threads implement the Runnable interface as opposed to extending the Thread class. The lock Flag on the SyncStack is used to gain exclusive access to the critical section of the Shared object. I also use synchronized methods as opposed to synchronized blocks. But here it doesn’t really matter since the methods are very small.

    I have created four separate Java classes, which are below. First is the “Driver”, this is the main program. The second is the “Producer”. The third is the “Consumer”. The fourth is the “SyncStack”. This is the object which needs to be shared by the Producer and Consumer threads. I have implemented a simple stack using ArrayList.
    It is a lot of fun to see how changing the time the Producer and Consumer Threads sleep, by changing the argument to the Thread.currentThread().sleep(X) method, affects the interaction between the Producer and the Consumer.
    For example, you could use values between 1 and 500, thus increasing or decreasing the rate at which things are put on the stack or removed from it.

    The second (let’s call it the Concurrent solution), uses classes from the Java Concurrent Collections package, first introduced in Java 1.5.
    Again, I have 4 classes. The Producer and Consumer classes are as before – no change at all. The SyncStack offers the same interface, but its implementation is all new. It uses the LinkedBlockingDeque which implements the BlockingDeuque interface, to implement the Stack. Notice the absence of any synchronized primitives! The word Deque comes from the term “Double Ended Queue”. A Deque is thus a queue where you can insert and remove elements from both ends of the queue. I use it to implement the Stack, which is a LIFO. The LinkedBlockingDeque construct provides the functionality and guarantee, which we hand-crafted using the Java synchronization primitives. Here we get it for free! (Sometimes a free lunch is indeed possible – Just like when Robert sends out an email to the group saying that there are goodies in his office!).
    While I need not have not changed the Driver code at all, I decided to use the Executor and ExcecutorServices, to simplify the coding.

    Which solution should one use? Here is how I would approach it. If the project code used synchronization primitives, then to be consistent from a support perspective, stick to wait(), notify() and synchronized. If you are starting a new project, you may want to check out the Concurrent Collections Utilities. No matter which solution you choose, take the time to appreciate, how threading works at the lower level, by understanding the code as presented in the Classic Producer Consumer solutions.

    ==========================================================================================================
    CLASSIC SOLUTION
    =========================================================================================================
    ——————————————-
    package classic;

    public class Driver {

    public static void main(String[] args) {
    SyncStack ss = new SyncStack();
    Producer producer = new Producer(ss);
    Consumer consumer = new Consumer(ss);

    Thread p_worker = new Thread(producer);
    p_worker.setName(“ProducerThread”);

    Thread c_worker = new Thread(consumer);
    c_worker.setName(“ConsumerThread”);

    p_worker.start();
    c_worker.start();

    try {
    p_worker.join();
    c_worker.join();
    } catch (InterruptedException e1) {
    e1.printStackTrace();
    }
    }
    }
    ——————————————-
    package classic;

    public class Producer implements Runnable {

    SyncStack ss = null;

    public Producer(SyncStack ss){
    this.ss = ss;
    }

    @Override
    public void run() {

    for (int i=0; i ” + Thread.currentThread().getName() + ” Exiting” );
    }
    }
    ——————————————-
    package classic;

    public class Consumer implements Runnable {

    SyncStack ss = null;

    public Consumer(SyncStack ss) {
    this.ss = ss;
    }

    @Override
    public void run() {

    for (int i=0; i ” + Thread.currentThread().getName() + ” Exiting” );
    }
    }
    ——————————————-
    package classic;

    import java.util.ArrayList;

    public class SyncStack {

    private ArrayList list = null;
    private int index = 0;

    private final int max_size = 5;

    public SyncStack() {
    list = new ArrayList (max_size);
    }

    synchronized public void push (String s) {
    while ( index >= max_size ) {
    try {
    wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    list.add(s);
    System.out.println( Thread.currentThread().getName() + ” Pushed: ” + s);
    index++;
    notifyAll();
    }
    synchronized public String pop() {
    while ( index ProducerThread Exiting
    ConsumerThread Popped: P9
    ConsumerThread Popped: P4
    ConsumerThread Popped: P3
    ConsumerThread Popped: P2
    ConsumerThread Popped: P1
    ===========> ConsumerThread Exiting

    ==========================================================================================================
    CONCURRENT SOLUTION
    ==========================================================================================================
    package concurrent;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Driver {

    public static void main(String[] args) {
    SyncStack ss = new SyncStack();
    Runnable producer = new Producer(ss);
    Runnable consumer = new Consumer(ss);

    ExecutorService executor = Executors.newFixedThreadPool(2);

    executor.execute(producer);
    executor.execute(consumer);

    executor.shutdown();
    }
    }
    ——————————————-
    package concurrent;

    public class Producer implements Runnable {

    SyncStack ss = null;

    public Producer(SyncStack ss){
    this.ss = ss;
    }

    @Override
    public void run() {

    for (int i=0; i ” + Thread.currentThread().getName() + ” Exiting” );
    }
    }
    ——————————————-
    package concurrent;

    public class Consumer implements Runnable {

    SyncStack ss = null;

    public Consumer(SyncStack ss) {
    this.ss = ss;
    }

    @Override
    public void run() {

    for (int i=0; i ” + Thread.currentThread().getName() + ” Exiting” );
    }
    }
    ——————————————-
    package concurrent;

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    public class SyncStack {

    private BlockingDeque dq = null;
    private final int max_size = 5;

    public SyncStack() {
    dq = new LinkedBlockingDeque (max_size);
    }
    public void push (String s) {
    try {
    dq.putLast(s);
    System.out.println( Thread.currentThread().getName() + ” Pushed: ” + s);

    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    public String pop() {
    String s = null;
    try {
    System.out.println( ” ” + Thread.currentThread().getName() + ” Popped: ” + (s = dq.takeLast() ));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return s;
    }
    }
    ——————————————-
    PROGRAM OUTPUT:
    pool-1-thread-1 Pushed: P0
    pool-1-thread-2 Popped: P0
    pool-1-thread-1 Pushed: P1
    pool-1-thread-1 Pushed: P2
    pool-1-thread-1 Pushed: P3
    pool-1-thread-1 Pushed: P4
    pool-1-thread-1 Pushed: P5
    pool-1-thread-2 Popped: P5
    pool-1-thread-1 Pushed: P6
    pool-1-thread-2 Popped: P6
    pool-1-thread-1 Pushed: P7
    pool-1-thread-2 Popped: P7
    pool-1-thread-1 Pushed: P8
    pool-1-thread-2 Popped: P8
    pool-1-thread-1 Pushed: P9
    ———–> pool-1-thread-1 Exiting
    pool-1-thread-2 Popped: P9
    pool-1-thread-2 Popped: P4
    pool-1-thread-2 Popped: P3
    pool-1-thread-2 Popped: P2
    pool-1-thread-2 Popped: P1
    ===========> pool-1-thread-2 Exiting

    ===============================================================================================================

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.