ConcurrentHashMap源码笔记
spread
假设table的长度为n=2^k,取模操作hash%n等价于hash&(n-1),n-1为mask(二进制的k-1个1)
即,hash的低k位决定了桶的位置,k位以上的高位不起作用,如果不同hash的低k位相同,就会产生碰撞
1 | static final int spread(int h) { |
put
循环做4件事情:
- 如果表为空,初始化表,继续
- 如果对应下标节点为空,cas插入,跳出
- 如果发现表正在迁移,帮助迁移,继续
- 执行put操作。如果链表长度超过阈值,转为树。如果为更新,直接返回,否则跳出
循环退出后,计数+11
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94public V put(K key, V value) {
return putVal(key, value, false);
}
/**
* Implementation for put and putIfAbsent
* onlyIfAbsent: true 只替换为null值的node, false: 都替换
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key value 不能为null
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 循环执行
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// 如果table为空,初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果对应下标节点为空,直接插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// cas替换
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break; // no lock when adding to empty bin
// 如果节点不为空,且节点的hash为-1,-1表示在迁移,则帮助迁移
} else if ((fh = f.hash) == MOVED)
// 此处,helpTransfer返回的tab为f的nextTable,或者为已完整迁移的新table
tab = helpTransfer(tab, f);
// 执行put操作
else {
V oldVal = null;
// 请求同步锁,避免并发写操作
synchronized (f) {
// double check
if (tabAt(tab, i) == f) {
// 链表结构
if (fh >= 0) {
// 统计节点个数(非精确)-- 统计的是原来链表的长度
// for中break之后,不会再执行++binCount
// f的binCount为1,第二个节点binCount为2
// 假设第k节点key相同,替换新值后break,binCount为k,链表长度未知
// 或者第k节点的next为空,链接上新节点后break,binCount为k,链表长度为k+1
binCount = 1;
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 找到相同的key,更新value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 否则添加到链表尾部
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
} else if (f instanceof TreeBin) {
// 红黑树结构,TreeBin的hash为-2
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果链表节点个数大于阈值,将链表转化为红黑树
// 如果原结构为红黑树,binCount=2
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// oldVal不为null,说明是更新操作,节点个数无变化,直接返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 到这里说明是添加操作,计数+1
addCount(1L, binCount);
return null;
}
initTable
只有一个线程能进行初始化,其他线程让出CPU1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) {
// sc小于0,说明正在扩容或者迁移,让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 当前线程获得初始化机会,cas sizeCtl为-1
try {
if ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl有正值,用正值,否则采用默认容量
// sizeCtl什么时候有正值?有什么样的正值?
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
"unchecked") (
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
// 此时tab=table!=null,其他线程从本方法的循环中跳出
table = tab = nt;
// 相当于n - n*1/4,即sc = 0.75*n
// >>> 无符号右移
sc = n - (n >>> 2);
}
} finally {
// 初始化完成后,sizeCtl=0.75*n
sizeCtl = sc;
}
// 初始化完成跳出循环
break;
}
}
return tab;
}
addCount
对于check,从putVal过来的几种情形(这里check=binCount)
- table中槽为空,直接放入新节点,check=0
- table中槽里的节点(称为first节点)key和新值一样,被替换,check=1 – 这种情况在putVal直接return了,不会进到addCount
- 非first节点的key和新值一样,或者加入了新节点,check>1 – 替换的也return了,只有新节点才进入addCount,这是check=binCount=原链表长度
- 槽中是红黑树,check=2
总结下,这里check的值有0、k(>1)、2
1 | /** |
sizeCtl的注释说明,当sizeCtl为负数时,-1标识表初始化,-(sizeCtl-1)标识活动的扩容线程数
为什么在具体实现里,是sizeCtl的低16位,且需减去1的值,标识扩容线程数呢?
((rs << RESIZE_STAMP_SHIFT) + 2)这里有个疑问,为什么是+2?不能是+1么?
– 无责任猜测是版本变更了,最初也许没有RESIZE_STAMP_SHIFT、没有高低16位这么复杂
如果是+1,假如表长为0,第一个扩容线程加入,sizeCtl=[1.....0](16位)+[0....1](16位)
=很负的一个负数,除非sizeCtl溢出了,不然也没发现其他情况下有重复的情况
+2的话,sizeCtl=[1.....0](16位)+[0....10](16位)
=还是很负的一个负数,和+1只有最低位有区别。
1 | /** |
resizeStamp
resizeStamp的结果作为扩容&迁移时sizeCtl的高16位信息
- sizeCtl为负数
- 标识着此次扩容&迁移对应的表长n
1
2
3
4
5
6static final int resizeStamp(int n) {
// n的二进制前导0个数。因为表长n为2的次幂,每次扩容*2,意味着每次扩容前导0个数少1,用于判断是否为同一次扩容
// 将第16位或为1,是为了左移RESIZE_STAMP_SHIFT后为负数
// 1 << (RESIZE_STAMP_BITS - 1) = (0b)1000 0000 0000 0000
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
helpTransfer
1 | /** |
transfer
1 | private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) { |
get
1 | public V get(Object key) { |
作者说
1 | ....We do not want to waste |
引用一段并发分析
Java 8 中 ConcurrentHashMap工作原理的要点分析
6.1初化的同步问题
表长度的分配并不是在构造函数中进行的,而是在put方法中进行的,也就是说这实际上是个懒汉模式。但是如果多个线程同时进行表长度的空间分配,显然是非线程安全的。所以只能有一个线程来进行创建表,其它线程会等待创建完成。ConcurrentHashMap类中设定一个volatile变量sizeCtl
private transient volatile int sizeCtl;
然后通过CAS方法去修改它,如果有其它线程发现sieCtl为-1
U.compareAndSwapInt(this, SIZECTL, sc, -1)
就表示已经有线程正在创建表了,那么当前线程就会放弃CPU使用权(调用Thread.yield()方法),等待分初始化完成后继续进行put操作。否则当前线程尝试将siezeCtl修改为-1,若成功,就由当前线程来创建表。
6.2 put方法和remove方法之间的同步问题
在表的同一个槽上,一个线程调用put方法和另一个线程调用put方法是互斥的;在表的同一个槽上,一个线程调用remove方法和另一个线程调用remove方法也是互斥的;在表的同一个槽上,一个线程调用remove方法和另一个线程调用put方法也是互斥的。这些互斥操作在代码中都是通过锁来保证的。
6.3 put(或remove)方法和get方法的同步问题
实际上是不需要同步,先到先得。这主要由于Node定义中value和next都定义成了volatile类型。一个线程能否get到另一个线程刚刚put(或remove)的值,这主要由两个线程当前访问的结点所处的位置决定的。
6.4 get方法和扩容操作的同步问题
可以分成两种情况讨论
1)该位置的头结点是Node类型对象,直接get,即使这个桶正在进行迁移,在get方法未完成前,迁移已完成(槽被设置成了ForwordingNode对象),也没关系,并不影响get的结果,因为get线程仍然持有旧链表的引用,可以从当前结点位置访问到所有的后续结点,原因是新表中的节点是通过复制旧表中的结点得到的,所以新表的结点的next不会影响旧表中对应结点的next值。当get方法结束后,旧链表就不可达了,会被垃圾回收线程回收。
2)该位置的头结点是ForwordingNode类型对象(头结点的hash值 == -1),头结点是ForwordingNode类型的对象,调用该对象的find方法,在新表中查找。
所以无论哪种情况,都能get到正确的值。
6.5 put(或remove)方法和扩容操作的同步问题
同样可以分为两种情况讨论:
1)该位置的头结点是Node类型对象,put操作就走正常路线,先将Node对象放入到旧表中,然后调用addCount方法,判断是否需要帮助扩容。
2)该位置的头结点是ForwordingNode类型对象,那就会先帮助扩容,然后在新表中进行put操作。