Java应用被k8s认定为oom杀掉

前不久现场反馈说服务运行一段时间就重启了,希望我介入排查一下。

先说结论

jvm堆大小与k8s pod设置的大小一致,均为4g。因jvm还存在其他的内存占用,pod服务总体的内存占用会超过4g,k8s认定为oom,将其杀掉。以及jvm较低版本没有支持容器namespace的资源隔离。

处理过程

  1. 检查日志

    看服务重启前后日志有无抛出一些较为严重的错误,是否存在因为个别异常导致的服务重启;

  2. 修改jvm堆的大小,增加oom时候自动dump等参数

    1. 怀疑是系统内部有一些“不良”业务导致的堆的大量占用,试想是否可以通过增加堆的大小,再对堆进行快照分析,确定具体占用较大堆内存的业务代码,对其进行定向优化。
    2. 增加-XX:+HeapDumpOnOutOfMemoryError以及-XX:HeapDumpPath=/myPath/heapdump.hprof参数,让jvm在下一次oom时自动导出堆的快照,便于分析。

    通过不停的分析堆的镜像快照,确实是没有任何业务代码过多的或者过量的导致了堆的增长,增加的-XX:+HeapDumpOnOutOfMemoryError也没有生效。

  3. 检查k8s pod信息

    在k8s
    pod信息查到,服务是以oom的原因导致被kill的。经过了解,k8s认定pod的内存占用达到了pod所配置的limit值时,就会判定为oom,然后杀掉,从侧面增加-XX:+HeapDumpOnOutOfMemoryError
    无效的原因。

    检查现场配置时发现,jvm堆的大小和pod的大小限制一致。已知java 8除了堆以外还有其他的内存占用,猜测是不是这部分导致了pod实际内存使用大小超过了pod限制导致。

    修改pod限制参数为6g后,问题得以解决。

后续思考

虽然修改为6g以后,服务不再被k8s以oom的原因kill了,但于此同时,还有一个问题一直萦绕着我。

为什么jvm没有回收内存?

在一番了解后,我了解到Java 1.8有一个更新,在Java 1.8
191这个版本中,Java更新了对于namespace的支持,地址是:https://www.oracle.com/java/technologies/javase/8u191-relnotes.html,对应具体的bug描述是https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8146115

容器是通过linux的namespace做资源隔离,通过pid下的cgroup做资源限制,但是在早先版本中,jvm并没有支持这一特性,从而导致了jvm内存不回收的问题。

jvm指定的堆大小到底

类的加载、链接和初始化(基于Java 1.8)

Java的数据类型(Data type)主要是有两种:

  1. 基本类型 primitive types
  2. 引用类型 reference types

其中引用类型又被细分为:

  1. 类 class types
  2. 数组 array types
  3. 接口 interface types

基本类型和数组类型是由Java虚拟机直接生成的,其他(类 class types、接口 interface types)则需要Java虚拟机对其进行链接和初始化。

1. Java虚拟机启动 Java Virtual Machine Startup

  • 通过引导类加载器创建一个初始类来启动,并执行这个public class中的void main(String[])方法。
  • 初始类可以作为命令行参数提供。或者,该实现可以提供一个初始类,该初始类设置一个类加载器,该类加载器进而加载。

2. 创建和加载 Creation and Loading

  • 指查找字节流,并据此创建类的过程。
  • 其中数组(array types)是没有字节流的,由Java虚拟机直接生成,对于其他的类来说,Java虚拟机需要借助于类加载器来完成查找字节流的工程。

2.1 类加载器 ClassLoader

在Java虚拟机规范中,类加载器被分为两种:

  1. Java虚拟机提供的引导类加载器(bootstrap class loader)
  2. 用户定义的类加载器(user-defind class loaders)

2.1.1 Java虚拟机提供的引导类加载器 bootstrap class loader

  • bootstrap class loader 由Java虚拟机提供的
  • 这个类加载器是使用C++实现的,没有对应的Java对象。

2.1.2 用户定义的类加载器 user-defind class loaders

  • user-defind class loaders 是Java虚拟机规范中对于类加载器的分类划分,是一个统称,实际上并没有这个类加载器
  • 用户定义的类加载器都是java.lang.ClassLoader类的子类
  • 在Java虚拟机规范中提到,用户定义的类加载器可以实现通过网络下载类,动态生成类或从加密文件中提取类
  • 用户定义的类加载器需要由bootstrap class loader去加载
  • 在Java1.8的核心类库中,提供了两个类加载器,分别是:
    1. 扩展类加载器 extention class loader
      sun.misc.Launcher.ExtClassLoader
    2. 应用类加载器 application class loader
      sun.misc.Launcher.AppClassLoader

2.1.2.1 扩展类加载器 extention class loader

  • 扩展类加载器的父是启动类加载器(bootstrap class loader)
  • 负责加载相对次要、但又通用的类,如JRE的lib/ext目录下jar包中的类(以及java.ext.dirs指定的类, 这个可以通过查看sun.misc.Launcher.ExtClassLoader.getExtDirs()方法确认)

2.1.2.2 应用类加载器 application class loader

  • 应用类加载器的父则是扩展类加载器
  • 负责加载应用程序路径下的类(应用程序指虚拟机参数-cp/-classpath、系统变量java.class.path或环境变量CLASSPATH所指定的路径。这个可以通过查看sun.misc.Launcher.AppClassLoader确认)
  • 默认情况下,应用程序中包含的类便是通过应用类加载器加载的。

2.1.3 双亲委派模型

  • 指的是一个类加载器接收到加载请求时,会先将请求转发给父类加载器,在父类加载器没有找到所请求的类的情况下,该类加载器才会去尝试加载。

  • 双亲委派模型可以避免类的重复加载,以及java的核心api被篡改的问题。

3 链接 Linking

  • 指将创建的类合并至Java虚拟机中,使之能够执行的过程。可分为验证(Verification)、准备(Preparation)以及解析(Resolution)三个阶段

3.1 验证 Verification

验证是为了确保被加载的类满足Java虚拟机的约束条件。

3.2 准备 Preparation

  • 准备是为被加载的类的静态字段分配内容。
  • 构造其他跟类层次相关的数据结构:如用来实现虚方法的动态绑定的方法表。

3.3 解析 Resolution

在开始解析之前,需要知道:
class文件被加载到Java虚拟机之前,这个类无法知道其他类及其方法、字段所对应的具体地址,甚至不知道自己方法、字段的地址。因此,每当需要引用这些成员时,Java编译器会生成一个符号引用。在运行阶段,这个符号引用一般都能无歧义地定位到具体目标上。

  • 解析的目标是将符号引用解析成为实际应用: 如果符号引用指向一个未被加载的类,或者未被加载类的字段或方法,那么解析就触发这个类的加载。(但未必会出发这个类的链接和初始化)

此外,在Java虚拟机规范中并没有要求在链接过程中完成解析。仅规定了:如果某些字节码使用了符号引用,那么在执行这些字节码之前,需要完成对这些符号引用的解析。

4. 初始化 Initialization

  • 为标记为常量值的字段赋值,以及执行<clinit>方法的过程
  • Java虚拟机会通过加锁来确保类的<clinit>方法仅被执行一次

