Java并发

重点学习内容

  • 并发编程的基础
  • 原子类
  • 线程池
  • JUC并发工具类
  • 并发容器

一、并发理论基础

1.1 Happends-Before规则

​ Happends-Before规则规定了对共享变量的写对其它线程的读操作可见。

  • 程序次序规则:一个线程内,按照控制流顺序,前面的操作先行发生于后面的操作。

  • 管程锁定规则:unlock操作先行发生于后面(时间上)对这个锁的lock操作。

  • Volatile规则:对一个volatile变量的写操作先行发生于后面(时间上)对这个变量的读操作。

  • 线程:

    线程启动规则:一个线程的start先行发生于这个线程的每一个动作。

    线程终止规则:一个线程的所有操作先行发生于这个线程的终止检测join。

    线程中断规则:对线程interrupt方法的调用先行发生于检测到中断事件的发生。

  • 对象终结规则:一个对象的初始化完成先行发生于其finalize方法的开始。

  • 传递性:若A先行发生于B,B先行发生于C,则A先行发生于C。

总结:

​ 一、同一线程内,按控制流顺序;

​ 二、不同线程:

  1. 依赖对象的:

    volatile对象开始读,其之前的所有写一定可见;

    对象开始执行finalize,其初始化操作一定可见;

    对象/类被lock,其之前的unlock一定可见;

  2. 依赖线程本身的:

    当线程开始执行操作,其start一定可见;

    当线程可被检测为结束,其线程内所有操作一定可见;

    当线程被中断,引发中断的事件一定可见。

1.2 死锁

1.2.1 产生死锁的条件

  1. 互斥:一个资源同一时刻只能由一个进程占用

  2. 请求和保持:一个进程被阻塞时,不释放已有资源

  3. 不可剥夺:只有线程使用资源结束时可以释放资源

  4. 循环等待:多个线程之间形成一个互相阻塞头尾相接的等待资源关系

1.2.2 防止死锁

  1. 破坏请求与保持:一次性申请所有资源

  2. 破坏不可剥夺:申请资源失败时释放已占有的资源

  3. 破坏循环等待:所有线程按相同顺序申请资源,释放反向

1.2.3 避免死锁

银行家算法(借钱给有偿还能力的客户):

在为进程分配资源前预先测试系统状态。若把资源分配给进程会产生死锁,则拒绝分配。

1.2.4 检测死锁

​ 系统定时运行一个死锁检测程序,判断系统内是否出现死锁。若检测到死锁,则设法解除。

​ 设计两张表:等待资源表 + 占用资源表

​ 若有进程Pi等待资源rk,而rk被进程Pj占用,则Pi和Pj具有等待占用关系。根据上述二表,可以扫描出所有进程的等待占用关系。当等待占用关系出现首尾相接时,说明出现了死锁。

1.3 进程与线程

进程是一个拥有一定独立功能的程序关于某个数据集合的一次运行活动

进程是操作系统进行资源分配和调度的一个独立单位

1.3.1 进程、线程区别

进程作为系统资源分配和保护的独立单位,不需要频繁的切换。

线程作为系统调度和分派的基本单位,能轻装运行,能被频繁的调度和切换。

OS感知线程的环境下,处理器调度线程;不感知线程的环境下,处理器调度进程,进程中的用户调度程序调度线程。

1.3.2 内核级线程

  1. 线程的管理工作由OS负责;

  2. 进程中一个线程被阻塞,OS能调用同一进程的其他线程占有处理器运行;

  3. 应用程序线程运行在用户态,调度管理在内核实现,切换线程时需要模式切换,系统开销大。

1.3.3 用户级线程

  1. 线程管理工作由应用程序完成,内核没有意识到线程的存在;
  2. 线程切换不需要切换模式;
  3. 运行进程按应用需要选择调度算法;
  4. 不依赖OS;
  5. 不能利用多处理器优点;
  6. 一个线程阻塞,将引起整个进程阻塞。

混合式策略:

​ 多个用户线程与多个内核线程建立绑定关系。

1.3.4 四种线程创建方式

  1. 继承Thread重写run
