怕什么真理无穷,进一寸有一寸的欢喜

0%

并发与锁总结

要学习一个知识点,可以采取的思路是了解为什么要用它,怎么用它,有什么缺点,如何去改进。

对于并发 ,也将从这四个方面进行介绍。

并发应用场景

利用多线程去替代单线程,主要有两个应用场景。

  1. 获得更好的性能

    单线程执行速度有限,为了获得更好的性能需要使用多线程

  2. 由于业务模型的需要

    确实需要多个执行实体

需要分清进程与线程的区别

为什么要有进程与线程

进程为了解决并发操作问题,保存了当前线程的状态,由于分配时间短,会有不同进程并行执行的感觉(实际为并发)。但一个进程只能一次执行一个任务,若子任务不存在顺序的区别,可以并发执行。让CPU切给子任务更细的时间片去执行。这样一个进程包括多个线程。

进程让操作系统的并发成为可能,线程让进程的内部并发成为可能

进程与线程区别

  • 进程可以看作是独立应用,线程不可以

  • 进程有独立的地址空间,相互不影响,线程只是进程的不同执行路径,线程没有独立的地址空间

  • 多进程程序比多线程程序健壮

  • 进程的切换比线程的切换开销大

进程间通信方式

管道(pipe)

fork操作可以创建紫禁城,或复制一个进程完全一样的子进程,共享代码空间,但有各自独立的数据空间,紫禁城的数据空间是拷贝父进程的数据空间。管道机制要求两个进程间有血缘关系,如fork出的父子进程。

linux操作系统里,管道缓存要在进程间传输的数据,管道是一个固定大小的缓冲区,4kb。管道中数据一旦被读取出来,就不在管道中。若管道满了,写管道的操作就阻塞,直到别人读取了管道数据;反之若管道是空的,读操作就阻塞。管道一边是一个进程输入,一边为一个进程的输入,一个进程写,一个进程读。两个进程均没了,管道也没了。管道为半双工,数据只能流向一个刚发。

linux中对管道的实现,用了两个文件,指向了一个VFS(虚拟文件系统)的索引节点inode,然后VFS索引节点指向一个物理界面,接着一个进程通过自己关联文件写数据,另一个进程通过自己关联文件读数据。

命名管道(fifo)

命名管道解决了匿名管道要求父子关系进程通信的限制。之前管道没有名字,需要是父子关系的进程才能使用。命名管道,相当于有名字的文件,有路径,没有血缘关系的进程都可以通过此命名管道通信,名字在文件系统删,数据在内存里,其他与匿名管道相同,半双工。

消息队列

linux的消息队列可以认为是个链表结构,linux内核有msgque链表,链表里每个指针指向一个msgid_ds结构,此结构描述了一个消息队列。进程可以通过此消息队列进行通信,一样是写入数据和消费数据。消息队列的好处是每个消息可以指定类型,消费时候消费指定类型的消息即可,功能更多, 用的不多。

共享内存

一块物理内存被映射到两个进程的进程地址空间,进程之间相互可以看到对方在共享内存里做出的修改,需要锁来保证同步。

如何应对秒杀

  1. 利用CDN实现资源的动静分离

    静态资源直接使用CDN处理,其他资源去请求服务器

  2. 使用nginx来实现反向代理与负载均衡

    避免一台服务器压力过大,通过nginx配置文件,来配置不同的url请求的服务器,这样可以实现不同请求访问不同服务器,实现反向代理。设置upstream的group,对于一个请求,会根据其负载策略来分发给不同的服务器来处理,实现负载均衡

  3. 应用程序微服务化

    可能有些程序需要被使用到的频率较高,将不同的程序拆分,形成不同的微服务,让被使用更多的程序分配更多的机器与资源

  4. 增加缓存,Redis等

    一些读的请求,如果频繁请求数据库,会让数据库压力过大,因此可以让热点数据缓存在Redis中,减轻DB压力

  5. 增加消息队列

    使用消息队列来实现削峰,避免请求同时打到服务器上,这样先将消息放入消息队列,然后服务器再从中取出进行处理

多线程怎么用

线程状态

  1. 新建(New):创建后尚未启动的线程的状态(还未调用start方法)

  2. 就绪(Ready):等待CPU为其分配时间

  3. 运行(Running):正在执行

    其中2与3可统称为Runnable

  4. 无限期等待(Waiting):不会被分配CPU执行时间,需要显式被唤醒。wait方法不指定时间,join方法不定之间

  5. 限期等待(Timed Waiting):在一定时间后会由系统自动唤醒(sleep方法)

  6. 阻塞(Blocked):等待获取排它锁(Synchronized)

  7. 结束(Terminated):已终止线程的状态,线程已经结束执行(run或main方法结束)

阻塞和等待区别

阻塞可理解为当前线程仍处于活跃状态,只是在阻塞等待其他线程使用完某个锁资源。在等待是因为自身调用了wait()方法,join()方法或其他进入等待状态,只能等待其他线程执行某个特定动作才能继续唤醒。

Java使用多线程的四种方式

方式一:继承Thread类

1、 定义一个类继承Thread类

2、 覆盖Thread类中的run()方法。Thread类用于描述线程,因此Thread类也有对任务的描述,这个任务就是通过Thread类中的run()方法来体现,因此 run()方法就是封装自定义线程运行任务的函数。run方法中定义的是线程要运行的任务代码。

3、 直接创建Thread的子类对象创建线程

4、 调用start()方法,作用为启动线程,调用run()方法

1
2
3
4
5
6
7
8
class MyThread extends Thread{
@Override
public void run(){
for (int i = 0; i < 50; i++) {
System.out.println(MyThread.currentThread().getName()+" "+i);
}
}
}

在调用的时候

1
2
3
4
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
t1.start();
t2.start();

原理:Thread本身实现了Runnable接口,通过start()方法启动线程,native方法start()启动新线程并执行run()方法,实现方式较为简单,但继承有局限性。

注意:Thread类的start()方法最多只能调用一次,运行后会将started的状态改变,再次调用会报不合法线程状态异常。

方式二:实现Runnable接口

1、 定义类实现Runnable接口

2、 覆盖接口中的run()方法,将线程的任务代码封装到run()方法中

3、 通过Thread类创建对象,并将Runnable接口的子类对象作为Thread类构造函数的参数进行传递

原因:因为线程的任务都封装在Runnable子类对象run()方法中,所以要在线程对象创建时明确要运行的任务。

4、 调用线程对象的start()方法启动线程

1
2
3
4
5
6
7
8
class RunTest implements Runnable{
@Override
public void run(){
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
}
}

调用方式

1
2
3
4
5
RunTest r = new RunTest();
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();

原理:当传入一个Runnable类型的任务参数target给Thread后,Thread的run()方法就会调用target.run()

方式三:实现Callable接口

1、创建Callable接口的实现类,并实现call()方法,并创建该实现类的实例

2、使用FutureTask类来包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值

3、使用FutureTask对象作为Thread对象的target创建并启动线程

4、调用FutureTask对象的get()方法来获得子线程执行结束后的返回值

1
2
3
4
5
6
7
8
9
10
class MyCallable implements Callable<String>{
@Override
public String call() throws InterruptedException {
String value = "test";
System.out.println("Ready to work");
Thread.currentThread().sleep(5000);
System.out.println("task done");
return value;
}
}

启动线程

1
2
3
4
5
6
7
8
public static void main(String[] args) throws Exception {
FutureTask task = new FutureTask(new MyCallable());
new Thread(task).start();
if (!task.isDone()){
System.out.println("task has not finished, please wait");
}
System.out.println("task return : " + task.get());
}

get方法会被阻塞直到子线程结束。

方式四:使用Executors工具类来使用

使用Executors中的方法来获取不同种类的线程池,一般不建议直接这样使用。

详情见后面的线程池部分,此处主要讨论前三种方法

三种方法的比较

实现Runnable和实现Callable接口的方式大致相同,基本思路均为将任务进行分装再传递给Thread对象,但后者执行call()方法有返回值且get()方法可以获取异常,且Callable接口可以搭配线程池使用,进行统一管理。二者和Thread的比较为:

1、Thread是一个类,Runnable是一个接口;

2、继承Thread类后不能继承其他类,而实现Runnable接口后仍可继承其他类,更为灵活,避免了单继承的局限性;

3、Thread类中的资源不能共享(多个任务对象),而实现Runnable接口可以实现资源共享(只有一个任务对象);

4、Runnable将任务记性封装,更体现了面向对象的编程思想。

因此相比于继承Thread,更推荐使用Runnable接口来实现多线程。

Thread中run()方法执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("R run");
}
}
){
@Override
public void run(){
System.out.println("T run");
}
}.start();

输出为T run,看Thread中run()方法的源码

1
2
3
4
5
6
7
8
9
   /* What will be run. */
private Runnable target;

@Override
public void run() {
if (target != null) {
target.run();
}
}

​ 其会对target进行判断,当target不为空的时候,执行Runnable的run方法,但由于多态,当子类方法覆写了父类方法时,会优先执行子类方法。因此执行顺序为Thread子类>Runnable方法>Thread类

start与run

run()方法只是一个普通方法,而start方法可以让线程从新建状态转为执行状态,并调用run()方法。

start方法只能被调用一次,而run方法可以被多次调用。

线程优先级

可以使用Thread类中的setPriority方法

1
public final void setPriority(int newPriority)

其中优先级为1-10

1
2
3
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;

优先级越高的线程越有可能抢到CPU时间片,但不一定先执行。

常用方法

线程等待

线程间进行协作的时候,一个线程可能会比较依赖于另一个线程的执行结果,为了实现这个效果,可以使用线程的等待join()方法,在Thread类中

1
2
public final synchronized void join(long millis)
public final void join()

提供了无限期等待的无参方法和选择最长等待时间的有参方法,验证如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TaskJoin implements Runnable{
public volatile static int i = 0;
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
i++;
}
}
}
public class JoinTest {
public static void main(String[] args) throws InterruptedException {
TaskJoin tj = new TaskJoin();
Thread t = new Thread(tj);
t.start();
t.join();
System.out.println(tj.i);
}
}

如果没有join方法,输出的i的值会很小,而使用了join方法后,主线程会wait,直到线程任务执行完毕,join的本质是让调用线程(此处为主线程)wait在当前线程对象实例上,使用了join的线程,实际上进入了Waiting状态。因此每次输出的值都是设定的100000,可以用于线程的顺序执行。

线程礼让

yield为本地方法,表示告知线程调度器当前线程愿意让出CPU使用权,但可能会被线程调度器忽略,不稳定。

1
public static native void yield();

当线程调用了yield方法后,会回到就绪状态,与其他线程一起争抢CPU资源。

yield方法与sleep方法均为静态的是因为,需要使用在当前运行的线程上,其他等待线程使用没有意义。

守护线程