常量值解释:
Java代码中,如果要初始化一个静态字段,可以在声明时直接赋值,或者在静态代码块中对其进行赋值
如果直接赋值的静态字段被final所修饰,并且它的类型是基本类型或字符串时,该字段便会被Java编译器标记为常量值(ConstantValue)

4.1 初始化的触发条件

在Java虚拟机规范中明确枚举了以下情况:

  • The execution of any one of the Java Virtual Machine instructions new, getstatic, putstatic, or invokestatic that references C (§new, §getstatic, §putstatic, §invokestatic). These instructions reference a class or interface directly or indirectly through either a field reference or a method reference.

  • Upon execution of a new instruction, the referenced class is initialized if it has not been initialized already.

  • Upon execution of a getstatic, putstatic, or invokestatic instruction, the class or interface that declared the resolved field or method is initialized if it has not been initialized already.

  • The first invocation of a java.lang.invoke.MethodHandle instance which was the result of method handle resolution (§5.4.3.5) for a method handle of kind 2 (REF_getStatic), 4 (REF_putStatic), 6 (REF_invokeStatic), or 8 (REF_newInvokeSpecial).

  • This implies that the class of a bootstrap method is initialized when the bootstrap method is invoked for an invokedynamic instruction (§invokedynamic), as part of the continuing resolution of the call site specifier.

  • Invocation of certain reflective methods in the class library (§2.12), for example, in class Class or in package java.lang.reflect.

  • If C is a class, the initialization of one of its subclasses.

  • If C is an interface that declares a non-abstract, non-static method, the initialization of a class that implements C directly or indirectly.

  • If C is a class, its designation as the initial class at Java Virtual Machine startup (§5.2).

Java虚拟机规范中有部分是依赖于Java虚拟机指令了,我对此了解并不多,以下摘抄于《极客时间-深入拆解Java虚拟机-郑雨迪》的分享,相较而言更通俗易懂些。

  1. 当虚拟机启动时,初始化用户指定的主类;
  2. 当遇到用以新建目标类实例的new指令时,初始化new指令的目标类;
  3. 当遇到调用静态方法的指令时,初始化该静态方法所在的类;
  4. 当遇到访问静态字段的指令时,初始化该静态字段所在的类;
  5. 子类的初始化会触发父类的初始化;
  6. 如果一个接口定义了default方法,那么直接实现或者间接实现该接口的类的初始化,会触发该接口的初始化;
  7. 使用反射API对某个类进行反射调用时,初始化这个类;
  8. 当初次调用MethodHandle实例时,初始化该MethodHandle指向的方法所在的类。

5. 绑定本机方法实现

指的是将Java编程语言以为的其他语言编写的功能和实现native方法的功能集成到Java虚拟机中以便可以执行的过程。

传统上来说,此过程可称为链接,但Java虚拟机规范中指出,使用绑定是为了避免于Java虚拟机对类和接口的链接产生混淆。

6. Java虚拟机退出

当调用Runtime.exit()Runtime.halt()System.exit,并且SecurityManager安全管理其允许exit或halt时候,Java虚拟机就会被关闭。

同时,JNI(Java Native Interface)规范描述了对于JNI调用相关的java虚拟机的终止信息。

参考文档

https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-5.html
https://time.geekbang.org/column/article/11523

HashMap源码实现解析

1
2
3
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

基于数组+链表实现,通过&与运算,计算数组下标。在JDK8中,加入红黑树实现,使其时间复杂度保持在O(1)到O(logn)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* The table, initialized on first use, and resized as
* necessary. When allocated, length is always a power of two.
* (We also tolerate length zero in some operations to allow
* bootstrapping mechanics that are currently not needed.)
*/
transient Node<K,V>[] table;

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;

Node(int hash, K key, V value, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return value; }
public final String toString() { return key + "=" + value; }

public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}

public final V setValue(V newValue) {
V oldValue = value;
value = newValue;
return oldValue;
}

public final boolean equals(Object o) {
if (o == this)
return true;
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
if (Objects.equals(key, e.getKey()) &&
Objects.equals(value, e.getValue()))
return true;
}
return false;
}
}

HashMap 静态内部类Node,实现链表,通过Node[]这个数组属性存放所有的节点。

put(K,V)

应该直接看final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict)这个方法更为实际

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

如果当前想要存放的这个节点的hash值暂时没有存在的节点,则直接在数组中添加。

1
2
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);

通过&与运算,

如果当前节点的hash值存在了,则在这个节点下增加链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}

从JDK8 开始,当链表中的子节点超过八个时,将转为红黑树。关于红黑树的数据结构特点,我现在也不是特别的理解,先给自己挖个坑,改天填。

红黑树

HashMap扩容

HashMap中有一个属性:threshold ,这个主要是根据阀值和当前HashMap的大小计算而来,可通过查看

1
2
3
4
5
6
7
8
9
10
11
12
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}

在初始化HashMap的最后,会根据当前的阀值和实际的大小进行计算threshold的值,同时在每一次操作元素的时候,都会去比较当前HashMap的实际大小与threshold的值,如果当前实际大小已经大于了这个限定的阀值,此时将会对HashMap进行扩容。

resize()方法主要是两个步骤:

  1. 计算大小;
  2. 将原HashMap中的元素进行移动

挖坑,以后填

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;

get(Object key)

get方法就更好理解了,首先还是通过hash值找到数组下标,然后通过数组下标获取的实际的元素。然后判断一下当前节点key的hash值是否与第一个节点相同,相同则直接返回结果。

如果不同,这个时候,就得看第一个节点后的下一个节点是采用的红黑树还是使用的链表。然后再根据key的hash去取值即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

ArrayList源码解析

1
2
3
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

基于数组实现

1
transient Object[] elementData;

add(E e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}

private void ensureExplicitCapacity(int minCapacity) {
modCount++;

// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}

size是当前数组实际使用的大小,如果当前所需要的数组地址已经大于了当前数组的容量,则对数组进行扩容操作,即调用grow方法。

1
2
3
4
5
6
7
8
9
10
11
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}

得到所需要扩容的大小以后,调用navite方法对集合进行扩容。
扩容结束以后,将size(代表着实际占用的变量)进行自增,同时将这个数组下标进行赋值。

remove(Object o)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean remove(Object o) {
if (o == null) {
for (int index = 0; index < size; index++)
if (elementData[index] == null) {
fastRemove(index);
return true;
}
} else {
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}

remove方法实际上就是遍历数组所有的元素,然后找到数组下标,再根据数组下标进行删除

1
2
3
4
5
6
7
8
9
10
11
12
/*
* Private remove method that skips bounds checking and does not
* return the value removed.
*/
private void fastRemove(int index) {
modCount++;
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; // clear to let GC do its work
}

定位到具体需要删除的数组下标以后,将下标后的数据往前移动,并将最后一个元素设为null,便于GC回收内存。

迭代器

私有内部类,通过“游标”去操作数组

为什么不能在循环里面增加或删除元素

foreach

foreach本质上就是迭代器的实现,但在删除的时候,使用的是集合的remove方法,而不是迭代器提供的remove方法,这就导致在迭代器遍历中,有一个校验是否并发修改的方法无法通过验证,抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

@SuppressWarnings("unchecked")
public E next() {
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}

// modCount是ArrayList中的属性值,是集合添加元素、删除元素的次数,expectedModCount是迭代器中的属性值,是预期的修改次数。实际修改值与期望值不同
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}