class MyThread extends Thread {
@Override
public void run() {
//此处为thread执行的任务内容
System.out.println(Thread.currentThread().getName());
}
}
Thread t = new MyThread();
t.start();
  1. 实现Runnable接口重写run,用其实例初始化Thread(推荐使用)
class MyThread implements Runnable {
@Override
public void run() {
//此处为thread执行的任务内容
System.out.println(Thread.currentThread().getName());
}
}
Thread t = new Thread(new MyThread());
t.start();
  • 为什么推荐使用Runnable来创建线程
    1. Runnable与Thread是组合的关系,Thread的run方法,实际上就是调用的Runnable的run方法
    2. Runnable(实际执行的任务),这种格式更容易与其他API配合,拖了Thread,灵活性强。
  1. 实现Callable接口重写call,用其实例初始化FutureTask,用FutureTask实例初始化Thread

    只是比Runnable多个get返回值,给其他线程

FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("running...");
return 100;
}
});

Thread thread = new Thread(task, "t1");
thread.start();

// 获取返回值,给其他线程
Integer result = task.get();
  1. 线程池创建

1.4 线程状态

1.4.1 线程状态概述

当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。在线程的生命周期中这各个线程状态发生的条件,下面将会对每种状态进行详细解析:
在这里插入图片描述
(1)Runnable(可运行)
线程可以在java虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操作系统处理器(cpu)

Java将操作系统中线程运行和就绪合并为运行状态。

(2)Blocked(锁阻塞)
当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态。

(3)Waiting(无限等待)
一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入Waiting状态。进入这个状态后是不能自动唤醒的,必须等待另一个线程调用notify或者notifyAll方法才能够唤醒

我们只需知道在做线程操作中存在这样的状态。那我们怎么去理解这几个状态呢,新建与被终止还是很容易理解的,我们就研究一下线程从Runnable(可运行)状态与非运行状态之间的转换问题

1.4.2 Timed Waiting(计时等待)

Timed Waiting在API中的描述为:一个正在限时等待另一个线程执行一个(唤醒)动作的线程处于这一状态。在我们写卖票的案例中,为了减少线程执行太快,现象不明显等问题,我们在run方法中添加了sleep语句,这样就强制当前正在执行的线程休眠(暂停执行),以“减慢线程”

sleep就是等待。

/**
* 卖票操作
*/
@Override
public void run() {
while(true) {
//开启线程锁
lock.lock();

if(ticket > 0) {
try {
Thread.sleep(100);

} catch (InterruptedException e) {
e.printStackTrace();
}
String name = Thread.currentThread().getName();
System.out.println(name + "正在卖" + ticket--);
}
//关闭线程锁
lock.unlock();
}
}

我们需要记住下面几点:

  1. 进入 TIMED_WAITING 状态的一种常见情形是调用的 sleep 方法,单独的线程也可以调用,不一定非要有协
    作关系。
  2. 为了让其他线程有机会执行,可以将Thread.sleep()的调用放线程run()之内。这样才能保证该线程执行过程
    中会睡眠
  3. sleep与锁无关,线程睡眠到期自动苏醒,并返回到Runnable(可运行)状态
    在这里插入图片描述

1.4.3 BLOCKED(锁阻塞)

Blocked状态在API中的介绍为:一个正在阻塞等待一个监视器锁(锁对象)的线程处于这一状态。
比如,线程A与线程B代码中使用同一锁,如果线程A获取到锁,线程A进入到Runnable状态,那么线程B就进入到Blocked锁阻塞状态。
在这里插入图片描述

1.4.4 Waiting(无限等待)

一个正在无限期等待另一个线程执行一个特别的(唤醒)动作的线程处于这一状态。
在这里插入图片描述
当多个线程协作时,比如A,B线程,如果A线程在Runnable(可运行)状态中调用了wait()方法那么A线程就进入了Waiting(无限等待)状态,同时失去了同步锁。假如这个时候B线程获取到了同步锁,在运行状态中调用了notify()方法,那么就会将无限等待的A线程唤醒。注意是唤醒,如果A获取到锁对象,那么A线程唤醒后就进入Runnable(可运行)状态;如果没有获取锁对象,那么就进入到Blocked(锁阻塞状态)。

1.4.5 补充

线程状态整体概括:
在这里插入图片描述