使用Tread类的setDaemon(true)方法,在start()方法前调用,守护线程在后台执行系统性服务,如垃圾回收线程等可以理解为守护的。当一个Java应用内,只有守护线程时,Java虚拟机就会退出。

1
2
3
4
5
6
7
public final void setDaemon(boolean on) {
checkAccess();
if (isAlive()) {
throw new IllegalThreadStateException();
}
daemon = on;
}

线程停止

  1. 自定义停止条件,在代码中进行判断

  2. 使用interruped方法,在异常捕捉的catch中进行处理

    使用中断,只是设置线程状态,不会让线程停止,需要自己去监视线程状态并做判断

  3. 已经作废的stop方法

线程数量

线程的数量并不是越多越好,因为同一时间CPU只能执行一个线程,这样过多的线程会造成线程上下文的频繁切换从而影响性能,需要根据执行任务的状态来确定。如果是计算密集型,即几乎不在IO上 停顿,这样设置线程数为CPU的核心数即可,过多的线程对CPU提升性能没有帮助。如果一个任务时IO密集型,即在IO上停顿时间较长,这样如果线程数太少,线程进行IO时会被阻塞,这样影响性能,这样多开几个线程,在进行IO等待时进行线程的切换,这样可以提高效率,具体的确认公式为

线程数量 = CPU可用核心数量 /(1-阻塞系数)

若任务被阻塞时间小于50%,认为是计算密集型,线程数将随机减少;若任务被阻塞时间大于50%,认为是IO密集型,线程数将随之增加。

线程安全问题

上文中介绍了为什么要用多线程及如何使用,但当多个线程操作一个资源时,会带来安全问题。如两个线程都在卖票,剩最后一张票的时候,会出现票数量<0的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Ticket implements Runnable{
private int num = 10;
@Override
public void run(){
while (true){
if (num > 0){
try {
Thread.sleep(500);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+num--);
}
}
}
}

当开启多个线程卖票的时候,会出现如下的结果

已经限制了票数要>0,为什么会出现票数=-1的情况呢?

线程安全产生原因

  1. 多个线程在操作共享的数据。

  2. 操作共享数据的线程代码有多条。(操作共享数据代码在2行以上容易出事)

当一个线程在执行操作共享数据的多条代码中,其他线程参与了运算,就会导致线程安全问题的产生。

那么该如何解决呢?

解决思路:将多条操作共享数据的线程代码封装起来;当有线程在执行这些代码的时候,其他线程是不可以参与运算的。必须要当前线程把这些代码都执行完毕后,其他线程才可以参与运算,类似于上锁,当一群人要去上厕所,谁进去了将门锁住,其他人(线程)也就进不来了。

解决办法

乐观锁与悲观锁

为了解决线程安全问题,有的锁比较悲观,认为其他线程一定会修改数据,因此在拿数据的时候会上锁,若其他线程访问会被阻塞,Java中关键字synchronized的实现为悲观锁;而乐观锁,在拿数据时,认为别人不会修改,不会上锁,但在更新时会判断其他线程有没有更新数据。乐观锁适用于多读的场景,可以提高吞吐量。CAS就是比较常见的乐观锁的实现方式。

下面将主要将介绍比较常见的synchronized,RenentrantLock与JMM的内存可见性(volatile)与CAS。

synchronized

synchronized为互斥锁,具有的特性为

  1. 互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程的协调机制,这样在同一时间只有一个线程对需要同步的代码块(复合操作)进行访问。互斥性也称为操作的原子性

  2. 可见性:必须确保在锁被释放之前,对共享变量所做的修改,对于随后获得该锁的另一个线程是可见的(即在获得锁时应获得最新共享变量的值),否则另一个线程可能是在本地缓存的某个副本上继续操作,从而引起不一致。

synchronized锁的不是代码,锁的都是对象。

synchronized使用

可以分为对象锁和类锁,均加在同步代码块上或同步方法上。同步代码块的使用如下

对象锁:

1、 同步代码块(synchronized(this)), synchronized(类实例对象)),锁是小括号()中的实例对象

1
2
3
synchrnozed(this或类实例对象){
//要同步的代码
}

2、 同步非静态方法(synchronized method),锁是当前对象的实例对象

1
2
3
public synchronized 返回值 method(){
//要同步的代码
}

类锁:

1、 同步代码块(synchronized(类.class)),锁时小括号()中的类的对象(Class对象)

1
2
3
synchrnozed(类.class){
//要同步的代码
}

2、 同步静态方法(synchronized static method),锁是当前对象的类对象(Class对象)

1
2
3
public synchronized static 返回值 method(){
//要同步的代码
}

synchronized锁定方法与非锁定方法可以同时运行。synchronized可重入,如果不可重入,那么若父类中一个方法锁定,子类在继承的父类方法中,又调用了父类的锁定方法,如果不可重入,会造成子类调用父类方法的死锁。

synchronized锁,当有异常发生时,会释放当前锁,如果没有正确的异常处理,会造成其他线程的乱入。

可见性

被锁住的代码块,当执行完毕后,加store屏障,将当前处理器更新的变量值flush至高速缓存或主内存中;其他线程对被的处理器更新过的变量,使用load屏障来执行refresh处理器缓存操作,从其他处理的高速缓存或主内存中加载数据至自己的高速缓存中,确保自己看到的是最新的数据。

死锁

当出现锁的嵌套时,容易出现死锁。即线程1先获取锁A,再获取锁B;而线程2先获取锁B,再获取锁A。这时候两个线程均不能请求 到另一个锁,就会出现死锁,进程卡死。

要注意的是,这里的两个锁对象一定要用static修饰,因为要保证这两个对象随类加载保证唯一性,如果不加static,就会每次new DeadLockSyn的时候,产生各自的两个o1,o2,则不能出现死锁现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DeadLockSyn implements Runnable{
//要加static是因为随类的创建而产生
//如果不加static,那么产生的是各自的object对象,不是同一个锁了
public static Object o1 = new Object();
public static Object o2 = new Object();
private boolean flag;
public DeadLockSyn(boolean flag){
this.flag = flag;
}
@Override
public void run(){
if (flag){
synchronized (o1){
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
synchronized (o2){
}
}
}else {
synchronized (o2){
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
synchronized (o1){
}
}
}
System.out.println(Thread.currentThread().getName()+"完成任务");
}
}

调用过程

1
2
3
4
5
6
DeadLockSyn d1 = new DeadLockSyn(true);
DeadLockSyn d2 = new DeadLockSyn(false);
Thread t1 = new Thread(d1);
Thread t2 = new Thread(d2);
t1.start();
t2.start();

简单实现死锁程序的关键

  • 两个锁对象保证唯一性:static修饰
  • 两个锁的嵌套调用

死锁的调试

当发现死锁的时候,如何知道是哪里发生了死锁呢?

可以通过jps命令,查看java进程的进程ID,然后使用jstack得到线程的线程堆栈

1
2
3
4
5
->jps
11088
8356 Jps
11576 Launcher
5292 ThreadTest

然后看ThreadTest的线程堆栈情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Java stack information for the threads listed above:
===================================================
"Thread-1":
at test.DeadLockSyn.run(ThreadTest.java:141)
- waiting to lock <0x0499e890> (a java.lang.Object)
- locked <0x0499e898> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)
"Thread-0":
at test.DeadLockSyn.run(ThreadTest.java:132)
- waiting to lock <0x0499e898> (a java.lang.Object)
- locked <0x0499e890> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

这样便可以看到具体的死锁信息。

形成死锁的条件

  1. 互斥条件:资源具有排他性,只能被一个线程占有
  2. 请求与保持条件:线程因请求资源被阻塞时,对已获取的资源保持不放
  3. 不剥夺条件:线程已获得的资源在未使用完前不能被其他线程剥夺
  4. 循环等待条件:发生死锁时,所等待的线程形成循环等待

线程间通信

当每个线程可以独立安全的完成自己的任务了,但如果A线程的任务涉及到了B线程,如A线程是生产烤鸭的,B线程是吃烤鸭的,只有生产出来的烤鸭被吃掉了A才会继续生产,那要如何实现呢?

这时候需要用到线程间的通信,即A线程刚来做烤鸭,发现B线程还没吃,就跟B线程交流下,让他快点吃,自己先睡个觉,等烤鸭被吃了再起来吃。主要用到的方法是wait、notify()与notifyAll()。

生产者与消费者

基本的实现思路是:封装一个烤鸭资源对象,实现Runnable接口,有一个标识变量,代表烤鸭有没有被吃。生产烤鸭的方法需要加锁,首先判断烤鸭有没有被吃,如果没被吃,生产任务停止,唤醒吃烤鸭任务。如果烤鸭被吃了,进行生产。吃烤鸭的方法与生产类似。