for循环

for循环本质上是使用数组下标遍历数组,通过前文中提到的remove(Object o)方法的实现,可以了解到,实际上是将需要删除的元素后的数组向前移动,并将最后一个元素设为null,便于GC回收。

那么当我们使用for循环操作数组,并对其进行删除操作的时候

1
2
3
4
5
for (int i = 0; i < list.size(); i++){
if(list[i] == 'xxx'){
list.remove(i);
}
}

假设数组中,第X个元素满足条件,并将其进行删除。 此时X后的数据元素全部向前移动,那么第X个元素,已经是移动前X+1,如果此时i++自增,那么你取到的是未移动前的X+2个元素。

所以我们只需要修正一下遍历的数组下标即可解决

1
2
3
4
5
6
for (int i = 0; i < list.size(); i++){
if(list[i] == 'xxx'){
list.remove(i);
i--;
}
}

部分参考: https://blog.csdn.net/wangjun5159/article/details/61415358

解决私有证书导致Maven无法更新的问题

最近公司更换了maven的私有化仓库,一般来说,其实也没啥大问题,就是修改一下setting文件就好了。但麻烦的是,他们搞了一个自签的证书,强制使用了https。

遇到的问题

全局代理

这个属于自己把自己坑了,怪自己科学上网管理不当。

  1. 终端检查是否有全局代理

    1
    2
    3
    echo $http_proxy
    echo $https_proxy
    echo $all_proxy
  2. 检查IDEA代理设置

    Preferences->Appearance & Behaivor->System Settings->HTTP Proxy

  3. You have JVM property https.proxyHost set

在IDEA 的 HTTP Proxy页面看到了这个警告,通过一番搜索,发现可以在IDEA的配置文件vmoptions里面加上-Djava.net.useSystemProxies=true解决掉

证书问题

由于公司用的是自签的证书,所以还得配置自签的证书

其实最开始我是没有确认原因的,我是通过IDEA help->Show log in finder找到了IDEA的详细日志,才确认到因为证书问题导致无法更新的。

由于maven是依赖于java的,即便是我给macOS安装了根证书也无效,所以还得给java运行环境安装根证书才可以。

需要注意的是,idea默认启动maven的jdk环境可能并不是系统安装的jdk,需要到idea配置面板中确认一下即可。

记因缓存返回引用对象导致的线程安全问题

背景

前几天处理生产环境问题的时候,遇到一个因为缓存写的不太好,直接返回了引用对象,导致的线程安全问题。

日志信息

日志如下,部分与公司有关的信息已经删掉。

1
2
3
4
5
6
7
8
2020-06-01 14:37:49 [ERROR] [task-3] xxx -失败!
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859) ~[na:1.7.0_80]
at java.util.ArrayList$Itr.next(ArrayList.java:831) ~[na:1.7.0_80]
2020-06-01 14:37:49 [ERROR] [qtp1073067421-33398] xxx -失败!
java.util.ConcurrentModificationException: null
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859) ~[na:1.7.0_80]
at java.util.ArrayList$Itr.next(ArrayList.java:831) ~[na:1.7.0_80]

排查过程

两个线程同时抛出一个异常点,在被我删掉的日志中显示这两个线程正在执行同一行代码。

两个线程执行的同一行代码其实是一个foreach遍历,且经我检查,并没有对集合中的对象进行remove的操作。

继续排查发现,这个foreach操作的对象是从一个缓存中获取到的,于是顺着这个缓存开始继续排查。跟着代码调用逻辑发现,有直接往这个缓存返回的list中做add的操作。

检查后发现这个我们系统自己使用ConcurrentHashMap封装的缓存直接返回了引用对象ArrayList,且可读可写。

结论

最后其实结论就是缓存使用不当,导致多线程操作ArrayList对象,一边遍历一边插入新的元素,导致迭代器在做check的时候抛出了一场,出现了线程安全的问题。

解决方案

  1. 返回结果的时候拷贝,不要直接返回引用对象
  2. 使用线程安全的List实现
  3. 还有啥?记一个TODO

线程安全的概念

我的理解是由于程序使用多线程的方式运行,导致程序无法正确的得出我们期望的结果。

什么会导致线程安全问题,主要是可见性、原子性、有序性问题。详细可见:Java并发编程-可见性、原子性、有序性问题引入

线程安全的实现方式

可见性和有序性问题

主要是通过volatile、Happens-Before规则以及final关键字解决。详细可见Java并发编程-如何解决可见性和有序性问题

原子性问题

互斥锁

sychronized

死锁问题

产生死锁需要同时满足四个条件,即:

  1. 互斥:共享资源X和Y只能被一个线程占用
  2. 占有且等待:线程1已经取得共享资源x,在等待共享资源y时,不释放共享资源x
  3. 不可抢占:其他线程不可抢占线程1占有的资源
  4. 循环等待:线程1等待线程2占有的资源,线程2等待线程1占有的资源

解决死锁问题,就是解决上面四个条件的任一一个。

  1. 对于“占用且等待”条件,我们可一次性申请所有资源。
  2. 对于“不可抢占”条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可主动释放它占有的资源。
  3. 对于“循环等待”条件,按序申请,即资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请序号大的。

针对于“不可抢占”条件,sychronized申请不到资源时会进入阻塞状态,无法释放已占有的资源,我们应使用JUC提供的Lock解决

sychronized关键字的使用场景、作用范围

实现方法

利用Monitor,在使用sychronized关键字修饰的代码块,编译后自动生成相关加锁和解锁的代码,但仅支持一个条件变量,通过monitorenter和monitorexit实现。

修饰非静态方法

默认对当前实例对象this加锁

修饰静态方法

默认对当前类的Class对象加锁

管程Monitor

极客时间-Java并发编程实战-javaMESA管程模型

为什么局部变量是线程安全的?

调用栈和栈帧

CPU支持栈结构,这个栈与方法调用相关,被称为调用栈。
每个方法在调用栈中都有自己的独立空间,被称为栈帧,每个栈帧都有对应方法需要的参数和返回地址。

当调用方法时,会创建新的栈帧,并压入调用栈;当方法返回时,对应的栈帧会自动弹出,即栈帧和方法是同生共死的。
极客时间-Java并发编程实现-调用栈结构

局部变量存储位置

局部变量放到了调用栈里,如下图所示:
极客时间-Java并发编程实战-保护局部变量的调用栈结构

调用栈与线程

每个线程都有自己独立的调用栈
极客时间-Java并发编程实战-线程和调用栈的关系图

JUC中锁的分类和用途

公平锁、非公平锁

公平锁,多个线程按照申请锁的顺序来获取锁。会判断当前线程是否处于等待队列的头部,即链表的头部,如果是的话就直接获取锁。
非公平锁,没有顺序。不会判断当前线程处于等待队列的具体位置。CAS操作成功则认定为获取到锁。
synchronized是非公平锁,ReentrantLock可通过构造函数决定是公平锁还是非公平锁。

