Java并发源码之LongAdder

版权声明:本文为博主原创文章,转载请注明出处,谢谢!

版权声明:本文为博主原创文章,转载请注明出处:http://blog.jerkybible.com/2018/01/11/Java并发源码之LongAdder/

访问原文「Java并发源码之LongAdder

前言

AtomicLong我已经在之前的文章Java并发源码之AtomicLong中做了详细的介绍。但是除了AtomicLong,还存在另外一个高效的并发计数类,那就是LongAdderLongAdder官方文档的说明如下。我就不对这段文字做详细的说明了,简单来说,这个类用于在多线程情况下的求和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* One or more variables that together maintain an initially zero
* {@code long} sum. When updates (method {@link #add}) are contended
* across threads, the set of variables may grow dynamically to reduce
* contention. Method {@link #sum} (or, equivalently, {@link
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
* characteristics. But under high contention, expected throughput of
* this class is significantly higher, at the expense of higher space
* consumption.
*
* <p>LongAdders can be used with a {@link
* java.util.concurrent.ConcurrentHashMap} to maintain a scalable
* frequency map (a form of histogram or multiset). For example, to
* add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
* initializing if not already present, you can use {@code
* freqs.computeIfAbsent(k -> new LongAdder()).increment();}
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code equals}, {@code hashCode} and {@code
* compareTo} because instances are expected to be mutated, and so are
* not useful as collection keys.
*
* @since 1.8
* @author Doug Lea
*/

从关键方法分析

下面将从LongAdder的关键方法add来解析这个类。首先我把这个方法列出来,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 增加固定值
*
* @param x 固定值
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

add方法包含了一个Cell数组,Cell是Striped64的一个内部类。它的定义为:Padded variant of AtomicLong supporting only raw accesses plus CAS,翻译一下就是AtomicLong的填充变体并且只支持原始访问和CAS。下面列出了它的实现,可以看到Cell有一个value变量,并且提供了一个cas方法更新value值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

接下来看第一个if语句,这句首先判断cells是否还没被初始化,并且尝试对value值进行cas操作。如果cells已经初始化并且cas操作失败,则运行if内部的语句。在进入第一个if语句之后紧接着是另外一个if,这个if有4个判断:cell[]数组是否初始化;cell[]数组虽然初始化了但是数组长度是否为0;该线程所对应的cell是否为null;尝试对该线程对应的cell单元进行cas更新是否失败,如果这些条件有一条为true,则运行最为核心的方法longAccumulate,下面列出这个方法,为了便于理解,直接将对其的分析写为注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* 处理涉及初始化,调整大小,创建新Cell,和/或争用的更新案例
*
* @param x 值
* @param fn 更新方法
* @param wasUncontended 调用
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 获取线程probe的值
if ((h = getProbe()) == 0) {
// 如果线程的probe的值为0,初始化
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
// 无限循环
for (;;) {
Cell[] as; Cell a; int n; long v;
// 这个if分支处理上述四个条件中的前两个相似,此时cells数组已经初始化了并且长度大于0
if ((as = cells) != null && (n = as.length) > 0) {
// 线程对应的cell为null
if ((a = as[(n - 1) & h]) == null) {
// 如果busy锁没被占有
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个cell
Cell r = new Cell(x); // Optimistically create
// 检测busy是否为0,并且尝试锁busy
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//再次确认线程probe所对应的cell为null,将新建的cell赋值
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 解锁
cellsBusy = 0;
}
if (created)
break;
//如果失败,再次尝试
continue; // Slot is now non-empty
}
}
collide = false;
}
//置为true后交给循环重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//尝试给线程对应的cell update
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//在以上条件都无法解决的情况下尝试扩展cell
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//此时cells还未进行第一次初始化,进行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//busy锁不成功或者忙,则再重试一次casBase对value直接累加
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 通过cas实现的自旋锁,用于扩大或者初始化cells
*/
transient volatile int cellsBusy;

从以上分析来看,longAccumulate就是为了尽量减少多个线程更新同一个值value,最后实在不行则采用扩大cell的方式。

可以看到,LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效。原因是LongAdder在更新数值的时候并不是对一个数进行更新,而是分散到多个cell,这样在多线程的情况下可以有效的嫌少冲突和压力,使得更加高效。

LongAdder的使用场景

LongAdder适用于统计求和计数的场景,因为它提供了addsum方法。

LongAdder是否能够替换AtomicLong

从上面的分析来看是不行的,因为AtomicLong提供了很多cas方法,例如getAndIncrementgetAndDecrement等,使用起来非常的灵活,而LongAdder只有addsum,使用起来比较受限。

Jerky Lu wechat
欢迎加入微信公众号