1.5 线程调度

1.5.1 三级调度

高级调度:从后备作业队列中选取作业加入主存(新建态->就绪态)

中级调度:进程的挂起与唤醒(挂起:使一个进程暂时失去与其他进程竞争处理器的机会)

低级调度:又称进程调度、短程调度、处理器调度,按照某种原则把处理器分配给就绪进程或内核级进程。

1.5.2 优先级调度算法(时间相关)

  1. 计算时间短优先

  2. 剩余计算时间短优先

  3. 高响应比优先(响应比 = 等待时间 / 进入时间)

  4. 先来先服务

  5. 时间片轮转调度算法(常用作先来先服务的改进)

1.5.3 分级调度算法

​ 基于时间片轮转调度。不同优先级建立不同的队列,优先级越高的进程,时间片越短。

二、锁

2.1 并发机制的底层实现

2.1.1 Volatile的实现原理

​ 在JVM中已经详细阐述Volatile如何实现可见性,但他不保证原子性,因为他没加锁,不能解决指令交错的问题。

2.1.1.1 CAS

​ CompareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子类型。

CAS必须借助volatile来实现读取到共享数据的最新值从而进行比较交换的效果。

public void withdraw(Integer amount) {
while(true) {
// 获取余额的最新值
int prev = money.get();
// 要修改的余额
int next = prev - amount;
// money必须是原子类型,底层value被Volatile修饰
if(money.compareAndSet(prev, next)) {
break; //如果最新的余额与之前的余额不一致继续下一轮循环,一致则退出循环
}
}
}

​ compareAndSet 正是做这个检查,在set前,先比较prev与当前值

  • 不一致了,next 作废,返回 false 表示失败,比如,别的线程已经做了减法,当前值已经被减成了 990 ,那么本线程的这次 990 就作废了,进入 while 下次循环重试
  • 一致,以 next 设置为新值,返回 true 表示成功
2.1.1.2 原子整数

​ 是对整数类包装类的封装,使用CAS实现内部的线程安全性,使用volatile来保证可见性,读取到最新值。

2.1.1.3 原子引用

​ ABA问题:即该线程查觉不到共享变量被修改过,但希望能判断其他线程使用过 (修改过),这时使得自己的CAS失效,返回false。

解决办法:加一个版本号,使得每次修改都使得版本号+1

AtomicStamptdReference就是如此,还可以根据版本号的值来判断修改次数,但有时为了简单不需要关系修改次数,则使用AtomicMarkableReference,他可以返回布尔类型值,只关心判断是否修改。

2.1.1.4 原子数组

不安全的数组

demo(
()->new int[10],
(array)->array.length,
(array, index) -> array[index]++,
array-> System.out.println(Arrays.toString(array))
);
123456

安全的数组