可重入锁

线程可重复获取同一把锁。
ReentrantLock在获取锁时,判断当前线程是否是之前已获取锁的线程,如果是,则直接返回true表示锁获取成功。

互斥锁(独享锁)、读写锁(共享锁)

互斥锁(独享锁)指锁一次只能被一个线程持有,读写锁(共享锁)指该锁可被多个线程持有。
synchronized和ReentrantLock都是互斥锁(独享锁),ReadWriteLock的读锁是共享锁,写锁是独占锁。

乐观锁、悲观锁

乐观锁在更新数据时会不断尝试更新,认为不加锁的并发操作是没问题的。基于CAS实现。
悲观锁认为对一个共享变量的并发操作,这个共享变量是一定会发生修改的,采取加锁方式。

乐观锁适合读操作远远大于写操作的情景,悲观锁适合写操作非常多的场景。

分段锁

对于ConcurrentHashMap来说,在put操作时,通过hashcode判断将要put的元素需要放到哪个分段,然后对分段进行加锁。当put操作不同的分段时,就可以实现并发操作。

无锁、偏向锁、轻量级锁、重量级锁

自1.6以后,java对synchronized进行了优化,当第一个线程获得了锁,锁状态变更新为偏向锁状态。

偏向锁

获取锁:当之前的线程再次获取锁时,无需再执行获取锁的过程。
锁撤销:原持有偏向锁的线程状态是非活动状态时,偏向锁撤销,锁状态更新为无锁状态。

轻量级锁

获取锁:如果每次申请锁的线程都是不相同的,则锁会升级为轻量级锁,指向栈中锁记录的指针。轻量级锁适用于线程交替执行同步块的场景。
释放锁:通过CAS操作,尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word,如果成功则完成解锁操作。如果失败则表明有其他线程获取该锁,此时锁膨胀为重量级锁。释放锁的同时,唤醒被挂起的线程。

重量级

当多个线程同时竞争锁,则轻量级锁会膨胀为重量级锁。指向互斥量的指针。未获取到锁的线程会阻塞。

自旋锁

循环检测锁标志位

可中断锁

Lock

1
2
3
4
5
6
7
8
// 支持中断的 API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();

线程安全的集合

List

CopyOnWriteArrayList

在写操作的时候,会将共享变量复制一份出来,当写操作完成以后,再修改共享变量的内存引用地址。
不能使用迭代器删除数据,因为操作的是一个副本,不会修改到实际的共享变量。

Map

ConcurrentHashMap

  • KEY和VALUE不允许为空
  • KEY是无序的

ConcurrentSkipListMap

  • KEY和VALUE不允许为空
  • KEY是有序的
  • 实现方案是使用SkipList(跳表)数据结构

Set

CopyOnWriteArraySet

类似于CopyOnWriteArrayList

ConcurrentSkipListSet

类似于ConcurrentSkipListMap

Queue

单端阻塞队列

ArrayBlockingQueue

使用数组实现

LinkedBlockingQueue

使用链表实现

SynchronousQueue

不持有队列,入队操作必须要等到消费者线程的出队操作

LinkedTransferQueue

链表实现,入队操作必须要等到消费者线程的出队操作

PriorityBlockingQueue

支持按照优先级出队

DelayQueue

支持延时出队

双端阻塞队列

LinkedBlockingDeque

单端非阻塞队列

ConcurrentLinkedQueue

双端非阻塞队列

ConcurrentLinkedDeque

原子类

原子化的基本数据类型

AtomicBoolean

AtomicInteger

AtomicLong

原子化的对象引用类型

AtomicReference

AtomicStampedReference

AtomicMarkableReference

用锁的最佳实践

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁

Gitlab Runner的安装与注册

1.访问项目的gitlab页面获取token

Settings -> CI/CD -> Runners

获取到当前项目的key,如下图所示:

此时我们就可以拿到对应的token了

2. 安装gitlab runner

官网安装教程:https://docs.gitlab.com/runner/install/

建议是安装到CentOS服务器上

  1. 添加源
    1
    curl -L https://packages.gitlab.com/install/repositories/runner/gitlab-runner/script.rpm.sh | sudo bash
  2. 通过包管理工具安装
    1
    sudo yum install gitlab-runner
    默认会安装最新的版本,如果需要安装特定版本则可以通过以下命令实现
    1
    2
    yum list gitlab-runner --showduplicates | sort -r
    sudo yum install gitlab-runner-10.0.0-1

3. 注册gitlab runner

1
sudo gitlab-runner register

需要输入项目所在的gitlab地址,类似于https://gitlab.xxx.com
回车过后需要输入项目的token,也就是我们在第一步中获取到的
后面会要求输入一些描述,紧接着是tag,这个可以用于.gitlab-ci.yml文件中指定具体的runner,可以认为tag就是这个当前注册runner的身份ID。最后是选择执行器,初级选手,只会使用shell,其他相关的请看EXECUTORS执行器简单介绍

另,一台服务器可以注册多个gitlab runner

EXECUTORS执行器简单介绍

执行者 描述
shell 在本地运行构建,默认
docker 使用Docker容器运行构建 - 这需要在Runner运行的系统上安装[runners.docker]和Docker Engine
docker-ssh 运行生成使用泊坞容器,但用SSH连接到它-这需要的存在[runners.docker],[runners.ssh]以及码头工人引擎的亚军运行的系统上安装。注意:这将在本地计算机上运行docker容器,它只是更改命令在该容器内运行的方式。如果要在外部计算机上运行docker命令,则应更改host该runners.docker部分中的参数。
ssh 使用SSH远程运行构建 - 这需要存在 [runners.ssh]
parallels 使用Parallels VM运行构建,但使用SSH连接到它 - 这需要存在[runners.parallels]和[runners.ssh]
virtualbox 使用VirtualBox VM运行构建,但使用SSH连接到它 - 这需要存在[runners.virtualbox]和[runners.ssh]
docker+machine 喜欢docker,但使用自动缩放的Docker机器 - 这需要存在[runners.docker]和[runners.machine]
docker-ssh+machine 喜欢docker-ssh,但使用自动缩放的Docker机器 - 这需要存在[runners.docker]和[runners.machine]
kubernetes 使用Kubernetes Pods运行构建 - 这需要存在 [runners.kubernetes]

shell执行器补充

shell执行器是根据当前gitlab runner所安装的操作系统来决定的

shell 描述
bash 生成Bash(Bourne-shell)脚本。在Bash上下文中执行的所有命令(所有Unix系统的默认值)
sh 生成Sh(Bourne-shell)脚本。Sh上下文中执行的所有命令(适用bash于所有Unix系统的后备)
cmd 生成Windows批处理脚本。所有命令都在批处理上下文中执行(Windows的默认值)
powershell 生成Windows PowerShell脚本。所有命令都在PowerShell上下文中执行

.gitlab-ci.yml基本语法

以下内容暂时只针对shell执行器

config.toml基本配置

官方文档介绍:https://docs.gitlab.com/runner/configuration/advanced-configuration.html

