锁、AQS 1. 前言 在实际开发中在多线程中保证其安全性,其实我们用的最多的是互斥锁,也就是synchronized关键字,我们更多的是考虑怎么降低锁的颗粒性,以及锁持有的时间。其次就是ReentrantLock,该类和synchronized功能几乎一样,唯一的特点是它比synchronized更加灵活。
2. ReentrantLock 2.1 初始化 直接来看源码
1 2 3 4 5 6 public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
2.2 非公平锁的加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public void lock () { sync.lock(); } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L ; abstract void lock () ; final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } } abstract class AbstractQueuedSynchronizer { public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } }
非公平的加锁的已经完成,其中有一系列的CAS操作,我们要知道CAS操作是无锁的,处于线程安全的。基本所有更新的操作都是安全的! 其中还有一个点,比如在nonfairTryAcquire 和addWaiter 中,明明已经有了整个处理的逻辑为什么还要在外层加一层判断,这里主要是为了性能考虑。当某个状态可直接快速的决定逻辑的走向,所有可以优先判断快速处理。不必走整个完整的逻辑。当然这里牺牲了一些代码的可读性!
这一步结合简单的图来在整体了解一下加锁过程
剖析出来整体是比较清晰的。
2.3 AQS中的等待队列 这里的加入等待队列,比较重点。 在AQS里面。维护了一个等待队列,来保存等待的线程。从上面代码可出实际头部节点在初始化的时候是没有保存线程的。实际的线程是在第二个队列中。接下来看一下acquireQueued方法做了什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
简单的看出,首先是一个死循环,一是如果前驱节点是头部节点,再次尝试获取锁。成功则自己设为头部节点。(特点注意这里在已经在加入等待队列时,已经获取到锁了 )。如果不是,则尝试加入队列同时阻塞自己。 看下shouldParkAfterFailedAcquire 方法,检查是否允许进入阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
所以这里队列其实是可以通过状态来标识自己是否有需要被唤醒。我们这个AQS的实现。暂时没有用到这个,一直是SIGNAL状态。 注意这里的死循环,简单说就是一个线程在尝试加入队列的同时,也在不断的尝试获取锁。比如发现前置节点是SIGNAL则开始阻塞线程。等待执行线程的唤醒。然后又继续重复尝试获取线程,尝试加入队列。
2.4 非公平锁的解锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public void unlock () { sync.release(1 ); } abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } } abstract class AbstractQueuedSynchronizer { public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } }
解锁过程比较简单。主要是从队列的尾部像前找,找到对接近队列头部的节点切状态正确的节点,将其唤醒。
2.5 整体流程 看下来其实可能比较模糊,我们最后看一下流程图!
最后文字在讲述一下基本的逻辑
线程A lock,state=0,将state=1,获取到锁,此时队列为null
线程B lock,尝试获取锁失败,加入队列,由于队列为null,先初始化一个head,把自己放在head后面。
线程B 准备休眠,开始死循环 ,检查自己的前置节点是不是head,是则尝试获取一下锁。失败的话。开始判断自己是否可以休眠。
检测自己的前置节点,是否为SIGNAL 标识,是则直接执行休眠
前置节点为CANCELLED 标识(代码是>0),将自己的node往前推,删除前置节点,直到不是CANCELLED 标识
其他标识,将标识直接改成SIGNAL 标识, 这里线程B会先进入第三条规则,因为其前置节点是空节点,没有设标识。在进去第一条规则。可以挂靠。
线程C lock,尝试获取锁。
线程A 此时放开锁,唤醒了线程B,线程B开始尝试获取锁,注意此时线程C也在尝试获取锁,结果被线程C获取到,同时将自己设为head。而线程B又在死循环中进入休眠。等待下次唤醒。
最后根据流程图看会比较简单了。 终于为什么叫非公平锁,其实可以看出来了!就是在获取锁的时候,首先会尝试获取一下锁。不行再加入到队列。这里就存在一些竞争关系,实际就是我上面文字描述的4、5两步骤。但是并不是完全非公平的,只在新成员加入队列前会有尝试获取锁的机会。进入队列中是按公平的策略来的。
2.6 公平锁 公平锁和非公平锁区别不大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
首先lock的时候,没有了提前去判断state的状态,毕竟判断了也不可能给你随便加入。 然后就是尝试获取锁,这里有个方法 hasQueuedPredecessors
这个方法主要是判断队列中是否有在等待中的线程。 所以这里解决了根本原因的,就是公平锁在第一次锁的时候会判断一下的队列的情况。这样就解决了上面说到的非公平锁的不公平性。
2.7 Condition 用法 3. Semaphore semaphore信号量,可以控制并发访问的线程个数。 可以在需要访问限制中使用,比如一些连接池、限流的场景。当然很多高并发的限流工具,我们会优先采用现成的,自己写的不多。所以用的频率不高。但是自己写写测试代码倒是用的挺多。 来看下基本用法
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(100 ); try { semaphore.acquire(); semaphore.tryAcquire(); }finally { semaphore.release(); } }
简单看下源码,也是通过继承AQS模板来实现,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class Semaphore { public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } } public abstract class AbstractQueuedSynchronizer { public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
简单看一下和ReentrantLock其实比较类似。利用好AQS的模板效果。重点在于state的维护和队列的使用。这些都是AQS内部已经建好的。需要是掌握时机即可。
4. CountDownLatch CyclicBarrier 两者都可以保证线程的优先级,也就是在多线程中能够保证线程的执行顺序,比如在拆分计算的时候,可以保证某些计算先做,某个后做。
4.1 CountDownLatch 主要体现在一个线程等待多个线程的效果 比如下面的例子,导游等待多个同学,只有当同学全部到了以后就可以触发了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { ExecutorService service = Executors.newFixedThreadPool(3 ); CountDownLatch countDownLatch = new CountDownLatch(2 ); service.execute(() -> { log.info("A同学在路上" ); countDownLatch.countDown(); }); service.execute(() -> { log.info("B同学在路上" ); countDownLatch.countDown(); }); service.execute(() -> { countDownLatch.await(); log.info("2个同学到了,开车旅游" ); }); }
4.2 CyclicBarrier 表达的是多个线程互相等待,然后一起进行的操作! 比如下面的旅游,但是没有老师了,自发约定地点。我这里的例子可能不是很合理。但是区别还是很明显的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(2 ); ExecutorService service = Executors.newFixedThreadPool(3 ); service.execute(() -> { log.info("A同学到了约定地点" ); cyclicBarrier.await(); log.info("可以走了" ); }); service.execute(() -> { log.info("B同学到了约定地点" ); cyclicBarrier.await(); log.info("可以一起走了" ); }); }
4.3 区别 CyclicBarrier的计数器是可以重置的,CountDownLatch 不可以。 CyclicBarrier是表示多个线程互相等待,然后互相一起走。CountDownLatch 表达的是一个线程等待多个线程,然后这一个线程就可以走了。 CyclicBarrier的API方法比较CountDownLatch 更加丰富一些,可以自己查看一下!
5. FutureTask futureTask也可以实现线程的等待。这里简单的看一下代码的使用!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(3 ); Future<String> submit1 = service.submit(() -> { log.info("助手1:切肉!" ); return "切好的肉" ; }); Future<String> submit2 = service.submit(() -> { log.info("助手2切青椒" ); return "切好的青椒" ; }); log.info("主厨准备其他东西" ); String s = submit1.get(); String s1 = submit2.get(); log.info("青椒肉丝出锅" ); }
比如青椒和肉丝其实也可以有两个人一起切。最后主厨就在准备好调料后,就在等待主材料。这三种都是可以同步执行的,如果主厨比较快。那么就会通过get方法,获取需要东西,然后开启等待。
原文作者: duteliang
原文链接: http://yoursite.com/2019/04/18/javabase/锁、AQS/
版权声明: 转载请注明出处(必须保留原文作者署名原文链接)