锁、AQS

1. 前言

在实际开发中在多线程中保证其安全性,其实我们用的最多的是互斥锁,也就是synchronized关键字,我们更多的是考虑怎么降低锁的颗粒性,以及锁持有的时间。其次就是ReentrantLock,该类和synchronized功能几乎一样,唯一的特点是它比synchronized更加灵活。

2. ReentrantLock

2.1 初始化

直接来看源码

1
2
3
4
5
6
/**
*构造函数中可以看出,可以选择创建公平锁和非公平锁
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

2.2 非公平锁的加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public void lock() { // 加锁
sync.lock();
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
// CAS判断如果是0则更新成1,这里变量state是AbstractQueuedSynchronizer抽象类中的变量
// 这里我们需要知道state表示该锁一共有加锁的次数,0表示锁处于空闲状态
if (compareAndSetState(0, 1))
// 更新成功,保存持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //获取当前线程
int c = getState(); // 获取state
if (c == 0) { // state=0,表示锁处理空闲状态时,直接持有锁即可
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁已被人持有,判断锁持有人是否为当前线程
// 是则state+1,否表示加锁失败
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

abstract class AbstractQueuedSynchronizer {
public final void acquire(int arg) {
// 尝试获取锁(子类实现)获取锁失败则将线程加入等待队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) { // 加入等待队列
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 等待队列为空,创建一个头部队列
if (compareAndSetHead(new Node()))
tail = head;
} else { // 等待队列不为空,将当前node加入到队列的尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}

非公平的加锁的已经完成,其中有一系列的CAS操作,我们要知道CAS操作是无锁的,处于线程安全的。基本所有更新的操作都是安全的! 其中还有一个点,比如在nonfairTryAcquireaddWaiter中,明明已经有了整个处理的逻辑为什么还要在外层加一层判断,这里主要是为了性能考虑。当某个状态可直接快速的决定逻辑的走向,所有可以优先判断快速处理。不必走整个完整的逻辑。当然这里牺牲了一些代码的可读性!

这一步结合简单的图来在整体了解一下加锁过程

image

剖析出来整体是比较清晰的。

2.3 AQS中的等待队列

这里的加入等待队列,比较重点。
在AQS里面。维护了一个等待队列,来保存等待的线程。从上面代码可出实际头部节点在初始化的时候是没有保存线程的。实际的线程是在第二个队列中。接下来看一下acquireQueued方法做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 跳出等待队列
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 死循环
final Node p = node.predecessor(); // 获取前驱节点
// 如果前驱节点=头部节点,
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否要阻塞当前线程
/**
在AQS的等待队列中,只有前置是SIGNAL状态才能开始等待。
*/
// 阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