全局配置

关键字 描述
concurrent 限制全局可以同时运行的作业数。使用所有已定义的运行者的作业的最大上限。0并不是指无限制,如果服务器性能还不错,可以尝试给个5
log_level 日志级别(选项:调试,信息,警告,错误,致命,恐慌)。请注意,此设置的优先级低于命令行参数设置的级别--debug-l--log-level
log_format 日志格式(选项:runner,text,json)。请注意,此设置的优先级低于命令行参数设置的格式--log-format
check_interval 定义新作业检查之间的间隔长度(以秒为单位)。默认值为3; 如果设置为0或更低,将使用默认值。
sentry_dsn 启用跟踪哨兵的所有系统级错误
listen_address 地址(<host>:<port>),Prometheus指标HTTP服务器应该在其上监听

[session_server]介绍

此项配置应在[[runners]]外部指定,主要包含以下参数

设置 描述
listen_address 用于会话服务器的内部URL。
advertise_address Runner将向GitLab公开的URL,用于访问会话服务器。listen_address如果没有定义,则回退到。
session_timeout 作业完成后会话可以保持活动状态的多长时间(这将阻止作业完成),默认为1800(30分钟)。

其中listen_addressadvertise_address需要以host:port形式提供,其中host可以是IP地址,也可以是域名。

示例:

1
2
3
4
[session_server]
listen_address = "0.0.0.0:8093" # listen on all available interfaces on port 8093
advertise_address = "runner-host-name.tld:8093"
session_timeout = 1800

[[runners]]介绍

一个config.toml文件允许存在多个[[runners]],也就对应了上文中提到的一个服务器允许注册多个gitlab runner
相关配置参数介绍如下表所示:

设置 描述
name Runner的描述,只是提供信息
url GitLab URL
token Runner的特殊令牌(不要与注册令牌混淆)
tls-ca-file 包含证书的文件,用于在使用HTTPS时验证对等方
tls-cert-file 包含证书的文件,以便在使用HTTPS时与对等方进行身份验证
tls-key-file 包含私钥的文件,用于在使用HTTPS时与对等方进行身份验证
limit 限制此令牌可同时处理的作业数。0(默认)仅表示不限制
executor 选择应如何构建项目
shell 用于生成脚本的shell的名称。默认值取决于平台。
builds_dir 构建将存储在所选执行程序的上下文中的目录(本地,Docker,SSH)
cache_dir 构建缓存的目录将存储在所选执行程序(本地,Docker,SSH)的上下文中。如果使用docker执行程序,则此目录需要包含在其volumes参数中。
environment 附加或覆盖环境变量
request_concurrency 限制GitLab新作业的并发请求数(默认值为1)
output_limit 设置最大构建日志大小(以KB为单位),默认设置为4096(4MB)
pre_clone_script 在克隆Git存储库之前要在Runner上执行的命令。例如,这可以用于首先调整Git客户端配置。要插入多个命令,请使用(三引号)多行字符串或“\ n”字符。
pre_build_script 在克隆Git存储库之后但在执行构建之前要在Runner上执行的命令。要插入多个命令,请使用(三引号)多行字符串或“\ n”字符。
post_build_script 在执行构建之后但在执行之前要在Runner上执行的命令after_script。要插入多个命令,请使用(三引号)多行字符串或“\ n”字符。
clone_url 覆盖GitLab实例的URL。如果Runner无法连接到GitLab上的GitLab暴露自己,则使用。
debug_trace_disabled 禁用该CI_DEBUG_TRACE功能。设置为true时,即使用户CI_DEBUG_TRACE将设置为调试跟踪,也将保持禁用状态true。

示例:

1
2
3
4
5
6
7
8
9
10
[[runners]]
name = "ruby-2.1-docker"
url = "https://CI/"
token = "TOKEN"
limit = 0
executor = "docker"
builds_dir = ""
shell = ""
environment = ["ENV=value", "LC_ALL=en_US.UTF-8"]
clone_url = "http://gitlab.example.local"

Java线程基础

线程状态

关于java的线程状态,实际上你只要看Thread.java源码就可以了,网上很多资料都不全,而且往往你看了资料以后根本记不住,反而自己去花时间看看源码,思考每次位运算的结果,才能让你真的理解和掌握线程的状态转换~当然,我也不建议你看我的这篇文章,毕竟我是写给我自己看的。

NEW

Thread state for a thread which has not yet started.

还没有开始的状态,就是new

RUNNABLE

Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual achine but it may be waiting for other resources from the operating system such as processor.

正在jvm虚拟机中运行的状态,但是可能还需要等待系统资源,类似于cpu这种资源。

BLOCKED

Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling

在monitor管程阻塞等待获取资源(锁)的状态,主要表现在synchronized block/method

WAITING

Thread state for a waiting thread. A thread is in the waiting state due to calling one of the following methods:

Object.wait with no timeout

Thread.join with no timeout

LockSupport.park

A thread in the waiting state is waiting for another thread to perform a particular action.

当调用Object.wait()Thread.join()LockSupport.park()方法的时候,调用线程就会进入waiting状态,只有当线程调用Object.notify()或者Object.notifyAll()方法的时候才会终止。另外,一个线程调用另外一个线程的join()方法,则该线程将等待另外一个线程执行结束。

TIMED_WAITING

Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time.

TERMINATED

Thread state for a terminated thread.The thread has completed execution.

从源码看ThreadPoolExecutor和BlockQueue

JDK版本

此文章基于jdk 1.8.0_191

需要用到的知识点

位运算

操作符 描述 例子
如果相对应位都是1,则结果为1,否则为0 (A&B),得到12,即0000 1100
| 如果相对应位都是0,则结果为0,否则为1 (A | B)得到61,即 0011 1101
^ 如果相对应位值相同,则结果为0,否则为1 (A ^ B)得到49,即 0011 0001
按位取反运算符翻转操作数的每一位,即0变成1,1变成0。 (〜A)得到-61,即1100 0011
<< 按位左移运算符。左操作数按位左移右操作数指定的位数。 A << 2得到240,即 1111 0000
>> 按位右移运算符。左操作数按位右移右操作数指定的位数。 A >> 2得到15即 1111
>>> 按位右移补零操作符。左操作数的值按右操作数指定的位数右移,移动得到的空位以零填充。 A>>>2得到15即0000 1111

线程池原理

摘自 汪文君. Java高并发编程详解:多线程与架构设计 (Java核心技术系列) (Kindle 位置 2508-2512). 北京华章图文信息有限公司. Kindle 版本.

所谓线程池通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的 某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子 中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放 资源。为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列。

线程池原理图

一个完整的线程池应该具备如下要素:

  1. 任务队列:用于缓存提交的任务
  2. 线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现,比如创建 线程池时初始的线程数量 init;线程池自动扩充时最大的线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。有了这三个参数,就能够很好地控制线程池中的线程数量,将其维护在一个合理的范围之内,三者之间的关系是 init<= core<= max
  3. 任务拒绝策略:如果线程数量已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者
  4. 线程工厂:主要用于个性化定制线程,比如线程设置为守护线程以及设置线程名称等
  5. QueueSize:任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制
  6. Keepedalive 时间:该时间主要决定线程各个重要参数自动维护的时间间隔

