Android多线程开发 - BlockingQueue

在Java的并发开发中,队列是一种常用的数据结构,很多并发数据都会放入到队列中去。幸运的是,java平台提供了现成的并发队列供我们使用,今天我们就来一起看看。

BlockingQueue初探

BlockingQueue是Java提供的一个接口,用来线程安全的向里面放数据或者从里面取出数据。它的一个典型应用就是生产者/消费者模型,也就是一个线程生产数据,另外一个线程消费数据。如下图:

Producing thread会一直生产数据然后放入队列,知道将队列装满,此时会block住producing thread。而consumer thread会一直从队列中取出数据,当队列为空的时候,线程被阻塞。

BlockingQueue Methods

BlockingQueue有4套不同的方法来对队列进行数据的操作。当数据不能立刻读出或者插入的时候它们的行为略有不同,如图所示:这四组方法对应的响应分别是:Throw Exception, 返回特殊值,block或者timeout。

一个例子

BlockingQueue只是定义的一个接口,Java还提供了它的几个实现类:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

其中比较常用的是ArrayBlockingQueue以及LinkedBlockingQueue。接下来我们用BlockingQueue来实现一个生产者/消费者:

class Producer implements Runnable {
       private final BlockingQueue queue;
       Producer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { queue.put(produce()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       Object produce() { ... }
 }

 class Consumer implements Runnable {
       private final BlockingQueue queue;
       Consumer(BlockingQueue q) { queue = q; }
       public void run() {
         try {
           while (true) { consume(queue.take()); }
         } catch (InterruptedException ex) { ... handle ...}
       }
       void consume(Object x) { ... }
 }

 class Setup {
       void main() {
         BlockingQueue q = new SomeQueueImplementation();
         Producer p = new Producer(q);
         Consumer c1 = new Consumer(q);
         Consumer c2 = new Consumer(q);
         new Thread(p).start();
         new Thread(c1).start();
         new Thread(c2).start();
       }
 }

其他类

java.util.concurrent 包中还提供了大量的其它辅助类,简化了我们的并发程序的开发:

  • BlockingDeque:类似于上面介绍过的BlockingQueue,不过不同的是,这个可以两头入队/出队。

  • ConcurrentMap:map的并发版本。

  • CountDownLatch

非常好用的一个类。它可以用来让一个或者多个thread来等某个计数器结束。一个CountDownLatch被初始化为某个特定的数值,然后该数值会通过调用CountDownLatch的countDown()递减,而等待的threads可以调用await()来等待Count变为0.

说的有些啰嗦,看个例子:

CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();
Thread.sleep(4000);



public class Waiter implements Runnable{
        CountDownLatch latch = null;
     public Waiter(CountDownLatch latch) {
        this.latch = latch;
     }

     public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}


public class Decrementer implements Runnable {

        CountDownLatch latch = null;

     public Decrementer(CountDownLatch latch) {
        this.latch = latch;
        }

     public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        }
}

Reference