demo(
()-> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
123456
2.1.1.5 字段更新器

​ 保护的是类的字段(成员变量),而且字段必须volatile修饰,否则会出现异常。

Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

2.1.1.6 原子累加器

​ JDK8之后专门提供了几个类,来做原子的累加,性能要高很多。

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,不是只在一个变量上加。因此减少了 CAS 重试失败,从而提高性能。

2.1.2 synchronized的实现原理

​ synchronize又被称作对象锁(Monitor对象),是一种阻塞性的锁。

synchronized底层实际上是通过管程Monitor来实现,那么如何给对象用synchronized上锁呢?

(1)Java对象的对象头中的Mark Word,指向一个管程Monitor对象的指针。

(2)对象中锁标志位从01->10,然后当前线程指向管程中的owner部分,表示拥有该管程。

(3)下一个线程来了,发现该对象有锁(标志位判断),然后继续看对象指向的Monitor他的owner,发现有线程指向,则进入EntryList阻塞。

(4)线程将临界资源用完后,就不指向owner了,就从EntryList中拿线程去指向Monitor

总结:让作为锁的对象关联一个管程,管程才是真正意义上的锁,使用成本太高,但是具有原子性。

2.1.3 Lock锁

​ java.util.concurrent.locks.Lock 机制提供了比synchronized代码块和synchronized方法更广泛的锁定操作,同步代码块/同步方法具有的功能Lock都有,除此之外更强大,更体现面向对象。

显示的创建锁,锁获取和释放,具有可操作性,可以中断的获取锁,超时获取锁;而synchronized则是隐式的获取锁,锁获取和释放都固化了,就先获取再释放。(不能像Lock实现类ReetrantLook一样可重入,再次获取锁)

Lock锁也称同步锁,加锁与释放锁 方法化了,如下:
public void lock() :加同步锁。
​ public void unlock() :释放同步锁。

/**
* 模拟票
* @author Mango
*/
public class Ticket implements Runnable {

/**
* 多线程共享数据
*/
private int ticket = 100;

//创建一个lock锁对象
Lock lock = new ReentrantLock();


/**
* 卖票操作
*/
@Override
public void run() {
while(true) {
lock.lock();
//同步方法
if(ticket > 0) {
try {
Thread.sleep(100);

} catch (InterruptedException e) {
e.printStackTrace();
}
String name = Thread.currentThread().getName();
System.out.println(name + "正在卖" + ticket--);
}
lock.unlock();
}
}

}

2.2 锁升级

2.2.1 轻量级锁

​ 适用场景:多线程访问在同一周期(一段时间)内没有竞争发生。

​ 它实际上对使用者是透明的,语法依然是synchronized,每次会尝试先加轻量级锁,进行CAS操作。

  1. CAS操作:对象头的Mark Word 尝试复制到 栈帧中的锁记录中,成功,则获取锁,失败则其他对象竞争锁,进行锁膨胀;自己重入获取锁的话,则进行重入计数。
  • 锁膨胀:为对象申请Monitor,让对象指向Monitor锁地址。

​ 结束后(退出synchronized代码块时),再使用CAS操作。

  1. CAS操作:讲栈帧中的锁记录,恢复给对象头的Mark Word值,成功,则解锁成功,失败则说明已经膨胀为重量级锁,实行重量级锁的解锁流程。

2.2.2 偏向锁

​ 轻量级锁如果没竞争,每次都要CAS操作,所以可以使用锁优化:

​ 第一次CAS时,将线程ID设置到对象的Mark Word头,之后发现线程ID无竞争,则不用CAS,只需要判断线程ID。

​ 何时撤销偏向锁?

  • 调用对象的hashCode方法,因为hashCode值会被存入轻量级中的锁记录,重量级锁中的Monitor中。
  • 其他线程使用,自然就会撤销掉偏向锁,升级为轻量级锁。
  • wait和notify,它们是重量级锁的机制。

2.3 悲观锁 -乐观锁

2.3.1 管程-悲观锁

​ 管程实际上是抽象相关并发进程对共享变量的访问。

  • 条件变量:检测互斥的标志

  • 同步原语wait:进程无法获取资源时,阻塞进程;原语signal:进程释放资源时,唤醒(notify)等待队列

  • 管程局部变量:进程共享变量,过程实际上就是对管程局部变量的操作

2.3.2 无锁并发-乐观锁

​ 乐观锁实际上没有锁,CAS和Volatile结合可以实现无锁并发,不使用synchronized关键字,适用于线程少 ,多核CPU的场景。

class AccountCas implements Account {
private AtomicInteger balance; //使用原子整形AtomicInteger,底层是用的Volatile,实现每次拿到最新值

public AccountCas(int balance) {
this.balance = new AtomicInteger(balance);
}

@Override
public Integer getBalance() {
return balance.get();
}

@Override
public void withdraw(Integer amount) {
while(true) {
// 获取余额的最新值
int prev = balance.get();
// 要修改的余额
int next = prev - amount;
// 真正修改
if(balance.compareAndSet(prev, next)) {
break; //如果最新的余额与之前的余额不一致继续下一轮循环,一致则退出循环
}
}
}
}

2.3.3 悲观锁-乐观锁比较

  • CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃点亏再重试呗。
  • synchronized是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS体现的是无锁并发、无阻塞并发:
    • 因为没有使用synchronized,所以线程不会陷入阻塞,这是提升效率的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率可能会受到影响
  1. 注:为什么无锁就效率高呢?
  • 无锁情况下,线程始终高速运转,没有停歇,而synchronized会让没有获取锁的线程,发生上下文切换,进入阻塞。(重新打火一样,代价太大)
  1. 但无锁情况下,线程需要高速运转,需要cpu的支持,如果没有额外的cpu核数,线程就无法高速运转,虽然不会阻塞,但由于分不到时间片还是回上下文切换。所以要求线程数低于CPU核数,才能发挥优势。

三、线程池

3.1 线程池优势

​ 合理利用线程池能够带来三个好处:

  1. 降低资源消耗。减少了线程创建销毁的过程(节约资源),每个工作线程都可以被重复利用,可执行多个任务。

  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

  3. 提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目。

    防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

3.2 ThreadPoolExecutor

​ java中线程池的顶级接口是java.util.concurrent.Executor,但它并不是一个线程池,而只是一个执行线程的工具,真正的线程池接口是java.util.concurrent.ExecutorService

3.2.1 线程池状态

3.2.2 shutdown与stop区别:

  1. shutdown状态,不会接受新任务,把阻塞队列中的继续执行完
  2. stop状态,立即中断,抛弃阻塞队列中的任务

3.2.3 线程池构造方法,参数说明

img

3.2.4 ScheduledThreadPoolExecutor

功能:带有任务调度功能,可以延时执行,定时执行

3.3 拒绝策略

​ 先来说一下JDK线程池的执行过程:

  1. 线程池开始没线程,任务交给后,开始创建执行任务,任务达到corePoolSize(核心线程数)时,这时任务进入阻塞队列。
  2. 如果设置为有界队列,任务超出阻塞队列大小时,则会创建maximumPoolSize 减去 corePoolSize这么多线程来救急。
  3. 线程到达maximumPoolSize仍然有新任务,这时就会执行拒绝策略。
  4. 高峰期过去,超过corePoolSize的线程长时间没有task任务做,则结束释放资源。
  • 拒绝策略:
  1. 默认:抛出RejectedExecutionException异常
  2. 运行任务
  3. 放弃
  4. Netty的实现:创建新线程来执行
  5. ActiveMq的实现:带超时等待(60s)尝试放入队列
  6. 。。。。等等

3.4 Fork/Join线程池

​ Fork/Join :拆分 + 合并,采用分治的思想,适用于能够进行任务拆分的cpu密集型运算。(线程数 = cpu)

​ 在分治的基础上,加入了多线程,可以把每个任务分解合并交给不同的线程来完成,提高效率,默认创建与cpu核心数相同的线程池。

四、并发工具类

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁(其他方法内部可能有锁,或者执行时间过长)

4.1 AQS

​ 抽象队列同步器AbstractQueuedSynchronizer,是除了java自带的synchronized关键字之外的锁机制,可以用来实现锁。

4.1.1 核心思想

AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态state置为1,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到等待队列(类似于 Monitor 的 EntryList)中。

总结:AQS就是基于CLH队列,用volatile修饰共享变量state,线程的tryAcquire()方法通过CAS去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

例:继承AQS实现独占不可重入锁,Lock的锁方法,都是调用的独占锁(继承同步器类)的方法

// 自定义锁(不可重入锁)
class MyLock implements Lock {

// 独占锁 同步器类
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)) {
// 加上了锁,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

private MySync sync = new MySync();

@Override // 加锁(不成功会进入等待队列)
public void lock() {
sync.acquire(1);
}

@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override // 尝试加锁(一次)
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override // 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override // 解锁
public void unlock() {
sync.release(1);
}

@Override // 创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}

4.1.2 AQS三要素:

  1. 原子维护state状态:volatile配合CAS保证修改时的可见性

  2. 维护等待队列:使用FIFO先入先出队列

  3. 阻塞、唤醒线程的方法:使用park & unpark来,标志为-1,则需要恢复后序的线程

4.1.3 与synchronized的区别

相比于synchronized,能够:

  1. 响应中断
  2. 支持超时
  3. 非阻塞的获取锁
@Override // 加锁,可打断的API
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override // 尝试加锁,非阻塞的获取锁,底层是用CAS实现无阻塞
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override // 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

4.2 ReentrantLock

​ 可重入锁,即同一线程可以重复获取同一把锁。

4.2.1 非公平锁实现原理

​ ReentrantLock默认构造函数是创建非公平锁NonfairSync

  • 非公平锁即在有线程释放锁时,随机唤醒阻塞队列中的一个线程
  • 公平锁即在有线程释放锁时,唤醒等待时间最长(阻塞队列队头)的线程。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

NonfairSync非公平锁是继承自AQS的一个同步器类,作为ReetranLock的实现(是其一个内部类,与我们自定义锁类似)

  1. 刚开始第一个线程获取锁时,这时没有竞争,直接state置为1,让Owner指向线程。

final void lock() {
if (compareAndSetState(0, 1)) //state置为1
setExclusiveOwnerThread(Thread.currentThread());//Owner指向当前线程
else
acquire(1);
}
  1. 当出现竞争时

在这里插入图片描述

新线程继续上面代码,CAS尝试失败,然后进入tryAcquire再次尝试获得锁

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. 如果开始线程依然没有释放锁又失败,则进入addWaiter,构造一个Node队列。

在这里插入图片描述

  • 黄色三角为标志位,0位正常
  • Node创建时懒创建,第一个Node是哨兵,用来占位使用
  1. 竞争的线程进入到acquireQueued
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

在这里插入图片描述

(1)会在死循环中,一直不断尝试获得锁,失败后进入park阻塞

(2)如果是第二位(即哨兵后一位),那么再次尝试,如果state依然为-1,则失败

(3)进入到shouldParkAfterFailedAcquire,将前驱结点标志位置为-1

  1. 线程再次循环到shouldParkAfterFailedAcquire时,这时前驱为-1,则返回true,进入parkAndCheckInterrupt,线程结点用灰色表示

在这里插入图片描述

​ 多个线程依然经历上述过程,到开始线程没有释放锁之前,都竞争失败

在这里插入图片描述

  1. 终于,开始线程释放了锁,进入到了tryRelease流程,成功则进行对称操作
  • owner置为null
  • state置为0

在这里插入图片描述

​ 当前队列不为null,并且head标志位为-1,进入unpark恢复流程,恢复最近的一个线程即 Thread-1,然后Thread-1进入acquireQueued

在这里插入图片描述

​ 如果加锁成功(没有外部线程竞争),会设置:

  • owner,state,head指向Thread-1
  • head指向Thread-1的Node,清空置为null即可
  • 原来的head从链表中断开 ,被GC

​ 如果加锁不成功,即外面有线程竞争,不巧由于非公平锁被Thread-4占了先

在这里插入图片描述

​ 则Thread-4进行原先线程的操作owner指向Thread-4,state置为1,Thread-1再次进入acquireQueued,获取锁失败,则重新进入park阻塞

4.2.2 可重入原理

// 非公平锁为例
static final class NonfairSync extends Sync {
// ...

// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++ state变成2,变成两次
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
  • 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入,state++ ,例如state变成2,表示两次加锁
  • 支持锁重入, 只有 state 减为 0, 才setExclusiveOwnerThread释放成功

4.2.3 可打断原理

​ 默认为不可打断,不能被其他线程打断,终止我的等待。即使被打断,也仍然会在AQS队列中,等获得锁时打个标记继续运行。

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;),从而终止等待
throw new InterruptedException();
}