需要持有一个资源,其属性有资源名称name,当前资源数量和标志变量(用来判断消费还是生产)。对同步的生产方法,当还没有消费的时候,让当前线程wait,释放锁;当已经消费了,进行生产,反转标志变量,数量自增,唤醒其他线程。对同步的消费方法,当还没有生产的时候,让当前线程wait,释放锁;当已经生产了,进行消费,反转标志变量,唤醒其他线程。生产者和消费者类持有资源类对象,实现Runnable接口,并分别在run()方法中调用其生产和消费方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Resource{
private int count = 0;
private boolean flag = false;
private String name;//生产的物品名称
//同步的生产方法
public synchronized void set(String name){
//当还没有消费,生产线程等待
while (flag){
try {
wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
//开始生产
this.name = name;
count++;
flag = true;
System.out.println(Thread.currentThread().getName()+"生产"+name+count);
notifyAll();
}
//同步的消费方法
public synchronized void out(){
//当还没有生产,消费线程等待
while (!flag){
try {
wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
//开始生产
System.out.println(Thread.currentThread().getName()+"消费"+this.name+count);
flag = false;
notifyAll();
}
}
//生产者
class Producer implements Runnable{
private Resource r;
public Producer(Resource r){
this.r = r;
}
@Override
public void run(){
while (true){
r.set("烤鸭");
}
}

}
//消费者
class Consumer implements Runnable{
private Resource r;
public Consumer(Resource r){
this.r = r;
}
@Override
public void run(){
while (true){
r.out();
}
}
}
public static void main(String[] args) throws Exception {
Resource r = new Resource();
Producer p = new Producer(r);
Consumer c = new Consumer(r);
Thread t1 = new Thread(p);
Thread t2 = new Thread(p);
Thread t3 = new Thread(c);
Thread t4 = new Thread(c);
t1.start();
t2.start();
t3.start();
t4.start();
}

值得注意的是,在同步方法中判断标记要使用while而不能使用if,因为if只进行一次判断,不安全。唤醒线程的时候要使用notifyAll()而不能使用notify(),不然容易造成死锁。notify是唤醒线程池中的一个线程,而notifyAll是唤醒线程池中的所有线程。

可见在synchronized中,线程间通信主要依靠wait()、notify()和notifyAll()方法,这三个方法均在Object类中,那么既然wait()和sleep()都能让线程休眠,二者有什么区别呢?

wait和sleep差别

基本差别

  1. sleep是Thread类的方法,wait是Object类中定义的方法

  2. sleep()方法可以在任何地方使用, wait()方法只能在synchronized方法或synchronized块中使用

  3. sleep在设置时间结束后自动唤醒,而wait需要使用notify或者notifyAll方法唤醒

本质差别

​ Thread.sleep只会让出CPU,不会导致锁行为的改变;Object.wait不仅会让出CPU,还是释放已经占有的同步资源锁

二者相比,sleep类似于线程在厕所中睡着了,wait类似于从厕所中让出来睡着了。

介绍了synchronized后,需要了解为什么每个对象都可以作为锁,即需要了解synchronized的底层原理

底层实现

首先必须要明确的是synchronized锁的是对象,而不是代码。然后宏观上来看有以下级别的实现:

  1. 源码级别

    同步代码块或同步方法

  2. 字节码级别

    moniterenter moniterexist 或 SYN_SYNCHRONIZED标识

  3. JVM级别(Hotspot)

    锁升级的过程,依靠对象头中的Markword

实现基础

实现synchronized的基础是以下两点

  • Java对象头
  • Monitor

对象头结构

​ 在JVM中,对象在内存中的布局有对象头实例数据(存放类的属性数据信息)和对齐填充(虚拟机要求对象起始地址必须是8字节的整数倍)。

一般synchronized使用的锁对象是存储在Java对象头里的,头结构由Mark WordCLass Pointer指针组成,如果为数组,还有数组长度。其中CLass Pointer用于确定该对象是哪个类的实例;而Mark Word用于存储对象自身的运行时数据,是实现轻量级锁和偏向锁的关键,默认存储着对象的identity hashCode(即用内存地址计算出的哈希值),分代年龄(4字节最大15),锁标志位GC等信息。

对于Object o = new Object();Object对象的对象头,在64位的机器下,Markword是 8字节,一般默认压缩了ClassPointer指针跟栈中的对象引用指针,在不压缩下是8字节,压缩后变为4字节。因此Mardword+Class Pointer长度为8+4=12,而Object类中没有实例变量,为了对齐,需要有4字节的对其数据,因此Object对象在堆中占据空间为16字节。

Markword

MarkWord被设计为一个非固定的数据结构,以便存储更多有效数据,会根据对象本身的状态,复用存储空间,轻量级锁和偏向锁是JDK6后新增。

用后两位来区分不同的锁,01为偏向锁,00为轻量级锁,10为重量级锁,用倒数第三位来区分无锁和偏向锁。

identity hashcode

有关identity hashcode的问题,可以看到,只有在无锁状态下才有hashcode,如果一个对象计算了identity hashcode,则对象无法进入偏向状态。

若是轻量级锁,在线程的栈帧中保存了无锁状态下的Markword,即Lock Record记录。

若是重量级锁,在ObjectMonitor中有相应的成员变量。

hashcode为32位,但identity hashcode只用31位存储,无符号位,因此默认对象的hashcode均大于0

hashCode方法返回值不是地址,因为对象地址在32位jvm或64位jvm开启指针压缩下,地址为4字节,因为没必要用31位存储(使用的是当前线程有关的随机数+三个确定值,通过随机数算法计算得到的随机数

具体计算方法:https://www.jianshu.com/p/be943b4958f4

Monitor

对于重量级锁,锁标志位是10,指针指向的是monitor对象的起始地址,每个对象都有Monitor与之关联,monitor被线程持有后就处于锁定状态。在Java虚拟机中,monitor是由ObjectMonitor实现。monitor底层由C++实现,里面有两个队列,WaitSet(等待池)和EntryList(锁池),而owner指向持有ObjectMonitor对象的线程

具体过程为:当有多个线程同步访问同步代码时,进入锁池集合,当线程获取到锁对象的monitor以后,进入_Owner 区域并将owner变量设置为当前线程,Monitor中的计数器count+1,如果当前线程调用了wait()方法,会释放当前的Monitor,将owner变量设置为null,count清零,该线程进入等待池来等待被唤醒。若当前变量执行完毕,也会释放Monitor锁,并将对应的变量值复位。

monitor对象存在于每个Java对象的对象头中,synchronized锁通过这种方式获取锁,这也是为什么所有Java对象均可以作为锁的原因,也是wait,notify,notifyAll等方法存在于对象Object中的原因。而wait,notify,notifyAll方法需要在synchronized中使用是因为调用这几个方法前必须拿到当前对象的监视器monitor对象,而synchronized关键字可以获取 monitor,因此若不在synchronized中使用会报IllegalMonitorStateException异常。

接下来反编译synchronized应用于同步代码块和同步方法中的情况

1
2
3
4
5
6
7
8
9
10
public class Sync{
public void synBlock(){
synchronized (this){
System.out.println("synchronized in code block");
}
}
public synchronized void synMethod(){
System.out.println("synchronized in method");
}
}

具体操作为先javac生成对应class文件,然后执行javap -v Sync,首先看应用于同步代码块中的情况

1
2
3
4
5
6
7
8
9
10
3: monitorenter
4: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
7: ldc #3 // String synchronized in code block
9: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
12: aload_1
13: monitorexit
14: goto 22
17: astore_2
18: aload_1
19: monitorexit

截取了最重要的输出,可看到同步代码块实现的方式为monitorenter和monitorexit,分别是同步 开始和结束的位置。当执行monitorenter,当前线程会尝试获取对象锁,当计数器为0时,可以成功获取monitor并将计数器置为1,表示持有锁,当已经拥有此锁的时候,可以重入(再次遇到有相同对象锁时,可以获取);若其他线程持有了锁,便会在monitorenter处阻塞,直到其他线程释放锁并将计数器置为0。为了保证monitorenter和monitorexit可以正常配对执行,编译期产生处理器声明可以处理任何异常,目的是为了处理monitorexit指令,正常释放锁,因此多了一个monitorexist。

而对于同步方法,并没有显式的monitorenter等,而是用ACC_SYNCHRONIZED访问标志来区分此方法是否是同步方法,当方法调用时,检查方法此标识是否被设置,如果设置则持有monitor,无论方法是否正常完成,释放monitor。

1
2
3
4
5
6
7
8
9
public synchronized void synMethod();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=1, args_size=1
0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String synchronized in method
5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return

在早期,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,而操作系统实现线程之间的切换时需要从用户态转换到内核态(kernel),这个状态之间的转换需要相对比较长的时间(时间成本相对较高),是相对于重量级的。而从JDK6以后,其性能有了较大提升。

synchronized优化

锁的膨胀

锁并不会一开始就进入到重量级锁,在JDK6对锁优化后,锁的状态有四种,并且会根据实际情况膨胀升级,膨胀方向为无锁->偏向锁->轻量级锁->重量级锁。

锁只有在GC时才会降级。

偏向锁

在很多情况下,锁不存在多线程竞争,总是由一个线程多次获得。其核心的思想是:当一个线程获得了锁,锁就进入偏向模式,Markword的结构调整为偏向锁模式,当该线程再次请求锁的时候,获取锁的过程只要检查Markword的锁标记位是偏向锁且当前线程id与Markword中ThreadID相等即可,这样便可以省去很多锁申请的操作。

​ 适用条件:无锁竞争的场合,但不适用于比较激烈的多线程场合。偏向锁失败后,在safe point撤销,升级为轻量级锁。

偏向锁默认4s后会开启,没有直接打开是因为程序刚启动的时候,竞争会比较激烈,直接开启偏向锁,影响性能。可以设置偏向锁没有时延,-XX:BiasedLockingStartupDelay=0,设置0延迟。立刻上偏向锁。此时对象是偏向锁,但没有被任何线程占有,称为匿名偏向锁

轻量级锁

​ 轻量级锁适用于线程交替执行同步代码,其依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,只允许顺序拿锁,但如果有竞争操作,轻量级锁会先尝试自旋,尝试失败后会膨胀为重量级锁。

轻量级锁的加锁过程

  1. 在代码进入到同步代码块的时候,如果同步对象锁是无锁状态(锁标志位是‘01’),虚拟机首先在当前线程的栈帧中建立叫锁空间(Lock Record)的空间用于存储锁对象目前的Mark Word拷贝,官方称为Displaced Mark Word,此时线程堆栈和对象头状态如下
  1. 拷贝对象头中的Mark Word复制到锁记录中

    需要保存轻量级锁之前的Markword原因是,当上轻量级锁后,对象的很多信息在Markword中会被丢失,因此需要被记录下来。

  2. 拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record中owner指针指向object mark word。如果更新成功,执行步骤4,否则执行步骤5

  3. 如果更新成功,当前线程就拥有了该对象的锁,并且对象mark word锁标志设置成‘00’,表示此对象处于轻量级锁状态,此时线程堆栈与对象头的状态如下

  1. 如果更新失败了,虚拟机首先将检查对象的mark word是否指向当前线程的栈帧,是说明当前线程已经拥有了锁,可以直接进入同步块继续执行。否则说明有多个线程竞争锁,先进行一段时间的自旋,如果自旋期间成功获得锁,则执行同步代码。若自旋结束后也没有获得锁,则将膨胀为重量级锁,将锁标志位变成‘10’,此时mark word中存储的就是指向重量级锁的指针,后面等待锁的线程进入阻塞状态。

执行完毕解锁

  1. 通过CAS操作尝试把线程栈帧中复制的Displaced Mark Word对象替换当前的Mark Word
  2. 若替换成功,整个同步过程就完成了
  3. 若替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),则要在释放锁的同时,唤醒被挂起的线程

自旋锁

轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。依赖的思想是也许几个CPU周期后就可以获得锁,虚拟机会让当前等待锁的线程做几个空循环(称为自旋的原因),经过固定的若干次循环后,如果得到锁,就进入临界区。反之,就会将线程真实地在操作系统层面挂起。

自旋锁的缺点是:若锁被其他线程长时间占用,会带来许多性能上的开销,因为要一直消耗CPU资源

因此自旋需要有一定的限度,默认是自旋等待10次(-XX:PreBlockSpin)或自旋的线程数超过CPU核数的一半,如果超出限度还没有获取到锁,就要用传统的方法去挂起线程。

对自旋锁有优化,为自适应自旋锁,自旋的次数不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定,这样JVM依据之前自旋成功率在增加或减少自旋次数,更加精确

重量级锁

如果轻量级锁升级为重量级锁,那么Markword锁标志位从00改为10,Markword中指针指向重量级锁(互斥锁)。依靠的monitor,而monitor在操作系统层面依靠的是mutex,涉及到从用户态到内核态的切换,比较耗时。

比较三种锁如下

优点 缺点 适用场景
偏向锁 加锁和解锁不需要CAS操作,没有额外的性能消耗,和执行非同步方法相比仅存在纳秒级的差距 若线程存在锁竞争,会带来额外的锁撤销的消耗 只有一个线程访问同步块或同步方法
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 若线程长时间抢不到锁,自旋会消耗CPU性能 线程交替执行同步方法块或同步方法的场景
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢,在多线程下,频繁的获取释放锁,会带来巨大的性能消耗 追求吞吐量,同步块或者同步方法执行时间较长的场景

锁消除

上面介绍了第一种优化方式是使用锁膨胀来代替单纯的重量级锁,而第二种方式为使用锁消除。

在JIT(Just-In-Time Compiler)编译时,对运行上下文进行扫描,去除不可能存在竞争的锁

1
2
3
4
5
6
7
8
9
10
public void method1(){
Object o = new Object();
synchronized (o){
System.out.println("method1");
}
}
public void method2(){
Object o = new Object();
System.out.println("method2");
}

在上面代码中,method1中方法虽然被同步修饰,但锁变量为方法内的私有变量,不会被其他线程获取到,因此在编译阶段会被优化掉,反编译得到结果如下

1
2
3
4
public void method1();
descriptor: ()V
flags: ACC_PUBLIC
Code:

可以看到并没有ACC_SYNCHRONIZED标志。

在动态编译同步块的时候,JIT编译器借助逃逸分析(Escape Analysis)技术来判断同步块所使用的锁对象是否只能够被一个线程访问而没有被发布到其他线程。

锁粗化

锁粗化是虚拟机对另一种极端情况的优化处理,通过扩大锁的范围,避免反复加锁和释放锁。

一般设置同步时尽量限制在小的范围,只在共享数据的实际作用域中,减少等待时间。但如果有一连串操作对一个对象反复加锁解锁,甚至加锁操作出现在循环体中,即使没有线程竞争,频繁进行互斥同步锁操作,会降低性能。当JIT此时会将加锁同步的范围扩散(粗化)到整个操作序列的外部

1
2
3
4
5
6
7
8
9
10
11
12
public void test1(){
for (int i = 0; i < 1000000; i++) {
synchronized (this){
}
}
}
public void test2(){
synchronized (this){
for (int i = 0; i < 1000000; i++) {
}
}
}

上面操作中,在循环体中进行了加锁,如果没有锁粗化,那么方法一应该会比方法二慢很多,但测试了两端代码运行时间后,结果为下图

说明方法一种锁范围被扩大至循环体外,验证了锁粗化的效果。

上文中介绍了Java中使用synchrnonized上锁的方式,那么按照面向对象的思想,能否将锁及线程间的通信业对象化 呢?下面将介绍将介绍其增强版-重入锁。

ReentrantLock

ReentranlLock位于java.util.concurrent.locks包,基于AQS(AbstractQueuedSynchronizer队列同步器)实现,AQS是Java并发构建锁或其他同步组件的基础,是JUC包的核心,一般使用AQS的方式为继承,而利用AQS实现同步结构,必须要实现同步结构,至少要实现acquire(实现资源的独占权)和release(释放对资源的独占)。

ReentranlLock特点

ReentrantLock的特点有:

  • 重入锁有着显式的操作过程,必须手动指定何时加锁、释放锁,在逻辑控制的灵活性上好于synchronized。

  • 在退出临界区时必须释放锁。

  • 可以实现比synchronized更细粒度的控制,如控制锁的公平性

    在公平时,倾向于将锁赋予等待时间最长的线程

  • 性能不一定比synchronized高,也是可重入(已持有锁的线程可以再次访问需要同一个锁的代码)的

    在低竞争场合,synchronized性能可能会优于ReentrantLock

ReentrantLock使用

使用可重入锁的流程分为三步:

  1. 上锁
  2. try中写同步执行代码
  3. finally中解锁

处理死锁

之前演示过了synchronized的死锁情况,使用ReentrantLock也可以达到相同的效果,那么有没有方法可以处理死锁呢?

中断响应

对于synchronized,线程等待锁,结果是要么获取锁执行,要么继续等待,而RenentranlLock提供了线程可以被中断的情况,在等待锁的过程中,程序可以根据需要取消对锁的请求,对处理死锁有一定的帮助。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class IntLock implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
public int lockStatus;
public IntLock(int lockStatus){
this.lockStatus = lockStatus;
}
public void run(){
try {
if (lockStatus == 1){
lock1.lockInterruptibly();
try {
Thread.sleep(500);
}catch (InterruptedException e){}
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
}catch (InterruptedException e){}
lock1.lockInterruptibly();
}
System.out.println(Thread.currentThread().getName()+"线程完成任务");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
if (lock1.isHeldByCurrentThread())
lock1.unlock();
if (lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId()+"线程退出");
}
}
}
//main函数中
IntLock lock1 = new IntLock(1);
IntLock lock2 = new IntLock(2);
Thread t1 = new Thread(lock1);
Thread t2 = new Thread(lock2);
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();

lock1与lock2容易形成死锁,lockInterruptibly方法可以对中断进行相应的锁申请动作,在等待锁的过程中可以响应中断。当t2被中断的时候,t2会放弃对锁lock1的申请,同时释放已经获得的lock2,这样t1线程就可以顺序获取lock2而执行下去。

锁申请等待限时

除了等待外部通知将线程中断外,避免死锁还可以使用限时等待。给定等待时间,让线程自动放弃, 使用tryLock()方法进行限时的等待,tryLock()方法接收两个参数,等待时长和计时单位。在请求等待时间之内,如果请求成功,返回true,否则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TryLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run(){
try {
if (lock.tryLock(5, TimeUnit.SECONDS)){
Thread.sleep(6000);
System.out.println(Thread.currentThread().getName()+"完成任务");
}else {
System.out.println(Thread.currentThread().getName()+"任务失败");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
}

当创建两个线程去执行此任务时,会发现线程2执行失败,线程1执行成功。因为先持有锁的线程会持有锁6秒,而另一个线程等待锁只有5秒钟(5,TimeUnit.SECONDS),因此后请求锁的线程请求失败。

此方法也可以空参数运行,表示当前线程尝试获取锁,如果锁未被其他线程占用,申请锁成功,并立刻返回true;若锁被其他线程占用,当前线程不会等待,立刻返回false。这种模式不会引起线程等待,也不会引起死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class TryLockDeadLock implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public TryLockDeadLock(int lock){
this.lock = lock;
}
@Override
public void run(){
if (lock == 1){
while(true){
//上锁1
if (lock1.tryLock()){
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
//上锁2
if (lock2.tryLock()){
try {
System.out.println(Thread.currentThread().getName()+"job done");
return;
}finally {
lock2.unlock();
}
}
}finally {
lock1.unlock();
}
}
}
}else{
while(true){
//上锁2
if (lock2.tryLock()){
try {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
//上锁1
if (lock1.tryLock()){
try {
System.out.println(Thread.currentThread().getName()+"job done");
return;
}finally {
lock1.unlock();
}
}
}finally {
lock2.unlock();
}
}
}
}
}
}

死锁代码:利用条件判断,第一个线程先用锁1,再用锁2;第二个线程先用锁2,再用锁1。使用tryLock()后,线程不会一直处于阻塞状态,会不停尝试,只要执行足够长的时间,线程会得到需要的资源(同时获得两个所),从而正常执行。但结果不总是会正常完成,有时还是会出现死锁。

公平锁

在大多数情况下,锁的申请都是非公平的,即不会根据等待时间来挑选执行线程。而公平锁的特点是:不会产生饥饿现象,不会出现一线程长时间等待获取不到CPU分配时间的情况。

synchronized是非公平锁,而ReentrantLock可以设置为公平锁,将fair设置为true即可。

1
public ReentrantLock(Boolean fair)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class FairLock implements Runnable{
public static ReentrantLock fair = new ReentrantLock(true);
public void run(){
while (true){
//1、上锁
fair.lock();
//2、try中运行代码
try {
System.out.println(Thread.currentThread().getName()+"工作");
}
//3、finally中释放锁
finally {
fair.unlock();
}
}
}
}

这样,当开启两个线程时,两个线程轮流输出,几乎不会发生同一个线程多次获得锁的可能,而如果使用非公平锁会出现一个线程多次获得锁进行输出。实现公平锁要维护一个有序队列,实现成本比较高,性能比较低下,默认可重入锁是非公平的,若无特殊需求不使用公平锁。

Condition

在synchrnozied中,有wait、notify和notifyAll搭配使用进行线程间的通信,而ReentrantLock中也有,那就是Condition,Condition与重入锁相关联,通过newCondition()方法可获取Condition对象,一个锁可以获得多个Condition,类似的,Condition中提供了await()、signal()、signalAll()方法。

下面将基于重入锁和Condition来实现多生产多消费者模式。核心是一个锁,两个监视器,在要同步的方法中,先上锁,然后在try中写同步代码,最后在finally中解锁。其中利用标志变量来判断当前线程是否休眠。生产者和消费者类实现Runnable接口,在run()方法中调用资源类的方法即可。要注意的是,生产完后,要唤醒消费线程;消费完后,要唤醒生产线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class LockResource{
//一个锁,两个监视器
public static ReentrantLock lock = new ReentrantLock();
Condition proCondition = lock.newCondition();
Condition conCondition = lock.newCondition();
public boolean flag = false;
public int count = 0;
public String name;
public LockResource(String name){
this.name = name;
}
//同步set方法
public void set(){
//1、上锁
lock.lock();
//2、在try中写同步代码
try {
while (flag){
try {
proCondition.await();
} catch (InterruptedException e) {}
}
//进行生产
count++;
flag = true;
System.out.println(Thread.currentThread().getName()+"生产"+name+count);
//唤醒消费线程
conCondition.signal();
}
//3、在finally中解锁
finally {
lock.unlock();
}

}
//同步消费方法
public void out(){
//1、上锁
lock.lock();
//2、在try中写同步代码
try {
while (!flag){
try {
conCondition.await();
} catch (InterruptedException e) { }
}
//进行消费
flag = false;
System.out.println(Thread.currentThread().getName()+"消费"+name+count);
//唤醒生产线程
proCondition.signal();
}
//3、在finally中解锁
finally {
lock.unlock();
}
}
}
class ProducerRe implements Runnable{
private LockResource r;
public ProducerRe(LockResource r){
this.r = r;
}
@Override
public void run(){
while (true){
r.set();
}
}
}
class ConsumerRe implements Runnable{
private LockResource r;
public ConsumerRe(LockResource r){
this.r = r;
}
@Override
public void run(){
while (true){
r.out();
}
}
}

读写锁(TO DO)

写锁饥饿,如何避免

synchronized和ReentrantLock区别

  1. synchronized是关键字,ReentrantLock是类
  2. synchronized的加锁解锁是隐式的,而重入锁的显式的
  3. ReentrantLock可以设置为公平锁,synchronized不行
  4. ReentrantLock可以设置锁的等待时间,避免死锁
  5. 底层机制:sync操作对象头中Mark Word,lock调用Unsafe类中方法

AQS

AbstractQueuedSynchronizer,队列同步器,是JUC下许多同步类的核心。为了理解其工作原理,需要先大致了解下LockSupport与CLH队列锁。

LockSupport工具类

提供park方法来阻塞一个线程,提供unpark方法来唤醒一个线程

CLH队列锁

有2个关键属性,一个是myPred,指向前一个节点,一个是locked,如果是true表示当前线程要获取锁,如果为false表示释放了锁。当一个线程要获取锁的时候,加入到队列尾部,当前线程的myPred指向前一个线程的locked,当前线程循环检测前一个线程的状态,如果为true表示锁没有被释放,如果为false,表示锁被释放了,这时候去获取锁,当用完后,释放锁,将locked改为false。

AQS原理

AQS面向锁的开发者,在自定义锁的实现类的时候,一般使用一个静态内部类来继承AQS,对外只暴露一些必要的使用方法。AQS本质使用的是模板方法模式,AQS中的acquire与release方法,其内部用到的一些方法需要子类去实现,这样子类只需要实现必要的方法,其他由AQS内的方法来调用。AQS中凡是抛出了特定异常的方法,都需要自己去实现。

AQS中获取锁的核心方法acquire如下

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

AQS核心是volatile int state(表示锁的状态)与同步队列(思想与CLH队列类似)。

AQS提供CAS方法来获取与更改锁的状态。

而等待同步线程队列是双向链表,包含头尾结点,对于每个结点,属性包括有

  • 当前等待线程
  • 当前线程获取锁模式(独占or共享)
  • 当前线程等待状态
    • CANCELLED:获取锁状态取消
    • SIGNAL:等待锁释放
    • CONDITION:等待某条件被满足
    • PROPAGATE:传播,当处于共享模式,将当前状态传播给其他线程
  • 前驱与后序结点
  • 等待Condition的Node结点

当一个线程要加入等待队列的尾节点时,使用CAS操作添加到队列尾部,如果没有成功,自旋重新读取尾节点进行加入。头结点释放锁后,将head下一个变为头结点下一个。

核心流程

独占非公平锁

一个线程尝试使用CAS操作改变state的数值,如果成功从0改到1,持有锁。如果没有成功,采用CAS操作将当前线程封装成结点加入等待队列,若CAS操作没有成功,则进行自旋直到加到队列尾部。

若队列中上一个结点是头结点,再次CAS尝试获取锁,若获取成功,将当前结点设置为头结点。若失败后需要被阻塞,则使用LockSupport将当前线程阻塞住。当拿到锁的线程释放锁后,将后面结点的线程唤醒。即在release方法中,当释放锁后,唤醒头结点后面的线程。被唤醒的线程会继续自旋使用CAS操作来获取锁。

共享非公平锁

在获取锁的时候,在一个线程获取锁后,只要返回的数字大于0,就一直向后传播,唤醒后面的线程。

可重入

如果获取锁的线程再次请求锁,将state的值加一即可;释放一次锁,将 state值减一。

公平与非公平

非公平锁:如果此时锁状态为0,非等待队列中的线程到来后,可以直接获取锁并执行,可能会造成线程饥饿问题。公平锁:若可以获取锁,将当前线程切换到等待队列头的线程,这样保证按照请求锁的顺序来执行。但公平锁涉及线程的切换,开销较大,一般使用非公平锁。

JMM内存可见性

Java内存模型(Java Memory Model,简称JMM)是抽象概念,描述一组规范,围绕原子性有序性可见性展开。通过此规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。

原子性:java语言规范中,各种变量的简单赋值操作,规定都是原子性的。32位虚拟机里的long/double类型的变量的简单赋值写操作,不是原子性,因为long与double是64位。可能多线程下两个线程同时赋值高32位与低32位。volatile对32位虚拟机里,对long/double变量赋值写可以保障是原子性。

为了解决硬件内存的有限性与读取速度的需求,内存的设计分为CPU三级缓存与主内存。当要进行数据的操作时,先将数据从主内存中加载到工作内存,然后进行操作,在工作内存中赋值,再加载并写回到主内存中。

read load use assign store write

原子性

原子性指一个操作不可中断,即使是多个线程一起执行的时候,一个操作一旦开始就不会被其他线程干扰。

可见性

可见性指当一个线程修改了某个共享变量的值,其他线程能够立刻知道这个修改。以下情况会导致可见性问题

  • 缓存优化

    CPU改动后的值被缓存在cache或寄存器中,另一个CPU读到值没有更新

  • 硬件优化

    一些内存读写不会立即触发,而会先进入一个硬件队列等待

  • 指令重排及编辑器优化

有序性

在并发时,程序的执行可能会出现乱序,原因是程序在执行时可能会进行指令重排,重排后的指令与原指令未必一致。

线程将变量从主内存中拷贝至线程自己的本地内存进行操作,操作完成后再放回主内存。主内存和工作内存的关系是,工作内存是主内存的备份,工作内存是线程的私有区域。因此线程间的传值必须依靠主内存来完成。

JMM中主内存和工作内存

JMM中的主内存

  • 存储Java实例对象
  • 包括成员变量、类信息、常量、静态变量等
  • 属于数据共享的区域,多线程并发操作会引发线程安全问题

JMM中的工作内存

  • 存储当前方法的所有本地变量信息,本地变量对其他线程不可见
  • 字节码行号指示器,Native方法信息
  • 属于线程私有区域,不存在线程安全问题

主内存与工作内存的数据类型及操作方式

  • 方法里的基本数据类型本地变量将直接存储在工作内存的栈帧结构中
  • 引用类型的本地变量:引用存储在工作内存中,实例存储在主内存中
  • 成员变量、static信息、类信息均存储在主内存中
  • 主内存共享的方式是线程各拷贝一份数据到工作内存,操作完成后刷新回主内存

前面提到指令重排会影响可见性和有序性,但指令重排对提高CPU性能非常重要,那什么时候不能够指令重排呢?答案是无法通过happens-before原则推导出来的,才能进行指令重排。在JVM内部的实现,通常依赖于内存屏障,通过禁止某些重排序的方式,提供内存可见性保证,即实现了各种happens-before的规则。下面将简单介绍下happens-before原则。

happens-before

  1. 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;(先来后到)
  2. 锁定规则:一个unlock操作先行于后面对同一个锁的lock操作;(解锁后才能上锁)
  3. volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;(写好才给看)
  4. 传递规则:A先于B,B先于C,则A先于C;(排队)
  5. 线程启动规则:线程的start()方法先于它的每一个动作;(不开始不许动)
  6. 线程中断规则:线程的中断(interrupt())先于被中断的代码检测到中断的发生;
  7. 线程终结规则:线程的操作先于线程的终结(Thread.join());
  8. 对象终结规则:对象的初始化先于finalized。

值得注意的是,第1条的次序规则只在单线程下有效,多线程中不一定保证。第3条volatitle保证了线程的可见性,写一定先于读。第6条线程A若对线程B设置中断,线程B马上可以知道。第8个要保证对象的字段要在初始化后可见。

volatile

为了在适当的场合保证线程间的原子性、有序性和可见性,volatile关键字是JVM提供的轻量级同步机制,对于保证操作的原子性有很大的帮助,但其不能替代锁,也无法保证一些复合操作的原子性。

  • 保证被volatile修饰的共享变量对所有线程总是可见
  • 禁止指令重排序优化

如果操作volatile修饰的变量是复合的(多条操作),如value++这种(先读,后自增),即使value被volatile修饰也会引发线程安全问题。synchronized会创建内存屏障,保证所有CPU结果刷到主内存中,可以保证操作的内存可见性,因此在synchronized代码块或方法中操作的变量可以省略volatile修饰。而如果对一个变量的操作是原子性的,如操作布尔类型的变量,加上volatile就可以实现可见性,实现线程安全的目的,此时可以省略synchronized。

如何保证可见性(lock+MESI)

当写一个volatile变量时,JMM把该线程对应的工作内存中的共享变量刷新到主内存中;当读取一个volatile变量时,JMM会把该线程对应的工作内存置为无效,只能从主线程中重新读取。

JVM发送lock前缀指令给CPU,CPU在计算完后会立即将这个值写回主内存,同时因为有MESI一致性协议,各个CPU都会对总线进行嗅探,看自己本地缓存中数据是否被修改。若被修改,CPU会将本地缓存数据过期掉,这个CPU上执行线程要读取变量时,从 主内存中重新加载最新的数据。

lock前缀指令 + MESI

涉及到CPU的结构,CPU一般有3级缓存,每个CPU内有2级缓存,多核CPU共享3级缓存,而3级缓存共享主内存。CPU在读取数据是,从1级缓存,2级,3级,主内存这样的顺序读取,写入顺序是主内存,3级,2级,1级。在读取数据的时候,根据局部性原理会多读取一些数据,按照一块64字节的cache line即缓存行来读取。CPU级别的内存可见性是以缓存行为单位的。在常用的Intel CPU内,是依靠MESI缓存一致性协议来保证内存可见性的。MESI是CPU缓存的4种状态:

  • Modified
  • Exclusive
  • Shared
  • Invalid

而使用了缓存行一致性后,会存在一个问题,如果线程1要修改x变量,线程2要修改y变量,若x与y变量在一个缓存行中,线程1修改了x变量后要通知线程2,线程2修改了y变量后要通知线程1,这时候就比较浪费效率。解决办法是缓存行对齐,即故意添加一些无用数据,使得x与y不在一个缓存行中,如Disruptor,使用了缓存行对齐来提高运行效率。下图为其源码中使用到了缓存行对齐的部分:

总的来说,对于实现可见性方面,系统底层的实现为:

  • 使用MESI缓存一致性
  • 锁总线(数据量超过缓存行)

CPU在写数据时,可以写到寄存器、写缓冲器,高速缓存,不同硬件缓存行一致性协议不同,有flush处理器缓存与refresh处理器缓存。

flush处理器缓存,将写缓冲器中的值写到高速缓存或主内存,这样才能让其他处理器获取到更新值。发送消息到总线(bus),通知其他处理器,某个变量的值被修改了。

refresh处理器缓存,处理器线程在读取一个变量值时,如果发现其他处理器的线程更新了变量的值,必须从其他处理器的高速缓存或主内存里,读取最新值更新到自己的高速缓冲中。

为了保证可见性,底层通过MESI协议,flush处理器缓存和refresh处理器缓存一整套机制来保障的。

在JIT动态编译时,会进行指令的重排序。在处理器执行指令的时候,将编译好的指令一条条读取到处理器中,那个指令就绪可以执行就先执行,不按照代顺序来。每个指令的结果放到一个重排序处理器中,重排序处理器把各个指令的结果按照代码顺序应用到主内存或写缓冲器中 。

如何禁止重排序(内存屏障)

javac编译与JIT编译指令,可能会调整顺序。cpu执行指令时,有四种乱序

1、storestore

2、loadload

3、loadstore

4、storeload

内存屏障(Memory Barrier)

  1. 保证特性操作的执行顺序
  2. 保证某些变量的内存可见性

通过插入内存屏障禁止指令在内存屏障前后的指令执行重排序优化;强制刷出各种CPU的缓存数据,因此任何CPU上的线程能够读取到这些数据的最新版本。

  1. 源码层面 volatile

  2. 字节码层面 ACC_VOLATILE标记

  3. jvm层面,jvm规范要求加内存屏障

    sfence(写屏障) mfence(全屏障) lfence(读屏障)等系统原语

    • 写操作前加屏障,写完才能读
    • 读操作前加屏障,读完才能写
  4. hot spot层面

    没有使用fence,使用lock addl指令

在volatile遍历写操作前加入Release屏障,在之后加入一个Store屏障,保证volatile写跟Release屏障之前的任何读写操作不会被指令重排,Store屏障保证了写完数据后,立马会执行flush处理器缓存的操作。

在volatile变量读操作前加入一个Load屏障,保证对变量读取时,如果被别的处理器修改过了,需要从其他处理器的高速缓存(或主内存)中加载到自己的本地高速缓存中,保证读取的是最新数据,在之后会加入一个Acquire屏障,禁止volatile读操作之后的任何读写操作会跟volatile读指令重排序。

Acquire屏障其实是LoadLoad屏障+LoadStore屏障,Release屏障其实是StoreLoad屏障+StoreStore屏障。

核心为volatile读写前后会加内存屏障,避免指令重排。

高速缓存结构

高速缓存底层数据结构为拉链散列表,有很多bucket,一个bucket下挂载很多cache entry,每个cache entry由三个部分组成:tag、cache line与flag。cache line是缓存的数据。tag指向缓存数据在主内存中数据地址,flag表示缓存行的状态,cache line中可以包含多个变量的值。

处理器在读写高速缓存时,会根据变量名执行内存地址解码的操作,解析出来3个,index、tag和offset。index用于定位高拉链散列表的某个bucket,tag用于定位cache entry,offset定位一个变量在cache line 中的位置。

若可以成功定位到一个高速缓存中的数据,且flag还标志有效,则缓存命中;否则未命中。若缓存未命中,则重新从主内存加载数据到高速缓存中。现在处理器一般由三级高速缓存,约靠近前面的缓存读写数据越快。

单例双重检测

广为人知的单例模式懒汉式在多线程下是不安全的,因此有如下的双重检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Singleton{
private static Singleton instance;
private Singleton(){}
public static Singleton getInstance(){
if(instance != null){
synchronized(Singleton.class){
if(instance != null){
instance = new Singleton();
}
}
}
return instance;
}
}

其中要点是,此类中持有的instance是私有静态的,因为不能被其他类直接获取而且不能通过对象获得。构造函数要私有。获取实例的对象要公共静态,为了直接通过类名调用。锁对象为单例类的class对象而双重检测,同步是为了线程安全,外层判断是为了提高效率。但是这样依然会存在风险。

原因 :instance = new Singleton();可以分为三步

1
2
3
memory = allocate();//1、分配对象内存空间
instance(memory); //2、初始化对象
instance = memory //3、设置instance指向刚分配的内存地址,此时instance=null

而因为步骤2和步骤3之间没有happens-before关系,因此是可以被重排序的,这样排序后的顺序为

1
2
3
memory = allocate();//1、分配对象内存空间
instance = memory //3、设置instance指向刚分配的内存地址,此时instance!=null
instance(memory); //2、初始化对象

这时候,会出现instance指向了分配的空间已经不为null,执行了默认初始化,但还没有显式初始化对象,如果此时线程被切走,再次进来的线程判断instance不为空,直接将instance返回,出现问题。

因此解决办法是不让这部分指令重排,即让instance被volatile修饰。因此完整的线程安全的单例双重检测如下,其中volatile与static顺序没有要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Singleton{
private volatile static Singleton instance;
private Singleton(){}
public static Singleton getInstance(){
if(instance != null){
synchronized(Singleton.class){
if(instance != null){
instance = new Singleton();
}
}
}
return instance;
}
}
中间件应用

在许多中间件里面,会大量使用到volatile关键字

1
2
3
4
5
6
7
8
9
10
11
12
13
class Kafka{
private static volatile boolean running = true;
public static void main(String[] args){
Kafka kafka = new Kafka();
// 执行代码
while(running){
Thread.sleep(1000);
}
}
public void shutdown(){
running = false;
}
}

为了避免中间件程序自己关闭,设置volatile的变量来让其阻塞,为了停止,可以将此变量状态进行更改即可,如果没有volatile修饰,则没有可见性,更改无法生效。

volatile与synchronized

  1. 二者本质不同。volatile为告诉JVM当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取;synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住直到该线程完成变量操作为止。
  2. 使用级别不同。volatile仅能使用在变量级别,而synchronized可以使用在变量、方法和类级别
    • synchronized(A.class) //修饰类
    • synchronized void get() //修饰方法
    • synchronized(o) //修饰变量
  3. volatile仅能实现变量的修改可见性,不确保原子性;而synchronized可以保证变量修改的可见性和原子性
  4. volatile不会造成线程阻塞,而synchronized会
  5. volatile标记的变量不会被编译器优化,synchronized标记的变量可以被编译器优化

无法禁止指令重排的synchronized如何保证有序性

synchronized内部代码块会有指令重排,但synchronized内部与外部代码不会发生指令重排,原因是monitorenter后有acquire内存屏障,monitorexit前有release内存屏障。

  1. 有序性是什么

对于有序性的理解为,本线程内观察,所有操作都是天然有序的;若在另一个线程中观察此线程,所有操作都是无序的。在本线程中有序是因为根据as-if-serial语义,不管怎么重排序,单线程程序执行的结构都不能被改变,因此可以认为重排序在单线程内部可忽略。

  1. synchronized如何保障有序

而synchronized虽然不禁止指令重排,但是其为排他的可重入锁,一个线程持有了锁后,其他线程是不能访问到临界区资源的,这样被synchronized修饰的方法相当于单线程执行,因此符合as-if-serial语义下的有序性。

  1. 指令重排的原因与影响

更具体来讲,synchronized虽然保证了单线程内的有序性,但因为指令的重排序,一个线程操作的结果对另一个线程会影响。那么为什么要指令重排序呢?

  • 指令重排存在的原因是计算机为了提升一些语句的处理效率,会将不满足happens-before的语句进行重排序,提高了处理效率(在一些语句停顿的时候先加载其他语句)。heppens-before保证的是可见性。

在多线程环境中,由于有多个线程的并发操作,可能会导致线程安全问题,这样在多线程下,语句的重排序可能会造成线程安全问题,如懒汉式instance不被volatile修饰的问题。

  1. 如何解决

为了不让某些语句进行重排序,使用volatile关键字,利用内存屏障禁止相关语句重排序,这样就保证了语句的有序性。

因此从宏观上,synchronized通过排它锁保证了单线程下的有序性,但因为此线程的结果会对其他线程造成影响,有时候synchronized需要配合volatile一起使用。这样才能更好的达到有序性。

而CAS操作,其为轻量级的锁,一般需要让临界区资源变量被volatile修饰,保证其可见性,然后通过compare and set的方式对其进行操作。

CAS(Compare and Swap)

​ CAS叫做自旋锁或无锁(无重量级锁),是乐观锁最常见的实现方式,默认没有数据冲突,如果有就回滚数据并重新提交。

CAS是一种高效实现数据安全性的方法

  • 支持原子更新操作,适用于计数器,序列发生器(给变量自增的工具)等场景
  • 属于乐观锁机制,号称lock-free(底层仍有加锁行为)
  • CAS操作失败时由开发者决定是否继续尝试,还是执行其他操作

CAS思想及原理

包含有三个操作数——内存位置(V),预期原值(A)和新值(B)

将内存位置(主内存)的值与预期原值比较,如果匹配则自动将该位置值更新为新值;否则进行自旋,重新获取值再进行操作。

对于volatile修饰的变量,虽然保证了内存可见性,但不能保证操作原子性,反编译如下代码

1
2
3
4
5
6
public class CASTest {
public volatile int value;
public void add(){
value++;
}
}

反编译结果部分展示如下

1
2
3
4
2: getfield      #2                  // Field value:I
5: iconst_1
6: iadd
7: putfield #2 // Field value:I

可以看到自增这个操作是分为先取值,再自增,再将值放回去的,因此这种非原子性的操作容易引发线程安全问题。而之前介绍的将add方法加上synchronized关键字实现悲观锁可以解决,但是效率相对较低,此时也可以使用CAS这种乐观锁来实现。Java中的AtomicInteger类便可以在不使用悲观锁的情况下保证线程安全,引用其实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AtomicInteger extends Number implements java.io.Serializable {  
private volatile int value;

public final int get() {
return value;
}

public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//存在自旋
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}
}

其中getAndIncrement相当于自增操作,那么来看CAS操作是如何实现操作的安全性的。首先value被volatile修饰,这样保证了内存的可见性,因此当前数current为从主内存中刷出来的数据(CAS中的A),next为其操作后变成的数据(CAS中的B),然后调用了unsafe类的compareAndSwapInt方法,其中unsafe类为底层实现,其中为native方法,其实现逻辑类似于如下

1
2
3
4
5
6
if (this == expect) {
this = update
return true;
} else {
return false;
}

即如果期望值A与主内存处为同一地址,那么就将主内存中的地址修改为更新后的值位置;否则不做修改。因此CAS的关键在于将期望值与主内存中值进行比较,确保值没有变动才进行修改。

而unsafe类中,看其C++源码,发现更底层使用的汇编指令cmpxchg1,是非原子的,存在问题是如果有线程比较完后来写入数据的时候,又有线程进行了数据写入,则仍然不安全,因此会有指令LOCK_IF_MP,multi processor如果是多核处理器,会在这条指令前加上lock指令。因此CAS最底层的指令为lock cmpxchg,锁定的是北桥信号,没有锁总线。

在J.U.C包中的atomic包中提供了常用的原子性数据以及引用、数组等相关原子类型和更新操作工具,是很多线程安全服务的首选。那么CAS这么好用,有没有什么缺点呢?

CAS缺点及解决方式

  • CAS中存在自旋等待,若循环时间长,开销很大,浪费CPU资源
  • 由于CAS比较的为this对象,因此只能保证一个共享变量的原子操作
  • ABA问题

下面将详细介绍ABA问题。ABA问题描述为如果内存地址V初次读取到为A,在赋值检查时仍为A,不能保证此值没有被其他线程修改过,如果曾经被改为B再改回来,CAS操作会误认为此值没有被改变过,此漏洞就是CAS的ABA问题。

那么该如何解决呢?

1、ABA问题

对于解决ABA问题,JDK1.5后后atomic包中提供了类AtomicStampedReference类,增加了版本号的比较,在比较值的同时,再进行版本号的比较。若存在ABA问题的应用场景,使用传统的互斥同步会更高效点。

1
2
3
4
public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp)

