Lock接口
java.util.concurrent.locks.Lock
1 2 3 4 5 6 7 8 9 10 11 12
| void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
|
比较:
synchronized 使用简单,系统自动管理同步,非公平锁;
Lock接口 使用可操作性高,可超时获取锁、中断的获取锁以及自由是实现公平/非公平 等
AQS(同步器)
java.util.concurrent.locks.AbstractQueuedSynchronizer,实现Lock接口和其他并发工具类的底层数据结构
CLH锁
CLH是一种基于单向链表的 公平、高性能 的自旋锁,AQS实现是CLH锁的变体
基本原理:
申请加锁的线程通过前驱结点的状态进行自旋。在前置结点解锁后,当前结点会结束自旋,并进行加锁

AQS源码
AQS 中的主要数据结构:同步队列 + 条件队列,底层数据结构两者都是链表

1)类定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {}
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
|
2)重要属性
AQS 的属性可简单分为 5 类:同步器AQS简单属性、同步队列属性、条件队列属性、公用Node类、条件ConditionObject类
(1)同步器AQS属性
1 2 3 4 5 6 7 8 9
|
private volatile int state;
static final long spinForTimeoutThreshold = 1000L;
|
(2)同步队列属性
同步队列的作用:
阻塞获取不到锁的线程,并在适当时机释放这些线程
当多个线程都来请求锁时
某一时刻有且只有一个线程能够获得锁(排它锁)
其他获取不到锁的线程
都会到同步队列中去排队并阻塞自己
当有线程主动释放锁时
就会从同步队列列头开始释放一个排队的线程
让线程重新去竞争锁
同步队列属性:
1 2 3 4 5 6 7
|
private transient volatile Node head;
private transient volatile Node tail;
|
(3)条件队列属性
条件队列用来实现 wait/notify 等待通知机制的,配合锁使用
1 2 3 4 5 6 7 8
| public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L;
private transient Node firstWaiter;
private transient Node lastWaiter; }
|
(4)公用Node类
Node 非常重要,即是同步队列的结点,又是条件队列的结点
在入队的时用 Node 把线程包装一下,然后把 Node 放入两个队列中
需要重点关注 waitStatus 字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile Thread thread;
Node nextWaiter;
|
(5)条件ConditionObject类
ConditionObject实现了Condition接口
Condition接口:
当 lock 代替 synchronized 来加锁时
可以用 Condition 来代替 Object 中相应的监控方法
比如 Object#wait ()、Object#notify、Object#notifyAll
Condition 实例是绑定在锁上的
通过 Lock#newCondition 方法可以产生该实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| void await() throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
当以下四种情况发生时,条件队列中的线程将被唤醒 1. 有线程使用了 signal 方法,正好唤醒了条件队列中的当前线程; 2. 有线程使用了 signalAll 方法; 3. 其它线程打断了当前线程,并且当前线程支持被打断; 4. 被虚假唤醒
void signal();
void signalAll();
|
3)获取锁
使用 Lock.lock() 方法来获得锁,lock方法一般会调用 AQS 的 acquire 或 tryAcquire方法
(1)acquire 获取排他锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
acquireQueued 方法,阻塞当前结点时要先把同步队列中的前置结点状态设置为SIGNAL
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
|
(2)acquireShared 获取共享锁
大致逻辑和获取排他锁一样,只有2个地方不一样
(1)传递的入参不一致,一个是传递Node.EXCLUSIVE(null),一个是Node.SHARED
(2)结点获得排它锁时,仅仅把自己设置为同步队列的头结点即可(setHead 方法),但如果是共享锁的话,还会去唤醒自己的后续结点(并且也是共享模式的),一起来获得该锁(setHeadAndPropagate 方法)


4)释放锁
使用 Lock.unLock() 方法来释放锁,lock方法一般会调用 AQS 的 release 或 tryRelease方法
(1)release 释放排他锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
|
(2)releaseShared 释放共享锁
大致逻辑和释放排他锁一样
1 2 3 4 5 6 7
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
5)条件队列
为什么还需要条件队列?
遇到 锁 + 队列 的场景时
如果队列满或者空的时候
可以使用 Condition 实现等待/通知机制来管理这些线程
(1)入条件队列等待 await
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
(2)单个唤醒 signal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
(3)全部唤醒 signalAll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
|
ReentrantLock(重入锁)
可以直接替代 synchronized 关键字,同时也支持 可重入(重复加锁)、公平锁、非公平锁
1)使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| private int num;
private ReentrantLock reentrantLock = new ReentrantLock();
@Test public void mainTest() throws Exception { int threadCount = 1000; CountDownLatch countDownLatch = new CountDownLatch(threadCount);
Runnable runnable = () -> { LockSupport.parkNanos(1000000);
reentrantLock.lock(); try { num++; } finally { reentrantLock.unlock(); }
countDownLatch.countDown(); };
for (int i = 0; i < threadCount; i++) { new Thread(runnable).start(); }
countDownLatch.await(); System.out.println("结束:" + num); }
|
2)源码
java.util.concurrent.locks.ReentrantLock
(1)类结构
实现了Lock接口,底层实现交给静态内部类 ReentrantLock.Sync 以及其子类 ReentrantLock.NonfairSync(非公平) 和 ReentrantLock.FairSync(公平)
1 2 3 4 5 6 7 8 9 10 11
| public class ReentrantLock implements Lock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} private final Sync sync; }
|
(2)构造方法
1 2 3 4 5 6 7 8
| public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
|
(3)Sync同步器
nonfairTryAcquire()方法、tryRelease()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
|
(4)FairSync 公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| final void lock() { acquire(1); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
(5)NonfairSync 非公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
|
(6)Lock 接口的实现
(6.1)lock()获得锁
公平lock:
FairSync.lock() -> AQS.acquire() -> FairSync.tryAcquire()
非公平lock:
NoFairSync.lock() -> AQS.acquire() -> NoFairSync.tryAcquire() -> Sync.noFairTryAcquire()
(6.2)tryLock()获得锁
tryLock()
只有非公平的:Sync.noFairTryAcquire()
tryLock(时间入参):只是加了一个自旋计时的功能
公平lock: FairSync.tryAcquire()
非公平lock:NoFairSync.tryAcquire() -> Sync.noFairTryAcquire()
(6.3)unlock()释放锁
1 2 3 4
| public void unlock() { sync.release(1); }
|
(6.4)newCondition()获得Condition
1 2 3 4
| public Condition newCondition() { return sync.newCondition(); }
|
ReentrantReadWriteLock(读写锁)
1)使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| private int threadCount = 1000; private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private Lock r = rwl.readLock(); private Lock w = rwl.writeLock(); @Test public void mainTest() throws Exception { int threadCount = 1000; CountDownLatch countDownLatch = new CountDownLatch(threadCount);
Runnable runnable = () -> { LockSupport.parkNanos(1000000);
r.lock(); try { System.out.println(num); } finally { r.unlock(); }
w.lock(); try { num++; } finally { w.unlock(); }
countDownLatch.countDown(); };
for (int i = 0; i < threadCount; i++) { new Thread(runnable).start(); }
countDownLatch.await(); System.out.println("结束:" + num); }
|
2)源码
java.util.concurrent.locks.ReentrantReadWriteLock
大致实现和 ReentrantLock 差不多,只是多了2个内部类去分别实现读/写锁,底层调用的是AQS的共享锁和排他锁