总结:在 park 过程中如果被 interrupt 会抛出异常, 而不会再次进入 for (;;),从而终止等待。

4.2.4 公平锁实现原理

// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二(哨兵后一位)
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}

4.3 ReadWriteLock

4.3.1 ReentrantReadWriteLock

​ ReentrantReadWriteLock:读写锁,支持共享锁和排它锁。

​ 支持降级重入,即可以先获得写锁,再获得读锁,反之则不行。

总结:当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。

4.3.2 StampedLock

​ StampedLock:为了进一步优化读性能,支持乐观读、悲观读、也有写锁(避免了写线程饥饿)

​ 特点:使用乐观锁、读锁、写锁必须配合【戳】来使用,判断返回值。

​ 乐观读:StampedLock支持tryOptimisticRead()方法来进行乐观读,读取完毕需要加一次戳验证,看是否被其他写线程修改,验证不通过则进行锁升级为读锁

StampedLock lock = new StampedLock();

long stamp = lock.tryOptimisticRead();
// 验证戳
if (!lock.validate(stamp)) {
// 进行锁升级,获取读锁
lock.readLock();
}

​ 注意:

  • StampedLock不支持条件变量(条件不满足进入waitSet等待)相比较于ReentrantLock支持多条件变量就逊色了

  • StampedLock不支持可重入