线程池的五种状态

线程池状态示意图以及五种状态的说明摘自CSDN一只逗比的程序猿

一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

线程池状态切换示意图

线程池状态切换示意图

RUNNING

状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理

状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

SHUTDOWN

状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务

状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

注:虽然状态已经不是RUNNING了,但是如果任务队列中还有任务的时候,线程池仍然会继续执行,具体分析请见ThreadPoolExecutor.execute()方法解析

STOP

状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP

TIDYING

状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现

状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

TERMINATED

状态说明:线程池彻底终止,就变成TERMINATED状态

状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED

线程池五种状态的二进制表示

线程池状态 二进制
RUNNING 111
SHUTDOWN 000
STOP 001
TIDYING 010
TERMINATED 011
1
2
3
4
5
6
7
8
9
10
11
COUNT_BITS :29
RUNNING :11100000 00000000 00000000 00000000
SHUTDOWN :00000000 00000000 00000000 00000000
STOP :00100000 00000000 00000000 00000000
TIDYING :01000000 00000000 00000000 00000000
TERMINATED :01100000 00000000 00000000 00000000
RUNNING :-536870912
SHUTDOWN :0
STOP :536870912
TIDYING :1073741824
TERMINATED :1610612736

ThreadPoolExecutor解读

构造函数解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

先看参数

1
2
3
4
5
6
7
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler

对应含义关系如下

参数名 类型 备注
corePoolSize int 核心线程数(如果allowCoreThreadTimeOut为true,核心线程将一直存活)
maximumPoolSize int 允许创建的最大线程数
(如果使用了无界队列LinkedBlockingQueue,这个值会失效,原因在讲解execute方法中提及)
keepAliveTime long 非核心线程闲置时的超时时长(如果allowCoreThreadTimeOut为true,这个时长也会用于核心线程)
unit TimeUnit 参数keepAliveTime的单位
workQueue BlockingQueue<Runnable> 任务队列,可选的子类
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
threadFactory ThreadFactory 线程工厂,为线程池提供创建新线程的功能(其他构造函数中默认传Executors.defaultThreadFactory()
handler RejectedExecutionHandler 拒绝策略,当队列和线程池都满了就才会根据这个策略进行处理。(默认为AbortPolicy,直接抛出异常),可选子类ThreadPoolExecutor.AbortPolicy
ThreadPoolExecutor.CallerRunsPolicy
ThreadPoolExecutor.DiscardOldestPolicy
ThreadPoolExecutor.DiscardPolicy

执行方法解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

这里涉及到runStateworkerCount两个概念,线程池中是利用32位的int变量来表示。

因为线程池的状态总共有五种,2^2 = 4, 2^3 = 8,所以需要占用三位,实际采用的就是高三位表示,具体可见线程池五种状态的二进制表示

剩下的部分全部都用于记录有效线程数,所以代码中也就规定有效线程数不可大于29位,也就是最大为2^29-1,详见execute()方法解析。

然后我们再来一起看看execute(Runnable command)方法是如何运行的。

execute()整体逻辑概览

  1. 最开始是一个基本的判空逻辑,如果传入的任务是空的则抛出异常
  2. 第一个判断:检查当前核心线程数,如果当前线程数小于核心线程则调用addWorker()方法创建线程,创建成功返回true
  3. 第二个判断:当核心线程池中的所有线程都在运行,此时将线程放到任务中
  4. 第三个判断:如果核心线程数已经满了,队列也添加失败了,那么这里就会调用上面提到的拒绝策略,如果我们没有在创建线程池时给出特定的拒绝策略,那么默认的实现就是抛出异常。
1
2
3
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());

细节实现

第一个判断:检查当前核心线程数

1
2
3
4
5
6
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
获取workerCount
1
2
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }

这里的ctl变量是一个初始值为RUNNINGAtomicInteger对象,拿到的变量c中既存储了当前线程池的状态,又保存了当前线程池中的有效线程数量。

画外音:这里的AtomicInteger对象为什么是线程安全的,是因为使用了CAS,具体不谈

判断当前有效线程数量是否大于核心线程数量

然后我们再看workerCountOf(c)方法的实现:

1
2
3
private static int workerCountOf(int c)  { 
return c & CAPACITY;
}

CAPACITY的值为:

1
00011111 11111111 11111111 11111111

结合位运算法则,这里c & CAPACITY的结果集就是实际的有效线程数量。

创建核心线程数量

当有效线程数量小于核心线程数量的时候,我们需要调用addWorker(command, true)的方法去创建线程,这里的参数ture就用于标识当前想要创建的线程是核心线程。

然后我们再来看看addWorker(command, true)方法的具体实现逻辑。

addWork实现逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
rs为什么可以表示runstate?
1
private static int runStateOf(int c)     { return c & ~CAPACITY; }

~CAPACITY的值为:

1
11100000 00000000 00000000 00000000

所以后面的29位不论怎么样都会变成0,也就是最后的结果集中只会有高三位用于表示runstate的参数

什么时候才会创建线程?
1
2
3
4
5
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

哇~这个判断真的是够绕的,我看了老半天,一起来缕一缕

1
rs >= SHUTDOW

代表当前runstate是SHUTDOWN STOP TIDYING TERMINATED 任意一个,关于五种状态的值可见线程池五种状态的二进制表示

1
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())

A:在rs >= SHUTDOW成立的前提下,如果是rs != SHUTDOWN,则整个判断成立。

B:在rs >= SHUTDOW成立的前提下,如果是firstTask != null,则整个判断成立。

C:在rs >= SHUTDOW成立的前提下,如果是 workQueue.isEmpty(),则整个判断成立。

并且A B C三个是有先后顺序的

再总结一下就是在第一个判断成立的前提下,第二个判断中,只要有一个不成立就会返回false,线程创建失败。

转成白话文就是

A 当线程池状态为STOP TIDYING TERMINATED时,不会创建线程

B 当线程池为SHUTDOWN时,不允许新建任务

C 当线程池为SHUTDOWN时,且没有新的任务,此时如果任务队列也经没有任务,同样不会创建线程

此时再回头看看线程池的五种状态