版本号的使用如下(使用lambda表达式,()->{方法代码})

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) {
//新建类,初始化值
AtomicStampedReference at = new AtomicStampedReference<>(2,0);
//新建两个任务,执行方法
//任务1,执行ABA操作
new Thread(()->{
int stamp = at.getStamp();
System.out.println("线程1第一次拿到的版本号"+stamp);
//暂停一段时间,让线程2拿到相同的版本号
try {
Thread.sleep(1000);
} catch (InterruptedException e) { }
//进行A->B操作
at.compareAndSet(2,5,at.getStamp(),at.getStamp()+1);
System.out.println("线程1第一次操作后的版本号"+at.getStamp());
//进行B->A操作
at.compareAndSet(5,2,at.getStamp(),at.getStamp()+1);
System.out.println("线程1第二次操作后的版本号"+at.getStamp());
}).start();

new Thread(()->{
int stamp = at.getStamp();
System.out.println("线程2第一次拿到的版本号"+stamp);
//暂停,让线程一完成ABA操作
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
//进行A->C操作,看能否操作成功
boolean b = at.compareAndSet(2,10,stamp,stamp+1);
System.out.println("当前内存中最新值为"+at.getReference());
System.out.println("线程2处理完ABA问题后的结果为:"+b);
}).start();
}

