Producer Consumer
public final class Message {
private final String msg;
public Message(String msg) {
this.msg = msg;
}
public Message(int msg) {
this.msg = String.valueOf(msg);
}
public final String getMessage() {
return msg;
}
}
public class Producer implements Runnable {
private final BlockingQueue queue;
private final AtomicBoolean producing;
public Producer(BlockingQueue queue, AtomicBoolean producing) {
this.queue = queue;
this.producing = producing;
}
@Override
public void run() {
for(int i = 0; i <= 225; i++) {
Message msg = new Message(i);
try {
queue.put(msg);
System.out.println("producing msgID: " + msg.getMessage() +
", queue.size(): " + queue.size());
}
catch(InterruptedException ex) { ex.printStackTrace(); }
}
producing.set(false);
System.out.println("PRODUCER EXITS");
}
}
public class Consumer implements Runnable {
private final BlockingQueue queue;
private final AtomicBoolean producing;
public Consumer(BlockingQueue queue, AtomicBoolean producing) {
this.queue = queue;
this.producing = producing;
}
@Override
public void run() {
while(producing.get() || !queue.isEmpty()) {
try {
Message msg = queue.poll(1, TimeUnit.MILLISECONDS);
if(msg != null) {
System.out.println(
Thread.currentThread().getName() + ": " +
"consumer msgID: " + msg.getMessage() +
", queue.size(): " + queue.size());
}
}
catch(InterruptedException ie) { ie.printStackTrace(); }
}
System.out.println(Thread.currentThread().getName() + ": CONSUMER EXITS");
}
}
public class ProducerConsumer {
/** @param args the command line arguments */
public static void main(String[] args) {
final BlockingQueue queue = new LinkedBlockingQueue<>(4);
final AtomicBoolean producing = new AtomicBoolean(true);
final Thread prodTh = new Thread(new Producer(queue, producing), "ProdTh");
final Thread consTh_1 = new Thread(new Consumer(queue, producing), "ConsTh_1");
final Thread consTh_2 = new Thread(new Consumer(queue, producing), "ConsTh_2");
final Thread consTh_3 = new Thread(new Consumer(queue, producing), "ConsTh_3");
prodTh.start();
consTh_1.start();
consTh_2.start();
consTh_3.start();
try {
prodTh.join();
consTh_1.join();
consTh_2.join();
consTh_3.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}