本文共 13354 字,大约阅读时间需要 44 分钟。
JUC是指java并发工具包java.util.concurrent包
JUC并发包中的工具类主要有CountDownLatch、CyclicBarrier、Semaphore、FutureTask、Exchanger等。这些工具类在java1.5被引入。
countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
countDownLatch类中只提供了一个构造器,另外还有三个比较重要的方法:
//参数count为计数值public CountDownLatch(int count) { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行public void await() throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //将count值减1public void countDown() { };
CountDownLatch的用法非常简单,以下为应用场景之一:
public class CountdownLatchTest{ public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException ignore) { } // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown() latch.countDown(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException ignore) { } // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown() latch.countDown(); } }, "t2"); t1.start(); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t3 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t3 await 被中断"); Thread.currentThread().interrupt(); } } }, "t3"); Thread t4 = new Thread(new Runnable() { @Override public void run() { try { // 阻塞,等待 state 减为 0 latch.await(); System.out.println("线程 t4 从 await 中返回了"); } catch (InterruptedException e) { System.out.println("线程 t4 await 被中断"); Thread.currentThread().interrupt(); } } }, "t4"); t3.start(); t4.start(); }}
CyclicBarrier是“循环栅栏”,就是一个可循环利用的屏障。它的作用就是会让所有线程都等待完成后才会继续下一步行动。与CountDownLatch的区别在于:
CyclicBarrier提供了两个构造方法以及一些其他重要的方法。
public CyclicBarrier(int parties)public CyclicBarrier(int parties, Runnable barrierAction)public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
使用场景:一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务。
使用例子:
public class CyclicBarrierDemo { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3);//需要拦截的线程数为3 for(int i=0;i<3;i++){// 三个人 Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await();//先到达的人就开始等待其他人 Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await();//先到达的人就开始等待其他人 Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await();//先到达的人就开始等待其他人 } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); }}
总结CyclicBarrier和CountDownLatch的区别
//参数permits表示许可数目,即同时可以允许多少线程进行访问 public Semaphore(int var1) { this.sync = new Semaphore.NonfairSync(var1); }//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 public Semaphore(int var1, boolean var2) { this.sync = (Semaphore.Sync)(var2 ? new Semaphore.FairSync(var1) : new Semaphore.NonfairSync(var1)); }public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获 取失败,则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true, 否则则立即返回false public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立 即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回false
Semaphore使用场景:假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:
public class SemaphoreDemo { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i
原理解读:Semaphore 类似一个 资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
public Semaphore(int permits) { sync = new NonfairSync(permits); }public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }// 公平策略: protected int tryAcquireShared(int acquires) { for (;;) { // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作 // 这个就不分析了,第一篇AQS中已经讲过 if (hasQueuedPredecessors()) //进入到这里说明阻塞队列中已经有线程在等着获取资源 return -1; int available = getState(); int remaining = available - acquires; //当remaining最小为0时,会CAS设置state为0,成功返回remaining //当remaining小于0时,这里会直接返回remaining,这里不会执行compareAndSetState if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }// 非公平策略: protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
FutureTask是创建线程的方式之一。创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结 果。FutureTask通常和Callabel接口一起使用。
public interface Runnable { public abstract void run(); }
public interface Callable{ /*** Computes a result, or throws an exception if unable to do so. ** @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task);
第一个submit方法里面的参数类型就是Callable。一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。
submit方法的返回值是一个Future对象。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 Future类位于java.util.concurrent包下,它是一个接口:
public interface Future{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
FutureTask类实现了RunnableFuture接口:
public class FutureTask implements RunnableFuture { public FutureTask(Callablecallable) { } public FutureTask(Runnable runnable, V result) { }}public interface RunnableFuture extends Runnable, Future { void run(); }
public class FutureTaskDemo { public static void main(String[] args) { FutureTaskfutureTask = new FutureTask (new Callable () { @Override public Integer call() throws Exception { // 具体的异步线程的业务执行 int num = new Random().nextInt(10); TimeUnit.SECONDS.sleep(num); return num; } }); Thread thread = new Thread(futureTask); thread.start(); // 还可以做其他的业务操作 与futureTask并行执行 get try{ // 在结果返回之前,所有线程都被堵塞,存放在等待队列中 Integer num = futureTask.get(); System.out.println(num); }catch (Exception e){ e.printStackTrace(); } }}
Exchanger:一个同步点,在该点上线程可以成对地交换元素。每个线程在进入exchange方法时显示一些对象,与伙伴线程匹配,并在返回时接收伙伴的对象。交换器可以看作是同步队列的双向形式。交换器可用于遗传算法和管道设计等应用。它提供了个构造器:
Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。其定义为 Exchanger<V>
泛型类型,其中 V 表示可交换的数据类型,对外提供的接口很简单,具体如下:
Exchanger():
无参构造方法。
V exchange(V v):
等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
V exchange(V v, long timeout, TimeUnit unit):
等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。
可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。
Exchange的使用例子
import java.util.concurrent.Exchanger; public class ExchangeTest01 { // 创建交换器 private static Exchanger< String > exchanger = new Exchanger < String >(); /*** 将当前线程id塞入交换器,同时获取交换器中的线程id */ public void update() { new Thread() { public void run() { try { String i = exchanger.exchange(Thread.currentThread().getName()); System.out.println(Thread.currentThread().getName() + "进行交换的线程:" + i); }catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } public static void main(String[] args) { ExchangeTest01 exchangerTest = new ExchangeTest01(); for (int i = 0; i < 5; i++) { exchangerTest.update(); } } }
转载地址:http://bfuqb.baihongyu.com/