这样当线程1执行完ABA操作后,虽然期待值与主内存中值相等,但是版本号变化了,因此当线程2用旧的版本号进行操作的时候,无法成功执行CAS操作,执行结果为

2、循环时间长

以下解释引用自博客

若JVM可以支持处理器提供的pause指令效率会有一定提升。pause指令的作用是:1、延迟流水线执行指令,使CPU不消耗过多的执行资源;2、避免在退出循环时因内存顺序冲突而引起CPU流水线被清空,提高CPU执行效率。

3、只能保证一个共享变量的原子操作

可以将多个变量合并为一个共享变量进行操作,JDK1.5后提供了AtomicReference类,可以保证引用对象之间的原子性,因此可以将多个变量放在一个对象中进行CAS操作。

1
2
3
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

CAS与synchronized

在synchronized未被改进前,synchronized在资源竞争较少时,线程被阻塞在操作系统层面被挂起开销较大,因此CAS更适用于竞争较少的场合;而竞争较多的场合,CAS资源几率大,效率低于synchronized。

而synchronized在JDK1.6后被改进,有了前面介绍过的锁的膨胀机制,在竞争资源较少时也能获得与CAS类似的性能。

concurrent包的实现

concurrent包有通用化的实现模式

  1. 声明共享变量为volatile(保证内存可见性)
  2. 使用CAS的原子条件更新实现线程间的同步
  3. 配合以volatile的读/写和CAS所具有的读和写的内存语义来实现线程间的通信

