Java并发源码之ReentrantLock(二)

版权声明:本文为博主原创文章,转载请注明出处,谢谢!

版权声明:本文为博主原创文章,转载请注明出处:http://blog.jerkybible.com/2017/10/12/Java并发源码之ReentrantLock-二/

访问原文「Java并发源码之ReentrantLock(二)

前言

上一篇文章《Java并发源码之ReentrantLock(一)》中,对ReentrantLock的结构、作用、主要成员变量、主要方法等做了详细分析说明。本文将对主要的lockunlock方法进行分析,分别包括公平锁和非公平锁。

公平lock

我们已经了解到ReentrantLock将全部的操作都交给了sync变量完成。相应的,公平锁的功能就交给了FairSync
首先我们看lock方法,如下所示,这个方法非常简单,只有一行。

1
2
3
final void lock() {
acquire(1);
}

接下来我们看这个acquire方法,这个方法在AQS中,如下所示。

1
2
3
4
5
6
7
8
9
10
11
/**
* 独占模式获取,忽略中断。通过调用至少一次tryAcquire来实现,成功则返回。
* 否则,线程将排队,可能反复地阻塞和未阻塞,调用tryAcquire直到成功。
*
* @param arg acquire参数.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire首先调用了tryAcquire,这个函数在子类FairSync中进行了实现。我们可以看到,tryAcquire的作用是尝试去获取锁,成功的话,返回true;尝试失败的话,返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* tryAcquire的公平版本。除非递归调用成功或之前没有其他等待者,否则一直等待。
*/
protected final boolean tryAcquire(int acquires) {
// 获取当先线程
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 锁没有被占用
if (c == 0) {
// 判断当前线程是不是CLH队列中的第一个线程线程
// 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁的拥有者已经为当前线程
// 则将更新锁的状态
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果tryAcquire失败则会继续调用acquireQueued,首先调用addWaiter,该方法负责把当前无法获得锁的线程包装为一个Node添加到队尾,其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步:

  1. 如果当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为Tail;
  2. 如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续设置Tail。
    接着来看enq方法,该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。
    总而言之,addWaiter的目的就是通过CAS把当前现在追加到队尾,并返回包装后的Node实例。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    /**
    * 根据给定的mode,为当前线程创建node,并将node插入队列
    *
    * @param mode Node.EXCLUSIVE为独占, Node.SHARED为共享
    * @return 新node
    */
    private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是当前线程,当前线程的锁的模型是mode
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    // 若CLH队列为空,则调用enq新建CLH队列,然后再将“当前线程”添加到CLH队列中
    enq(node);
    return node;
    }
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    Node h = new Node(); // Dummy header
    h.next = node;
    node.prev = h;
    if (compareAndSetHead(h)) {
    tail = node;
    return h;
    }
    }
    else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

接下来看一下acquireQueued, acquireQueued的目的是从队列中获取锁。其中shouldParkAfterFailedAcquire通过以下规则,判断当前线程是否需要被阻塞。

  1. 如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
  2. 如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
  3. 如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
    如果规则1发生,即“前继节点是SIGNAL”状态,则意味着当前线程需要被阻塞。接下来会调用parkAndCheckInterrupt阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt中返回。

关于waitStatus,是这样的,

  1. CANCELLED[1] – 当前线程已被取消
  2. SIGNAL[-1] – 当前线程的后继线程需要被unpark(唤醒)。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
  3. CONDITION[-2] – 当前线程(处在Condition休眠状态)在等待Condition唤醒
  4. PROPAGATE[-3] – (共享锁)其它线程获取到“共享锁”

parkAndCheckInterrupt的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:

  1. unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
  2. 中断唤醒。其它线程通过interrupt()中断当前线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 如果队列中已经存在这个线程,则使用独占不中断模式获取
*
* @param node node
* @param arg acquire 参数
* @return 等待过程中出现中断则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted表示在CLH队列的调度中
// 当前线程在休眠时,有没有被中断过
boolean interrupted = false;
for (;;) {
// 获取上一个节点。
// node是当前线程对应的节点,这里就意味着获取上一个等待锁的线程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 返回当前线程是否应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的状态
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true
if (ws == Node.SIGNAL)
return true;
// 如果前继节点是取消状态,则设置当前节点的前继节点为前继节点的前继节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为0或者共享锁状态,则设置前继节点为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 通过LockSupport的park阻塞当前线程
LockSupport.park(this);
// 返回线程的中断状态
return Thread.interrupted();
}

最后我们来看selfInterrupt,它的代码很简单,就是当前线程自己产生一个中断,为什么要这么做呢!
这必须结合acquireQueued进行分析。如果在acquireQueued中,当前线程被中断过,则执行selfInterrupt;否则不会执行。
acquireQueued中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt中,我们线程的中断状态时调用了Thread.interrupted。该函数不同于ThreadisInterrupted函数,isInterrupted仅仅返回中断状态,而interrupted在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt重新产生一个中断!

1
2
3
4
5
6
/**
* 中断当前线程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

非公平lock

上面我们介绍了公平锁,非公平锁在很多方法上与公平锁是相同的。下面列出了不同的方法。
首先是lock方法,可以看到非公平锁做的第一件事情就是compareAndSetState,也就是不管队列里有没有别的等待者,马上去尝试获取锁,成功的话设置状态,不成功的话和公平锁一样调用acquire方法。
其次在公平锁的acquire方法中调用tryAcquire,而在非公平锁中调用nonfairTryAcquire,可以看到nonfairTryAcquire也是不管队列是否有等待而强制尝试获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 执行lock。尝试立即获取锁,如果失败调用acquire.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* 非公平的tryLock。但是无论是公平还是非公平,trylock始终调用这个方法
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

释放锁

下面的代码为释放锁的主要方法。
先看release,这个方法首先调用tryRelease,若果成功则判断等待队列的头的waitStatus是否为0,不是0的话唤醒等待线程。
然后看tryRelease,如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 释放锁
*
* 如果当前线程持有锁,则减少锁的持有数
* 如果锁的持有数为0,则释放锁
* 如果当前线程没有持有锁则抛出IllegalMonitorStateException
*
* @throws IllegalMonitorStateException 如果当前线程没有持有锁
*/
public void unlock() {
sync.release(1);
}
/**
* 使用独占模式释放
*
* @param arg release参数
* @return 返回为tryRelease的返回值
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒等待的线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 减去持有数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有数量为0,则表示这个锁被释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

总结一下

ReentrantLock通过AQS完成了加锁、解锁等操作。而公平锁为如果队列里有线程在等待锁则加入队列等待,非公平锁则不管队列里有无等待线程立即尝试获取锁。感兴趣的小伙伴速速去看源码学习一下吧!