接着往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for (;;) {
// 第一步,拿到当前的有效线程数
int wc = workerCountOf(c);
// 如果当前有效线程数已经大于等于允许的最大线程数,不允许创建
// 如果正准备创建的线程是core的线程且大于线程池初始化时设定的线程数,不允许创建
// 如果正准备创建的线程不是core但大于线程池初始化时设定的最大线程数,不允许创建
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 利用CAS做自增操作,如果成功了,就跳出循环体开始下一步操作,如果失败则重试
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新读取当前的runstate,如果当前的runstate和循环体中的runstate有变化,则重新去判断是否需要创建线程
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
线程到底是怎么创建的呢?

经历了层层校验逻辑,我们总算是要准备创建线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
   // 标记线程是否启动
boolean workerStarted = false;
// 标记线程是否被添加
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 再检查一下当前线程池的状态
// rs < SHUTDOWN 表示线程池的状态为RUNNING就直接创建线程
// rs == SHUTDOWN && firstTask == null 当前线程已经处于SHUTDOWN且当前没有新建的任务(也就是为了把任务队列执行完)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果正准备创建的线程已经处于alive状态,则抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 否则放到workers中(包含线程池中所有的工作线程,将新构造的工作线程加入到工作线程集合中)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
总结
创建线程的条件:

当前线程池状态为RUNNING,当前线程数小于核心线程数或当前线程线程数小于最大线程数。其中最大线程数又分为创建线程池时给定的数量以及程序允许的最大值。

不创建线程的条件:

当线程池状态不为RUNNING时,不会接受新的任务,此时如果任务队列还有任务,会把这部分处理完。

第二个判断:当核心线程池中的所有线程都在运行,此时将线程放到任务中

1
2
3
4
5
6
7
8
9
10
   // 检查当前线程池状态,如果为RUNNING状态,则尝试将任务放到任务队列中  
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 放置成功以后,再次检查当前的线程池状态,如果当前线程池状态非RUNNING,则尝试将刚刚放入的任务从任务队列中移除
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程数状态为RUNNING,但是workerCount的值又等于0则传入空任务结束此次创建
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

第三个判断:什么时候执行拒绝策略?

1
2
else if (!addWorker(command, false))
reject(command);

当第二个判断不成立,也就是当前线程池状态非RUNNING状态或尝试将任务放到任务队列中失败时,尝试再次创建一个非核心线程,此时线程数需要小于创建线程池时给定的最大值。

总结

结合第二个判断和第三个判断,就可以明白为什么当任务队列是无界的时候,最大线程数不会产生作用了。

工作线程的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 得到当前线程
Runnable task = w.firstTask; // 得到Worker中的任务task,也就是用户传入的task
w.firstTask = null; // 将Worker中的任务置空
w.unlock(); // allow interrupts。
boolean completedAbruptly = true; // 标识当前Worker异常结束,默认是异常结束
try {
// 如果worker中的任务不为空,执行执行任务
// 否则使用getTask获得任务。一直循环,除非得到的任务为空才退出
while (task != null || (task = getTask()) != null) {
// 如果拿到了任务,给自己上锁,表示当前Worker已经要开始执行任务了,
// 已经不是处于闲置Worker(闲置Worker的解释请看下面的线程池关闭)
w.lock();
// 在执行任务之前先做一些处理。
// 1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程
// 2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,
// 重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前需要做什么,ThreadPoolExecutor是个空实现,子类可以自行扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的开始执行任务,这里run的时候可能会被中断,比如线程池调用了shutdownNow方法
task.run();
} catch (RuntimeException x) { // 任务执行发生的异常全部抛出,不在runWorker中处理
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行结束需要做什么,ThreadPoolExecutor是个空实现,子类可以自行扩展
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 记录执行任务的个数
w.unlock(); // 执行完任务之后,解锁,Worker变成闲置Worker,等待执行下一个任务
}
}
completedAbruptly = false; // 正常结束
} finally {
processWorkerExit(w, completedAbruptly); // Worker退出时执行
}
}

BlockQueue

三个添加元素的方法:

  • add:把e加到队列里,添加成功返回true,容量如果满了添加失败会抛出IllegalStateException异常
  • offer:表示如果可能的话,将e加到队列里,成功返回true,否则返回false
  • put:把e加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

三个删除元素的方法:

  • poll:取走队列头部的对象,若不能立即取出,则可以等待timeout参数规定的时间,取不到时返回null
  • remove:基于对象找到对应的元素并删除,删除成功返回true,否则返回false
  • take:取走队列中排在首位的对象,若队列为空,一直阻塞到队列有元素并删除

其中:

队列不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。

null 被用作指示poll 操作失败的警戒值。

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove(Object o) poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
类型 含义
抛出异常 如果试图的操作无法立即执行,抛一个异常
特殊值 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)
阻塞 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行
超时 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,
但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)

常见的四个实现:

ArrayBlockingQueue

一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

ArrayBlockingQueue的原理就是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)

ArrayBlockingQueue是一个带有长度的阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 存储队列元素的数组,是个循环数组
final Object[] items;

// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;

// 放数据的索引,用于put,offer,add方法
int putIndex;

// 元素个数
int count;

// 可重入锁
final ReentrantLock lock;
// notEmpty条件对象,由lock创建
private final Condition notEmpty;
// notFull条件对象,由lock创建
private final Condition notFull;

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 构造函数要求指定队列大小capacity
/**
* capacity和fair,capacity同第一个构造方法,代表队列大小。fair代表该队列的访问策略是否公平。如果为
* true,则按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;如果为 false,则访问顺序是不确定的。这里fair参
* 数被设置为ReentrantLock的入参,就可以通过ReentrantLock来保证线程访问是否公平。而此构造方法创建了两个
* Condition,也就是条件,分别是notEmpty和notFull,Condition可以调用wait()和signal()来控制当前现
* 成等待或者唤醒
*/
// 默认fair为false
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

内部调用的this(capacity, fair)方法

1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

数据的添加

add

ArrayBlockingQueue自己并没有实现add方法,而直接调用父类AbstractQueue的add方法

1
2
3
public boolean add(E e) {
return super.add(e);
}

内部调用的super.add

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw IllegalStateException("Queue full");
}

实际上最后调用的就是ArrayBlockingQueue自己的offer方法,但是如果offer方法返回结果为false,则抛出IllegalStateException

offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean offer(E e) {
// 不允许元素为空,为空会抛出NullPointerException异常
checkNotNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 元素个数和当前存储队列元素的数组大小相等,就不会再加了,所以会返回false
if (count == items.length)
return false;
else {
// 入队操作
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}

内部调用的enqueue方法

1
2
3
4
5
6
7
8
9
10
11
12
13
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 放数据索引+1,如果数据索引已经与存储队列的元素数量相同则变为0
if (++putIndex == items.length)
putIndex = 0;
// 元素个数+1
count++;
// 使用条件对象notEmpty通知,比如使用take方法的时,队列中没有数据被阻塞。这个时候队列中新增了一条数据,需要调用signal通知
notEmpty.signal();
}
put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
// 元素为空抛出NPE异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,保证调用put方法的时候只有一个线程
lock.lockInterruptibly();
try {
// 如果队列满了,阻塞当前线程并加入到条件对象notFull的等待队列里
while (count == items.length)
// 线程阻塞并被挂起,同时释放锁
notFull.await();
// 入队
enqueue(e);
} finally {
lock.unlock();
}
}

疑问点,之前的入队操作的写法是

1
2
3
4
5
6
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

为什么改了?改之前和改之后有什么区别?

总结

ArrayBlockingQueue总共有三个添加数据的方法,分别是add、put、offer

  • add方法:内部实际调用的是offer方法,如果队列已满则抛出IllegalStateException一场,否则返回true

  • offer方法:如果队列满了返回false,成功返回true

  • put方法:如果队列满了会阻塞线程,直到有线程消费了队列中的元素

三个方法内部都使用可重入锁保证原子性

数据的删除