关于锁的实现先说到这里,接下来说下多线程非常常用的线程池。

线程池

在之前使用多线程时,依赖的是新建多个Thread对象,然后传递任务对象,再开启线程。

但是线程的创建与销毁开销是比较大的,为了重复利用线程,提高效率,便有了线程池。为了了解线程池,从为什么要用线程池(优势),怎么用线程池(方式与种类),重要参数来说明。

线程池优势

  • 降低资源消耗

    重复利用已创建的线程来降低线程创建和销毁造成的消耗

  • 提高线程的可管理性

    线程是稀缺资源,无限制的创建会消耗系统资源,降低稳定性,使用线程池可以统一的分配、调优和监控

使用线程池后,创建线程变成从线程池中获得空闲线程,关闭线程变成向线程池归还线程。

常见线程池种类

JDK提供了Executor框架来更好的控制线程池,下面先介绍5种常用的线程池工作方法。

1、指定工作数量的线程池

newFixedThreadPool(int nThreads)

每当一个任务去创建一个线程,如果工作线程数量达到线程池的初始最大数,将提交的任务存储池队列中;如果工作线程退出,将会有新的工作线程被创建,补足nThread的数目。

适用于可以预测线程数量的任务中,或者服务器负载较重,对当前线程数量进行限制

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}