4.4 信号量

​ Semaphore:限制能同时访问共享资源的线程上限,线程运行需要获取许可。

// 每次最多运行三个线程
Semaphore semaphore = new Semaphore(3);

try {
// 获取许可,则permits就减1
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
semaphore.release();
}

​ 信号量模型:

  • 一个计数器:底层实际上还是使用AQS,通过对permits(state)实现加减来判断能否获取共享资源

  • 一个等待队列:同样有个CLH队列,有park阻塞

  • 三个方法

    在这里插入图片描述

4.5 线程安全集合类

4.5.1 遗留的安全集合

​ 不推荐使用,主要有Hashtable和Vector,一个是线程安全的Map类型,一个是线程安全(加synchronized)的List类型。

4.5.2 修饰的安全集合

​ SynchronizedMap 和 SynchronizedList,都是通过Collections修饰变成线程安全的集合。

​ 内部实现实现就是调用Map和List的方法,在方法体中,加入了Synchronized(mutex)的方法块。

4.5.3 J.U.C下的并发容器

主要分为三大类:Blocking、CopyOnWrite、Concurrent

在这里插入图片描述

  1. Blocking 大部分实现基于锁,并提供了阻塞的方法实现,即不满足条件即阻塞。

  2. CopyOnWrite:容器修改时会进行拷贝,来避免多线程并发的线程安全,适合于读多写少的场景

  3. Concurrent:(强烈推荐,性能好)

    1. 内部很多操作使用CAS进行优化,也有用Synchronized来实现的,一般可以提供高吞吐量,避免频繁阻塞。
    2. 弱一致性:遍历时,求大小时,读取时会存在弱一致性,发生修改可能拿取到的是旧值(注意是可能),但是比其他性能高。

    注:遍历时,对应非安全类(即没有加Synchronized)容器,使用fail-fast机制让遍历尽快失败,抛出ConcurrentModificationException,不再继续的遍历。

