JUC 常用并发组件
这里列举几个常用的并发组件:
- Semaphore
- CountDownLatch
- CyclicBarrier
- SynchronousQueue
Semaphore
信号量
用法
1. 初始化:
Semaphore semaphore = new Semaphore(3);
2. 获取信号
semaphore.acquire();
3. 还回信号
semaphore.release();
4. 带超时时间的获取
semaphore.tryAcquire(1, TimeUnit.SECONDS);
示例
典型的应用场景:限制并发。
几十个线程同时来调用某个接口服务
public class SemphoreTest {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 50; i++) {
new Thread(() -> {
try {
callService();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread-"+ i).start();
}
}
static void callService() throws InterruptedException {
System.out.println(Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(1000);
}
}
使用semaphore限速
最多同时有3个线程去调用服务,做到限速
public class SemphoreTest {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 50; i++) {
new Thread(() -> {
try {
semaphore.acquire();
callService();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, "thread-"+ i).start();
}
}
static void callService() throws InterruptedException {
System.out.println(Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(1000);
}
}
原理
利用AQS同步器。
- new Semaphore(信号量个数) 初始化信号量
- acquire() 获取信号,当减到0时候,再有线程acquire() 时开始AQS排队
- 当有release()时,队列里排队的线程会取出
CountDownLatch
用法
1. 初始化
CountDownLatch countDownLatch = new CountDownLatch(2);
2. 减
Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
countDownLatch.countDown();
3. 等待
countDownLatch.await();
4. 带超时时间的等待 超时获取的返回值false
countDownLatch.await(1, TimeUnit.SECONDS);
示例
典型的应用场景,一个线程C等待A、B线程结束后的数据。一线程等多线程,一等多
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
try {
// 业务操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "作业完成");
// 操作结束,countDown
countDownLatch.countDown();
}, "thread-1").start();
new Thread(() -> {
try {
// 业务操作
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "作业完成");
// 操作结束,countDown
countDownLatch.countDown();
}, "thread-2").start();
new Thread(() -> {
// 等待A、B两个线程作业结束了,我开始拿他们的数据干活 可能数据在并发安全的容器里等
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "开始作业");
}, "thread-3").start();
}
}
原理
AQS同步器,利用 state 进行加减操作。 当减到0时,await() 的阻塞线程被唤醒
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CyclicBarrier
用法
1. 初始化: 带回调任务的
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("互相等待结束,callback执行");
});
2. 互相等待
cyclicBarrier.await();
3. 带等待超时的
cyclicBarrier.await(1, TimeUnit.SECONDS);
示例
典型的应用场景,多个线程互相等待,都等待完去做一件事。然后继续循环。CyclicBarrier循环屏障,能自动重设。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("互相等待结束,callback执行");
});
int total = 5;
new Thread(() -> {
try {
int i = 0;
while (i < total) {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "作业中,开始互相等待 "+ i);
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "继续作业 "+ i);
i++;
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread-1").start();
new Thread(() -> {
try {
int i = 0;
while (i < total) {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "作业中,开始互相等待 "+ i);
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "继续作业 "+ i);
i++;
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "thread-2").start();
}
}
原理
SynchronousQueue
SynchronousQueue 是个无容量的线程安全队列,用法和Go里的无容量channel类似。一个线程读会阻塞,直到有个线程写;同样一个线程写会阻塞,直到有一个线程读。
用法
1. SynchronousQueue<Integer> queue = new SynchronousQueue<>();
2. queue.put() <-> queue.take()
示例
两个线程交替打印递增数字。利用SynchronousQueue做线程间同步。
public class SynchronousQueueTest {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
Integer take = 0;
while (true) {
try {
// A线程put,对应B线程take
queue.put(take++);
// A线程take,对应B线程put
take = queue.take();
System.out.println(Thread.currentThread().getName() + " " + take++);
//TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "threadName-A").start();
new Thread(() -> {
while (true) {
try {
Integer take = queue.take();
System.out.println(Thread.currentThread().getName() + " " + take++);
queue.put(take++);
//TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "threadName-B").start();
}
}
原理
SynchronousQueue 有两种实现:一个公平的内部是TransferQueue 先进先出;另一个非公平的TransferStack,后进先出。
队列或栈内部是一个包含线程的元素,线程安全用的CAS保证而非锁。唤醒和睡眠使用的 LockSupport.park 和 LockSupport.unpark。