2、处理大量短时间工作任务的线程池

newCachedThreadPool()

(1)试图缓存线程并重用,当无缓存线程可用时,就创建新的线程

(2)如果线程闲置的时间超过阈值(一般60s),则会被终止并移出缓存

(3)系统长时间闲置的时候,不会占用什么资源

适用于服务器负载较轻,执行很多短期异步任务

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}

3、唯一的工作线程池

newSingleThreadExecutor()

创建唯一的工作线程来执行任务,如果线程异常结束,会有另一个线程取代它。可保证顺序的去执行各个任务,且在给定的时间内不会有多个线程是活动的。

适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}

4、定时或者周期性的工作调度

newSingleThreadScheduledExecutor()与newScheduledThreadPool(int corePoolSize)

二者区别在于单一工作线程还是多个线程,前者与newSingleThreadExecutor()一样,若线程异常结束,会有另一个线程来取代它确保顺序执行。

适用于需要后台线程执行周期任务的场景。

1
2
3
4
5
6
7
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

5、拥有多个任务队列的线程池

newWorkStealingPool()

内部创建ForkJoinPool,利用work-stealing算法,并行地执行任务,不保证处理顺序,JDK8引入。

可以减少连接数,创建当前可用CPU数量的线程来并行执行,适用于大耗时的操作,可以并行来执行。

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

ForkJoin框架

将大任务切割成若干个小任务并发执行,最终汇总每个小任务的结果后得到大任务结果的框架。

是Executor接口的一种具体实现,更好利用多处理带来的好处,使用分治法解决问题。使用的是work-stealing算法,具体是从其他任务队列中窃取任务来执行。将任务分别放在不同的队列中,为每个队列来创建一个线程执行任务。会出现某些队列任务已经完成而其他队列任务还没有完成的情况,这时候为了提高效率,完成任务的处于空闲状态的线程会从其他处于busy状态的线程处窃取等待执行的任务。为了减少窃取和被窃取任务的竞争,使用双端队列,被窃取任务线程从双端队列头部拿任务执行,而窃取任务线程从双端队列尾部拿任务。

下面将介绍JDK中的Executor框架

Executor框架

Executor框架将任务的提交与运行进行分离,在Java标准库中提供了Executor、ExecutorService和ScheduledExecutorService三个接口和基础实现。如Executors类扮演线程池工厂的角色,通过Executors可以获得拥有特定功能的线程池。ThreadPoolExecutor类实现Executor接口,表示一个线程池,通过此接口,任何Runnable的对象均可以被ThreadPoolExecutor线程池调度。以上成员均在J.U.C包中。

J.U.C的三个Executor接口

Executor

运行新任务的简单接口,将任务提交与任务执行细节解耦。下面只有一个execute(Runnable command)方法,线程执行有多种可能

  • 可能为创建一个新线程并立即启动
  • 可能是用已有的工作线程来运行传入的任务
  • 可能是根据设置线程池的容量或阻塞队列的容量来决定将线程放入阻塞队列或接收传入的线程
1
void execute(Runnable command);
ExecutorService

具备管理执行器和任务声明周期的方法,提交任务机制更完善。扩展了Executor接口,如可以返回Future的submit方法,Callable弥补了Runnale没有返回值的短板

1
2
Future submit(Runnable task);
Future submit(Callable task);
ScheduledExecutorService

扩展了ExecutorService,支持定期执行任务。

线程池使用

展示固定大小的线程池的使用为例子,基本思路仍为创建任务类实现Runnable接口,实现run()方法,然后利用Executors线程工厂来获取线程池,利用线程池的submit(task)方法来执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+": Thread ID"+Thread.currentThread().getId());
//休息一秒钟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExecutorTest {
public static void main(String[] args) {
//1、新建线程池
ExecutorService es = Executors.newFixedThreadPool(5);
//将任务放入线程池,并执行10次
MyTask task = new MyTask();
for (int i = 0; i < 10; i++) {
es.submit(task);
}
}
}

其中这里让一个线程执行后休眠1秒钟,然后线程池大小设置为5,让线程池执行10次任务,可以看到任务5个5个分批次执行,而且每一批都为相同的几个线程,结果如下所示。

但是如果使用的是newCachedThreadPool(),则可以10个线程一批执行完。

但是有些时候,Executors工厂类中提供的线程池不能满足需求,这时候可以通过ThreadPoolExecutor构造函数去创建线程池。

ThreadPoolExecutor

线程池整个的应用逻辑为:线程池由工作队列WorkQueue来存储各个队列提交的工作任务。工作队列可以是容量为0的SynchronizedQueue,主要用于newCachedThreadPool;也可以是newFixedThreadPool中的LinkedBlockingQueue。队列接到任务后,排队提交给线程池,即工作队列的集合,该集合负责在运行的过程中管理线程的创建和销毁。线程池的工作线程被抽象为静态内部类WorkerThread,线程池维护的其实是一组Worker对象

Worker类是ThreadPoolExecutor类中的内部类,继承自AQS,实现了Runnable,由firstTast保存传入的任务,thread保存创建出来的线程。

1
2
3
4
5
6
7
8
9
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}

当Worker启动的时候,调用run()方法来启动里面的逻辑。

ThreadPoolExecutor构造函数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

5个参数比较关键

  • corePoolSize:核心线程数量(长期驻留的线程数,不同实现差别很大)

    太小,频繁创建销毁线程;太大,浪费系统资源

  • maximumPoolSize:线程不够用时能够创建的最大线程数(有的固定于核心线程一样,有的可以创建更多)

    必须大于等于corePoolSize

  • keepAliveTime:线程存活时间(线程池维护线程允许的空闲时间,当线程池中线程数量大于corePoolSize的时候,若没有新任务提交,核心线程外的线程不会立即被销毁,而是等待时间超过keepAliveTime)

  • workQueue:任务等待队列(当任务提交时,若线程池中线程数量大于等于PoolSize,将任务封装为Worker对象放入等待队列中)

    • 直接提交的队列:SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,一个插入操作要等待一个删除操作。提交的任务不会被真实保存,总是将新任务给线程执行,若没有空闲的线程,尝试创建新的线程。一般要求maximumPoolSizes设置很大,避免线程拒绝执行操作。
    • 有界的任务队列:ArrayBlockingQueue,构造函数必须传入其最大容量,当有新任务执行,线程数小于核心线程数,优先创建线程,若大于,将新任务加进等待队列。若队列满了,无法加入,在总线程数不大于最大线程数时,创建新线程执行任务;否则执行拒绝策略。因此有界队列仅在任务队列装满时才可能将线程数扩充到核心线程数之上。
    • 有界的任务队列:LinkedBlockingQueue,不传参数默认为Integer.MAX_VALUE,当有新任务执行,线程数小于设定核心线程数,生成新线程执行任务,当线程数达到核心线程数后,线程不会增加,若有任务没有被消费,放进队列等待,队列可以一直增长直到耗尽系统资源。每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
    • 优先任务队列:PriorityBlockingQueue,可控制任务先后执行顺序,特殊的无界队列,ArrayBlockingQueue与LinkedBlockingQueue按照先进先出顺序执行,而此队列确保高优先级的任务先执行。
  • threadFactory:创建新线程,Executors.defaultThreadFactory()(使用这个创建,新线程有一样的优先级,且是非守护线程,同时设置了线程的名称)