poll
1
2
3
4
5
6
7
8
9
10
11
public E poll() {
// 加锁,保证调用poll方法的时候只有一个线程
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果count=0,也就是队列中没有元素了,这时候会返回null,否则调用dequeue取元素
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

内部调用的dequeue方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// takeIndex是用于拿数据的索引
E x = (E) items[takeIndex];
// 取出数据以后,这个元素位置为null
items[takeIndex] = null;
// 如果当前拿数据的索引大小已经和元素数组大小相等则置为0,这里的takeIndex之所以自增是因为FIFO原则
if (++takeIndex == items.length)
takeIndex = 0;
// 一个数据被取出,此时元素个数-1
count--;
// TODO
if (itrs != null)
itrs.elementDequeued();
// 使用对象notFull通知,如使用put方法放置数据的时候队列满了,被阻塞,这个时候dequeue取出一条数据,队列没满,则可以继续放入
notFull.signal();
return x;
}
take
1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
while (count == 0)
// 线程阻塞并被挂起,同时释放锁
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
// 放数据索引
final int putIndex = this.putIndex;
// 取数据索引
int i = takeIndex;
// 通过元素类型遍历,找到类型相同的索引位置,i<=元素总大小
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
// 因为putIndex就是最后一个放数据的位置,所以拿数据的索引不能等于它
// 但是有没有想过为什么不可以写<=呢?
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

内部调用的removeAt方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 如果要删除数据的索引位置就是拿数据索引位置,直接将takeIndex索引位置上的数据,然后takeIndex+1
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove

// slide over all others up through putIndex.
// 如果要删除数据的索引位置不是takeIndex,则需要移动元素位置,更新putIndex
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 删除以后通知阻塞线程,比如put方法
notFull.signal();
}

总结

三个删除方法,分别是poll,take,remove

  • poll方法对于队列为空的情况,返回null,否则返回队列头部元素
  • remove方法取的元素是基于对象的下标值,删除成功返回true,否则返回false
  • poll方法和remove方法不会阻塞线程
  • take方法对于队列为空的情况,会阻塞并挂起当前线程,直到有数据加入到队列中
  • 三个删除方法内部都会调用notFull.signal方法通知正在等待队列满情况下的阻塞线程

LinkedBlockingQueue

内部以一个链式结构(链接节点)对其元素进行存储,链表是单向链表,满足FIFO(先进先出)原则。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  /** The capacity bound, or Integer.MAX_VALUE if none */
// 容量大小
private final int capacity;

/** Current number of elements */
// 元素个数
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
// 头节点
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 尾节点
private transient Node<E> last;

/** Lock held by take, poll, etc */
// 读锁
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// 读锁的条件对象
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// 写锁
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// 写锁的条件对象
private final Condition notFull = putLock.newCondition();

ArrayBlockingQueue只有1个锁,添加数据和删除数据的时候只有1个可以被执行,不允许并行操作

但LinkedBlockingQueue有2个锁,读和写各有一把,添加数据和删除数据两个操作可以并行。

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

内部调用的this和enqueue方法

1
2
3
4
5
6
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// last和head节点都是null
last = head = new Node<E>(null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

enqueue的class方法

1
2
3
private void enqueue(Node<E> paramNode){
this.last = (this.last.next = paramNode);
}

另外还画了一个图

数据的添加

三个方法,分别是add offer put

add

LinkedBlockingQueue和ArrayBlockingQueue一样,同样没有实现add方法,所以会直接调用父类AbstractQueue的add方法

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

实际上也就是调用LinkedBlockingQueue的offer方法,失败则抛出异常

offer

关于这里使用到的锁实际上涉及到java的monitor MESA模型,这里就不展开讲了,有兴趣的小伙伴戳极客时间-管程:并发编程的万能钥匙,有二十个小伙伴可以免费读哦~打不开的话复制地址用微信打开哦😊

可重入锁指的是线程可以重复获取同一把锁,ReentrantLock有一个带布尔值fair的构造函数,true表示公平锁,反之则是非公平锁。指的是条件变量的等待队列唤醒策略,公平锁是唤醒等待时间最长的,非公平锁则不会保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public boolean offer(E e) {
// 不允许空元素
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果满了就返回false,but这个大小可是2^31-1=2147483647
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
// 拿到写锁
final ReentrantLock putLock = this.putLock;
// 对写操作加锁
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
// 元素个数+1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果容量还没满,在对象notFull唤醒正在等待的线程,表示可以再往队列里加数据了
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
// todo
return c >= 0;
}
put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
// 拿到写锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 对写操作加锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// 如果当前容量已经满了,则阻塞并挂起当前线程
while (count.get() == capacity) {
notFull.await();
}
// 入队操作
enqueue(node);
// 元素+1
c = count.getAndIncrement();
if (c + 1 < capacity)
// 如果容量还没满,在放锁的条件对象notFull唤醒正在等待的线程
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

总结

add offer put方法与ArrayBlockingQueue特性一致,只是底层实现不同

  • add方法:内部实际调用的是offer方法,如果队列已满则抛出IllegalStateException一场,否则返回true

  • offer方法:如果队列已满则返回false,成功返回true

  • put方法:如果队列满了会阻塞线程,直到有线程消费了队列中的元素

ArrayBlockingQueue中放入数据阻塞的时候,需要消费数据才能唤醒,而LinkedBlockingQueue中放入数据阻塞的时候,因为它内部有2个锁,可以并行执行放入数据和消费数据,不仅在消费数据的时候进行唤醒插入阻塞的线程,同时在插入的时候如果容量还没满,也会唤醒插入阻塞的线程

数据的删除

poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
// 成功就会返回实际的元素,否则返回null
return x;
}

内部调用的dequeue方法

1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

不是特别好理解的话,就看图吧

take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 阻塞等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean remove(Object o) {
if (o == null) return false;
// remove操作的位置不固定,所以需要对两个锁都进行加锁
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 判断是否找到对象
if (o.equals(p.item)) {
// 修改节点的链接信息,同时调用notFull的signal方法
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}

内部调用的fullyLock方法 unlink方法以及fullyUnlock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
// 判断当前元素总数是否等于最大值
if (count.getAndDecrement() == capacity)
notFull.signal();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

总结

take poll方法的特性与ArrayBlockingQueue一致,唯一不同的是remove方法中会同时对取 拿两个锁进行加锁

ArrayBlockingQueue因为内部实现是通过数组实现,所以其在初始化的时候必须要指定大小,且不可变,在删除操作的时候会移动元素。

LinkedBlockingQueue内部实现是通过单向链表实现,本身没有边界,但默认最大值为2^31-1。可同时操作读和写,在删除操作时需要给读写同时加锁。

DelayQueue

Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队 列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待

SynchronousQueue

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

拒绝策略

主要有四种

ThreadPoolExecutor.AbortPolicy

默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

ThreadPoolExecutor.CallerRunsPolicy

线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

ThreadPoolExecutor.DiscardPolicy

不能执行的任务将被删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

ThreadPoolExecutor.DiscardOldestPolicy

如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

参考资料

极客时间-Java并发编程实战

《Java高并发编程详解:多线程与架构设计 (Java核心技术系列) 》

Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析

LinkedBlockingQueue源码解析

0%