Table of Contents

Producer Consumer

/**
 * Immutable wrapper around a single text payload.
 */
public final class Message {

	/** Payload exactly as supplied at construction time; never modified afterwards. */
	private final String text;

	/**
	 * Creates a message that carries the given text.
	 *
	 * @param msg the payload, stored as-is without any validation
	 */
	public Message(String msg) {
		text = msg;
	}

	/**
	 * Creates a message whose payload is the decimal representation of a number.
	 *
	 * @param msg the number to turn into the payload
	 */
	public Message(int msg) {
		this(Integer.toString(msg));
	}

	/**
	 * @return the payload this message was created with
	 */
	public final String getMessage() {
		return text;
	}
}

/**
 * Puts a fixed series of numbered {@link Message}s on a shared queue and
 * clears the shared {@code producing} flag when it stops, whatever the reason.
 */
public class Producer implements Runnable {

	/** Id of the last message produced; ids start at 0. */
	private static final int LAST_MESSAGE_ID = 225;

	private final BlockingQueue<? super Message> queue;
	private final AtomicBoolean producing;

	/**
	 * @param queue     queue that receives the produced messages
	 * @param producing flag set to {@code false} once this producer has stopped
	 */
	public Producer(BlockingQueue<? super Message> queue, AtomicBoolean producing) {
		this.queue = queue;
		this.producing = producing;
	}

	/**
	 * Produces messages 0..{@value #LAST_MESSAGE_ID}, blocking on {@code put}
	 * while the queue is full. Stops early if the thread is interrupted.
	 */
	@Override
	public void run() {

		try {
			for (int i = 0; i <= LAST_MESSAGE_ID; i++) {

				Message msg = new Message(i);

				queue.put(msg);
				System.out.println("producing msgID: " + msg.getMessage() +
					", queue.size(): " + queue.size());
			}
		}
		catch (InterruptedException ex) {
			// Re-assert the interrupt so code further up the stack can see it,
			// and stop producing instead of silently dropping the current message.
			Thread.currentThread().interrupt();
			System.err.println("producer interrupted, stopping early");
		}
		finally {
			// Always clear the flag: the consumers' loop condition reads it,
			// so leaving it true would keep them polling forever.
			producing.set(false);
		}

		System.out.println("PRODUCER EXITS");
	}
}

/**
 * Drains {@link Message}s from a shared queue for as long as the shared
 * {@code producing} flag is set or the queue still holds elements.
 */
public class Consumer implements Runnable {

	// Typed queue: with a raw BlockingQueue, poll() returns Object and the
	// assignment to Message below does not compile.
	private final BlockingQueue<? extends Message> queue;
	private final AtomicBoolean producing;

	/**
	 * @param queue     queue to take messages from
	 * @param producing flag read on every iteration; {@code true} keeps this
	 *                  consumer polling even when the queue is empty
	 */
	public Consumer(BlockingQueue<? extends Message> queue, AtomicBoolean producing) {
		this.queue = queue;
		this.producing = producing;
	}

	/**
	 * Polls the queue with a 1 ms timeout until the flag is cleared and the
	 * queue is empty. Stops early if the thread is interrupted.
	 */
	@Override
	public void run() {

		final String threadName = Thread.currentThread().getName();

		try {
			while (producing.get() || !queue.isEmpty()) {

				// Timed poll rather than take(): a blocking take() could never
				// re-check the loop condition once the queue stays empty.
				Message msg = queue.poll(1, TimeUnit.MILLISECONDS);

				if (msg != null) {
					System.out.println(
						threadName + ": " +
						"consumer msgID: " + msg.getMessage() +
							", queue.size(): " + queue.size());
				}
			}
		}
		catch (InterruptedException ie) {
			// Re-assert the interrupt and leave the loop; continuing would
			// discard the cancellation request.
			Thread.currentThread().interrupt();
			System.err.println(threadName + ": consumer interrupted, stopping early");
		}

		System.out.println(threadName + ": CONSUMER EXITS");
	}
}

/**
 * Entry point: runs one {@link Producer} and three {@link Consumer}s against
 * a small bounded queue and waits for all of them to finish.
 */
public class ProducerConsumer {

	/** Capacity of the bounded queue shared by the producer and the consumers. */
	private static final int QUEUE_CAPACITY = 4;

	/** @param args the command line arguments (unused) */
	public static void main(String[] args) {

		final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
		final AtomicBoolean producing = new AtomicBoolean(true);

		// Producer first so the start order matches the thread list.
		final Thread[] workers = {
			new Thread(new Producer(queue, producing), "ProdTh"),
			new Thread(new Consumer(queue, producing), "ConsTh_1"),
			new Thread(new Consumer(queue, producing), "ConsTh_2"),
			new Thread(new Consumer(queue, producing), "ConsTh_3")
		};

		for (Thread worker : workers) {
			worker.start();
		}

		try {
			for (Thread worker : workers) {
				worker.join();
			}
		} catch (InterruptedException ex) {
			// Preserve the interrupt status instead of swallowing it.
			Thread.currentThread().interrupt();
			System.err.println("main interrupted while waiting for worker threads");
		}
	}
}