注:弱一致性不一定不好,并发高和弱一致性无法两全,需要权衡。

4.6 ConcurrentHashMap

​ 使用:一般使用 computeIfAbsent()方法 + 原子累加器(原子类例如LongAdder)来实现。

synchronized加在computeIfAbsent()方法中的bin链表头部的,性能高。

4.6.1 原理

​ JDK1.8之前:维护一个Segment数组,每个Segment对应一把锁,多个线程访问不同的segment,实际是无冲突与8类似。

​ JDK1.8:数组+ 链表(红黑树)来实现,以下数组=table,链表为bin,红黑树为TreeNode

  • 初始化,使用cas保证并发安全,懒惰初始化table
  • 树化,当table长度<64时,尝试扩容;当bin的长度>8,会将链表转化成红黑树,树化过程用synchronized锁住链表头。
  • put,没链表时为cas创建链表,有就尾插法
  • get,无锁仅需保证可见性
  • 扩容,需要对每个链表进行synchronized
  • size,元素个数保存在baseCount,并发时将个数变动保存到CounterCell[]中,最后统计数量时再进行累加即可

4.6.2 ConcurrentHashMap比HashTable的优点

  • ConcurrentHashMap里使用了 Segment分段锁 + HashEntry(将整个map分为N段,操作时只锁一段),而HashTable用的是Syncronized锁全部,所有线程竞争一把锁。

  • Segment分段锁继承ReentrantLock,在并发数高的时候,ReentrantLock比Syncronized总体开销要小一些。

    在这里插入图片描述

4.7 CopyOnWriteArrayList

​ CopyOnWriteSet就是调用CopyOnWriteArrayList的方法,每次执行前加一次判断Set中是否值。

底层实现采用写入时拷贝,即增删改这种写操作,会将底层数组拷贝一份,更改在新数组中执行,这时并不影响其他线程并发读,读写分离。Java8采用可重入锁实现,到了Java11就采用的是synchronized来实现。