简单的看出,首先是一个死循环,一是如果前驱节点是头部节点,再次尝试获取锁。成功则自己设为头部节点。(特点注意这里在已经在加入等待队列时,已经获取到锁了)。如果不是,则尝试加入队列同时阻塞自己。
看下shouldParkAfterFailedAcquire方法,检查是否允许进入阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 这个状态标识,可以正常挂靠在这里
*/
return true;
if (ws > 0) {
/*
* 这个状态,会一直往前找,找到一个不属于CANCELLED的node
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 其他节点会默认为 SIGNAL
* 我们这里会先走这个流程,在走第一条分支。标识可以正常挂靠
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

所以这里队列其实是可以通过状态来标识自己是否有需要被唤醒。我们这个AQS的实现。暂时没有用到这个,一直是SIGNAL状态。
注意这里的死循环,简单说就是一个线程在尝试加入队列的同时,也在不断的尝试获取锁。比如发现前置节点是SIGNAL则开始阻塞线程。等待执行线程的唤醒。然后又继续重复尝试获取线程,尝试加入队列。

2.4 非公平锁的解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public void unlock() {
sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 获取state-1
// 当前线程不是持有锁的线程,直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // state-1 = 0 表示解锁成功
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // state-1 != 0 表示是可重入锁,减了一层,
return free;
}
}
abstract class AbstractQueuedSynchronizer {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判断队列是否为null
if (h != null && h.waitStatus != 0)
//
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}

解锁过程比较简单。主要是从队列的尾部像前找,找到对接近队列头部的节点切状态正确的节点,将其唤醒。

2.5 整体流程

看下来其实可能比较模糊,我们最后看一下流程图!

image

最后文字在讲述一下基本的逻辑

  1. 线程A lock,state=0,将state=1,获取到锁,此时队列为null
  2. 线程B lock,尝试获取锁失败,加入队列,由于队列为null,先初始化一个head,把自己放在head后面。
  3. 线程B 准备休眠,开始死循环,检查自己的前置节点是不是head,是则尝试获取一下锁。失败的话。开始判断自己是否可以休眠。
    • 检测自己的前置节点,是否为SIGNAL标识,是则直接执行休眠
    • 前置节点为CANCELLED标识(代码是>0),将自己的node往前推,删除前置节点,直到不是CANCELLED标识
    • 其他标识,将标识直接改成SIGNAL标识,
      这里线程B会先进入第三条规则,因为其前置节点是空节点,没有设标识。在进去第一条规则。可以挂靠。
  4. 线程C lock,尝试获取锁。
  5. 线程A 此时放开锁,唤醒了线程B,线程B开始尝试获取锁,注意此时线程C也在尝试获取锁,结果被线程C获取到,同时将自己设为head。而线程B又在死循环中进入休眠。等待下次唤醒。

最后根据流程图看会比较简单了。
终于为什么叫非公平锁,其实可以看出来了!就是在获取锁的时候,首先会尝试获取一下锁。不行再加入到队列。这里就存在一些竞争关系,实际就是我上面文字描述的4、5两步骤。但是并不是完全非公平的,只在新成员加入队列前会有尝试获取锁的机会。进入队列中是按公平的策略来的。

2.6 公平锁

公平锁和非公平锁区别不大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

首先lock的时候,没有了提前去判断state的状态,毕竟判断了也不可能给你随便加入。
然后就是尝试获取锁,这里有个方法 hasQueuedPredecessors 这个方法主要是判断队列中是否有在等待中的线程。
所以这里解决了根本原因的,就是公平锁在第一次锁的时候会判断一下的队列的情况。这样就解决了上面说到的非公平锁的不公平性。

2.7 Condition 用法

3. Semaphore

semaphore信号量,可以控制并发访问的线程个数。
可以在需要访问限制中使用,比如一些连接池、限流的场景。当然很多高并发的限流工具,我们会优先采用现成的,自己写的不多。所以用的频率不高。但是自己写写测试代码倒是用的挺多。
来看下基本用法

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws InterruptedException {
// 初始化,同时允许的并发个数
Semaphore semaphore = new Semaphore(100);
try {
// 获取一个许可
semaphore.acquire();
semaphore.tryAcquire(); //尝试获取一个许可
}finally {
// 释放一个许可
semaphore.release();
}
}

简单看下源码,也是通过继承AQS模板来实现,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Semaphore{
// 初始化,维护AQS的state变量,
// 也分公平和非公平模式和ReentrantLock类似的实现,这里只贴非公平锁的代码
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 获取许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// 以非公平方式,尝试获取凭证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); // 获取state
int remaining = available - acquires;
// 如果 remaining >= 0 表示凭证够,后面的CAS是保证数据的正确性
// < 0 凭证不够,直接返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
public abstract class AbstractQueuedSynchronizer{
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException { // 获取许可
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取,调用实现类 nonfairTryAcquireShared
// 这里的不公平地方也在这里。来了什么都不管先尝试一下。
// 所以这个实现在公平锁肯定会加入判断队列的情况而非公平锁就不会去判断队列的情况
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 开始阻塞线程 也和ReentrantLock类似
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 将自己加入队列
boolean failed = true;
try {
for (;;) { // 死循环
final Node p = node.predecessor(); // 获取前置节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取凭证
if (r >= 0) { // 获取成功,将自己设为头部节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 开始检查自己是否可以加入队列,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}

简单看一下和ReentrantLock其实比较类似。利用好AQS的模板效果。重点在于state的维护和队列的使用。这些都是AQS内部已经建好的。需要是掌握时机即可。

4. CountDownLatch CyclicBarrier

两者都可以保证线程的优先级,也就是在多线程中能够保证线程的执行顺序,比如在拆分计算的时候,可以保证某些计算先做,某个后做。

4.1 CountDownLatch

主要体现在一个线程等待多个线程的效果
比如下面的例子,导游等待多个同学,只有当同学全部到了以后就可以触发了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
// 初始化,同时允许的并发个数
ExecutorService service = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(2);
service.execute(() -> {
log.info("A同学在路上");
countDownLatch.countDown();
});
service.execute(() -> {
log.info("B同学在路上");
countDownLatch.countDown();
});
service.execute(() -> {
countDownLatch.await();
log.info("2个同学到了,开车旅游");
});
}

4.2 CyclicBarrier

表达的是多个线程互相等待,然后一起进行的操作!
比如下面的旅游,但是没有老师了,自发约定地点。我这里的例子可能不是很合理。但是区别还是很明显的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
ExecutorService service = Executors.newFixedThreadPool(3);
service.execute(() -> {
log.info("A同学到了约定地点");
cyclicBarrier.await(); // 等待
log.info("可以走了");
});
service.execute(() -> {
log.info("B同学到了约定地点");
cyclicBarrier.await(); // 等待
log.info("可以一起走了");
});
}

4.3 区别

CyclicBarrier的计数器是可以重置的,CountDownLatch 不可以。
CyclicBarrier是表示多个线程互相等待,然后互相一起走。CountDownLatch 表达的是一个线程等待多个线程,然后这一个线程就可以走了。
CyclicBarrier的API方法比较CountDownLatch 更加丰富一些,可以自己查看一下!

5. FutureTask

futureTask也可以实现线程的等待。这里简单的看一下代码的使用!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(3);
Future<String> submit1 = service.submit(() -> {
log.info("助手1:切肉!");
return "切好的肉";
});
Future<String> submit2 = service.submit(() -> {
log.info("助手2切青椒");
return "切好的青椒";
});
log.info("主厨准备其他东西");
String s = submit1.get();
String s1 = submit2.get();
log.info("青椒肉丝出锅");
}

比如青椒和肉丝其实也可以有两个人一起切。最后主厨就在准备好调料后,就在等待主材料。这三种都是可以同步执行的,如果主厨比较快。那么就会通过get方法,获取需要东西,然后开启等待。