而其中的handle,是线程池的饱和策略

若阻塞队列满且没有空闲线程,若继续提交任务,需要策略去处理

  • AbortPolicy:直接抛出异常,默认策略
  • CallerRunsPolicy:由调用者的线程来执行任务
  • DiscardOldestPolicy:丢弃队列中最靠前的任务,并执行当前任务
  • DiscardPolicy:直接丢弃任务
  • 或自己定义,实现RejectExecutionHandler接口

自定义类,实现RejectExecutionHandler接口,并实现如下方法

1
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

新任务提交执行后线程池判断

  • 若运行中线程数少于核心线程数,创建新线程处理任务
  • 若线程池数量大于等于核心线程数且小于最大线程数,将其放入等待队列,当等待队列满了再新建线程去执行任务
  • 若运行线程数量大于最大线程数,若等待队列已满,通过handler所指定策略执行任务

线程池状态

  • RUNNING:可以接受新提交的任务,也能处理阻塞队列中的任务

  • SHUTDOWN:不再接受新任务,可以处理存量任务

    在处于RUNNING状态时调用shutdown()方法

  • STOP:不再接受新的任务,也不处理存量任务(线程被中断)

    处于RUNNING或SHUNDOWN状态调用shutdownnow()方法

  • TIDYING:所有任务终止

    进行最后清理工作,有效线程数为0,进入此状态后调用terminated方法进入下一状态

  • TERMINATED:默认什么也不做,只作为一个标识

处于SHUTDOWN和STOP状态的线程,当线程池中工作线程数量为0时,进入TIDYING状态。

线程池大小

  • CPU密集型:线程数=核数或核数+1设定

    大幅增加线程导致许多不必要的开销

  • I/O密集型:线程数=CPU核数*(1+平均等待时间/平均工作时间)

线程池创建规则

在阿里巴巴《Java开发手册》中规定,线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,目的是更加明确线程池的使用规则,避免资源耗尽的风险。

使用Executors创建线程池弊端有

  • FixedThreadPool和SingleThreadPool,使用的LinkdedBlockingQueue,任务队列最大长度为Integer.MAX_VALUE,可能堆积大量请求,导致OOM
  • CachedThreadPool和ScheduledThreadPool,允许创建的最大此案城数量为Integer.MAX_VALUE,可能创建大量的线程,导致OOM

线程池执行方法

有submit()和execute()

  • execute()属于Executor接口,submit()属于ExecutorService接口
  • execute()不能接收返回值,submit()可以配合Future来接收返回值

ThreadPoolExecutor扩展

让自定义类继承ThreadPoolExecutor,然后重写其beforeExecute和afterExecute方法,然后添加自定义方法。

1
2
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

远程服务远程,阻塞队列为无界

如果消费者的远程服务异常,消费任务时延时比较长,这时候消费速度赶不上生产速度,就会导致阻塞队列中任务积压,如果阻塞队列为无界的,则可能会造成OOM异常。

线上机器宕机,阻塞队列中请求怎么办

线程池中阻塞队列积压的任务会丢失,解决办法为:在提交任务到线程池前,先在数据库中插入此任务的信息,如基本数据,还有状态信息,如未提交、已提交、已完成。在提交完成后,完成状态更新为已提交。在机器宕机后,重启系统,使用后台线程扫描数据库中未提交和已提交的任务,将任务信息提取出来并重新提交给线程池去执行。

并发工具包

Semaphore

具体方法

Semaphore信号量是对锁的拓展,synchronized或ReentrantLock都只允许一个线程访问一个资源,而信号量可以指定多个线程,其构造方法有

1
2
3
4
5
6
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

即在新建其对象时,必须指定信号的准入数,即同时可以申请多少个许可。其主要方法有

1
2
3
4
5
public void acquire()
public void acquireUninterruptibly()
public Boolean tryAcquire()
public Boolean tryAcquire(long timeout, TimeUnit unit)
public void release()

acquire方法尝试去获得准入许可,若无法获得会等待,直到有线程释放许可或被中断。acquireUninterruptibly不响应中断。这两个方法类似于locks.interruptedLock和lock.lock。tryAcquire()尝试获得许可,成功返回true,失败返回false,不会等待,而tryAcquire()会尝试等待,类似于locks.tryLock()方法。release()用于线程在访问资源结束后,释放一个许可。

如何使用

下面演示Semaphore的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TaskSem implements Runnable{
final Semaphore sem = new Semaphore(5);
@Override
public void run() {
try {
//1、获取semaphore
sem.acquire();
//执行任务
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId()+"完成任务");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
sem.release();
}
}
}
public class SemaphoreTest {
public static void main(String[] args) {
//信号量的学习
ExecutorService es = Executors.newFixedThreadPool(20);
TaskSem ts = new TaskSem();
//提交20次任务
for (int i = 0; i < 20; i++) {
es.submit(ts);
}
es.shutdown();
}
}

新建了一个允许5个的Semaphore,任务被执行前先获取许可,执行完后需要释放资源,因此将其放在finallt中。在20个线程的线程池中,将20个任务提交,但结果为5个一组来执行,

因此可以看到Semaphore可以用在一些需要限流的场合,如数据库连接数是有限的,这样可以用信号量来控制最多同时访问的线程数。

CountDownLatch

倒计时器,常用来控制线程等待,让一个线程等待直到倒计时结束,再执行。

具体方法

构造函数,需要传入等待计数

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

主要方法

1
2
public void countDown()
public void await()

当计数没有到达设定次数时,调用await()方法的线程会处于等待,其他线程调用一次countDown()方法,计数-1,当计数为0的时候,处于等待的方法继续进行

如何使用

在主线程上阻塞,任务线程执行一次后执行countDown()方法,当执行10次后,主线程继续运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class TaskCoutDown  implements Runnable{
static final CountDownLatch cd = new CountDownLatch(10);
@Override
public void run() {
//执行一次,检查一次
try {
Thread.sleep(1000);
System.out.println("check");
//进行计数
cd.countDown();
}catch (InterruptedException e){}
}
}
public class CountDownTest{
public static void main(String[] args) throws InterruptedException {
TaskCoutDown tcd = new TaskCoutDown();
ExecutorService es = Executors.newFixedThreadPool(10);
//执行 10次点火
for (int i = 0; i < 10; i++) {
es.submit(tcd);
}
TaskCoutDown.cd.await();
System.out.println("发射");
es.shutdown();
}
}

ThreadLocal

ThreadLocal,直观的语义理解为线程私有,可以先直观理解为每个线程内有自己的ThreadLocal对象,线程存在则ThreadLocal存在。线程A在ThreadLocal中设置的对象是自己私有的,线程B无法获取到。

应用
  • 保存线程上下文信息,在任意需要的地方进行获取

将信息set后,后续有切换的情况下,直接用get可以获取到之前的信息。

还有spring的事务管理,spring中的@transaction注解,事务注解,其中的方法要么全部完成,要么全部回滚,若有方法m()中有两个方法m1与m2,均需要读取数据库中的数据,需要保证二者用的同一个Connection才能构成事务,使用的是ThreadLocal,因为每个线程有自己私有的ThreadLocal,从当前线程中拿对象,拿到的是同一个。

  • 线程安全的,避免某些情况需要考虑线程安全必须同步带来的性能损失

使用一些不是线程安全的对象,在不涉及共享对象的更新问题时,为了避免加锁带来的性能损失,将共享变量加入ThreadLocal,这样每个线程对ThreadLocal中读写数据是线程隔离的,互相之间不会影响,也就不存在竞争问题。

底层实现

下图为ThreadLocal的示意图。

图片来源于微信公众号链接https://mp.weixin.qq.com/s/SysYihctu03RlUtI0pcG7w

总的来说,每个Thread线程有一个ThreadLocalMap类型的threadLocals成员变量,在map中有键值对,用Entry来存储,Entry相当于map的一行,也是键值对。其中Entry中的key对ThreadLocal对象为弱引用,而value是对传进ThreadLocal中泛型对象的强引用。而作用后面会细说,先看ThreadLocal的set方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadLocal<T> {
//set方法
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
//key为this即当前ThreadLocal对象
map.set(this, value);
else
createMap(t, value);
}
//getMap方法,获取线程对应的map
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
}

可以看到,通过当前线程获取到map,然后将ThreadLocal作为key,传入ThreadLocal的对象作为value,存入当前线程的map中。

而map.set()方法源码如下

1
2
3
4
5
6
7
8
9
10
static class Entry extends WeakReference<ThreadLocal> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal k, Object v) {
//相当于new了WeakReference,弱引用指向key
super(k);
value = v;
}
}

可以看到,map中的Entry继承了弱引用,因此调用super(key)代表着key是对ThreadLocal的弱引用。

key为弱引用原因

总的来说是为了避免内存泄漏。

对于ThreadLocal tl = new ThreadLocal<>();语句,tl对ThreadLocal为强引用,key对ThreadLocal为弱引用,使用弱引用原因是防止内存泄漏。如果key为强引用,那么即使tl不指向ThreadLocal了,因为有key的指向,ThreadLocal对象也无法被回收。当使用弱引用后,当tl强引用为空,ThreadLocal对象可在GC时被回收。这样存在的问题是,当ThreadLocal被回收,key为null,但这行记录还存在,需要手动去remove()。当ThreadLocal对象tl不继续使用,必须调用tl.remove()方法。

内存泄漏(TO DO)

引用到的资料来源:

https://blog.csdn.net/cy973071263/article/details/104546954

https://blog.csdn.net/javazejian/article/details/72828483#%E7%90%86%E8%A7%A3java%E5%AF%B9%E8%B1%A1%E5%A4%B4%E4%B8%8Emonitor

https://blog.csdn.net/qq_37113604/article/details/81582784?depth_1-utm_source=distribute.pc_relevant.none-task

https://mp.weixin.qq.com/s/SysYihctu03RlUtI0pcG7w