阶段一:多线基础知识
入门思考
为什么要学习并发
- 发挥CPU多处理器的强大能力
- 更符合现实生活中的多事件同时处理
- 简化异步事件的处理
- 更加灵敏的响应用户界面
并发的缺点
- 安全性
- 活跃性(饥饿)问题
- 性能问题(并发不一定快)
- 线程切换有消耗ccc
学习并发的四个阶段
- 熟练掌握API,能够完成并发编程
- 熟读API源码,掌握其原理
- 理解Java虚拟机的内存模型 【待看】
- 操作系统对并发的支持 【待看】
进程与线程基础知识
- 进程是资源分配的基本单位
- 进程包含多个线程,线程共享进程
- 线程是CPU调度的基本单位
线程状态
线程中断及中断处理
多线程-interrupt(),isInterrupted(),interrupted()
setDaemon(true); //主进程结束,子线程while(true)也结束
优雅的中断线程
- 处理不中断线程,在中断中触发其异常
@Override
public void interrupt() {
try {
socket.close();
} catch (IOException e) {
} finally {
super.interrupt();
}
}
- 用守护进程,
Daemon Thread: 其优先级特别低(低到甚至可以被JVM自动终止),通常这类线程用于在空闲时做一些资源清理类的工作,比如GC线程
public void execute(Runnable task) {
executeThread = new Thread("ThreadService") {
@Override
public void run() {
Thread runner = new Thread(task, "Daemon-thread");
runner.setDaemon(true);
runner.start();
try {
runner.join();
finished = true;
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
};
executeThread.start();
}
线程池中的应用
public static ThreadFactory threadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable runnable) {
Thread result = new Thread(runnable, name);
result.setDaemon(daemon);
return result;
}
};
}
堆和栈
- 栈java.util.Stak是线程的私有Last In First Out顺序的数据结构,堆所有线程都能访问到
- 栈内存用来存储
局部变量和方法调用
,堆内存用来存储Java中的对象。 无论是成员变量,局部变量,还是类变量,它们指向的对象都存储在堆内存中。
查看堆和栈的默认值
//堆 其中InitialHeapSize为最开始的堆的大小,MaxHeapSize为堆的最大值。
java -XX:+PrintFlagsFinal -version | grep HeapSize
//栈
java -XX:+PrintFlagsFinal -version | grep ThreadStackSize
线程之间的通信
1. 类synchronized,wait,notify 实现相互通知
共享变量
public class MySignal{
protected boolean hasDataToProcess = false;
public synchronized boolean hasDataToProcess(){
return this.hasDataToProcess;
}
public synchronized void setHasDataToProcess(boolean hasData){
this.hasDataToProcess = hasData;
}
}
- Wait解锁
public synchronized int get () {
wait(); // wait() synchronized 解锁,多线程能直接访问到wait
return 0;
}
- notify. , 会加锁拿到锁,但是会等到synchronize的锁释放成功后才能加锁
- notify: 随机叫醒一个处于wait状态的线程
- notifyAll: 叫醒所有的处于wait线程,争夺到时间片的线程只有一个
public synchronized void set () {
signal = 1;
notifyAll(); // notify方法会随机叫醒一个处于wait状态的线程
// notifyAll叫醒所有的处于wait线程,争夺到时间片的线程只有一个
}
3. 常用的Lock机制
LockSupport, ReentrantLock
//初始化
Lock lock = new ReentrantLock();
Condition a = lock.newCondition();
Condition b = lock.newCondition();
//使用
public void a() {
lock.lock();
while(signal != 0 ) {
a.await();
}
System.out.println("a");
signal ++;
b.signal();
lock.unlock();
}
4. 并发工具类及并发容器
CountDownLatch
5. 管道
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class testPipeConnection {
public static void main(String[] args) {
/**
* 创建管道输出流
*/
PipedOutputStream pos = new PipedOutputStream();
/**
* 创建管道输入流
*/
PipedInputStream pis = new PipedInputStream();
try {
/**
* 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
*/
pos.connect(pis);
} catch (IOException e) {
e.printStackTrace();
}
/**
* 创建生产者线程
*/
Producer p = new Producer(pos);
/**
* 创建消费者线程
*/
Consumer1 c1 = new Consumer1(pis);
/**
* 启动线程
*/
p.start();
c1.start();
}
}
/**
* 生产者线程(与一个管道输入流相关联)
*
*/
class Producer extends Thread {
private PipedOutputStream pos;
public Producer(PipedOutputStream pos) {
this.pos = pos;
}
public void run() {
int i = 0;
try {
while(true)
{
this.sleep(3000);
pos.write(i);
i++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者线程(与一个管道输入流相关联)
*
*/
class Consumer1 extends Thread {
private PipedInputStream pis;
public Consumer1(PipedInputStream pis) {
this.pis = pis;
}
public void run() {
try {
while(true)
{
System.out.println("consumer1:"+pis.read());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
缺点
- 只能两个线程之间专递.
- 只能单向流动;
4. volatile 关键字
ThreadLocal原理和使用
案例:javaBase/ThreadLocalDemo
原理:把初始值放到线程中去,并且每个线程单独一份互不影响
使用场景
- 实现单个线程单例以及单个线程上下文信息存储,比如交易id等
- 实现线程安全,非线程安全的对象使用ThreadLocal之后就会变得线程安全,因为每个线程都会有一个对应的实例
- 承载一些线程相关的数据,避免在方法中来回传递参数
间通信之join加赛
JavaBase/JoinDemo.java
CyclicBarrier 开会场景类似,如果出现一个人员没有await或中途异常,导致其他线程一直在等待
Exchanger
tb7.Demo
Callable、Future和FutureTask
tb8.Demo
Future设计模式的实现
tb9 com.sample.design_pattern.chapter04_future_design
Callable 和Runnable的区别
- Runnable的run方法是线程调用的,在run方法是异步执行的
- Callable的call方法,不是异步执行的,是由Future的run方法调用的
Future源码解析
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
多种创建线程的方式案
- 继承Thread类
- 实现Runnable接口
- 匿名内部类
- 带返回值的线程,实现
implements Callable<T>
有返回值的 - 定时器,java常用的几种定时器
@Scheduled注解、quartz、new Timer().schedule、使用线程控制
【待深入】 - 线程池的实现 【待深入】
- Lambda表达式实现
- Spring是需要多线程
- RxJava
线程设置
-Xss10M 设置 tackssize new thread(stacksize)
Thread.currentThread().setPriority(priority);
优先级设置不要把线程的优先级与
运行结果的顺序
作为衡量的标准,优先级较高的线程并不一定每一次都先执行完run()方法中的任务,也就是说,线程的优先级与打印顺序无关,不要将这两者的关系相关联,它们的关系具有不确定性和随机性。
关键字
- join 兄弟你走,我垫后
ThreadGroup 线程组
- 给线程指定ThreadGroup
ThreadGroup threadGroup = new ThreadGroup("TG1");
Thread t1 = new Thread(threadGroup, "t1")
- main函数默认创建在
main
线程组中
中断的处理逻辑体会
不支持线程interrupt的
可以在interrupt中触发停止条件,如读取文件时在interrupt中关闭读取文件的流触发线程内的异常
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
public class Test32 extends Thread {
public static final int BUF_SIZE = 512;
Socket socket;
InputStream in;
public Test32(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
@Override
public void interrupt() {
try {
//中断是关闭socket是线程产生错误
socket.close();
} catch (IOException e) {
} finally {
super.interrupt();
}
}
@Override
public void run() {
try {
byte[] buf = new byte[BUF_SIZE];
while (true) {
int count = in.read(buf);
if (count < 0) {
break;
} else if (count > 0) {
}
}
} catch (IOException e) {
}
}
}
向上抛异常,自己不处理
自己处理完,在抛异常上面在处理
线程异常问题及处理
知识点: 银行服务会预留应急服务器,定期重启服务器,重置资源保持服务器的稳定性
1. 钩子函数,捕获异常
public class ExitCapture {
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(()->{
System.out.println("The application will be exit.");
notifyAndRelease();
}));
int i = 0 ;
while (true) {
try {
Thread.sleep(1_000L);
System.out.println("I am working ...");
} catch (InterruptedException e) {
//ignore
}
i++;
// simulate a exception
if (i>20) throw new RuntimeException("error");
}
}
// 1. try to capture the simulate exception and handler that
// 2. even this Thread kill by some reason can capture
// 3. but kill -9 pid (force kill ) can't not capture
private static void notifyAndRelease() {
System.out.println("notify to the admin(the monitor.) by email or mq or some other way");
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Will release resource(socket,file,connection.)");
try{
Thread.sleep(1_000);
}catch (Exception e){
//ignore
}
System.out.println("Relase and notify Done. ");
}
}
2. 获取线程运行期间的异常
threadRun.setUncaughtExceptionHandler((threadError, e) -> {
System.out.println(e);
System.out.println(threadError);
});
- ThreadFactory设置
ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
return thread;
}
});
exec.execute(new ExceptionThread());
3. 追踪程序中调用的逻辑顺序
Arrays.asList(Thread.currentThread().getStackTrace()).stream()
.filter(e -> !e.isNativeMethod())
.forEach(e -> Optional.of(
e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber())
.ifPresent(System.out::println)
);
4. 线程组ThreadGroup
//1.创建线程组
ThreadGroup threadGroup =
// 这是匿名类写法
new ThreadGroup("group") {
// 继承ThreadGroup并重新定义以下方法
// 在线程成员抛出unchecked exception 会执行此方法
@Override
public void uncaughtException(Thread t, Throwable e) {
//4.处理捕获的线程异常
}
};
//2.创建Thread
Thread thread = new Thread(threadGroup, new Runnable() {
@Override
public void run() {
System.out.println(1 / 0);
}
}, "my_thread");
//3.启动线程
thread.start();
5. 默认的线程异常捕获器
// 设置默认的线程异常捕获处理器
Thread.setDefaultUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
当线程出现异常时,
如果我们没有指定线程的异常处理器,而且线程组也没有设置,那么就会使用默认的线程异常处理器
6. FetureTask来捕获异常
6.1 基本用法
//1.创建FeatureTask
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1/0;
}
});
//2.创建Thread
Thread thread = new Thread(futureTask);
//3.启动线程
thread.start();
try {
Integer result = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//4.处理捕获的线程异常
}
6.2 线程池使用
//1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//2.创建Callable,有返回值的,你也可以创建一个线程实现Callable接口。
// 如果你不需要返回值,这里也可以创建一个Thread即可,在第3步时submit这个thread。
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1/0;
}
};
//3.提交待执行的线程
Future<Integer> future = executorService.submit(callable);
try {
Integer result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//4.处理捕获的线程异常
}
7. 重写ThreadPoolExecutor的afterExecute
//1.创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (r instanceof Thread) {
if (t != null) {
//处理捕获的异常
}
} else if (r instanceof FutureTask) {
FutureTask futureTask = (FutureTask) r;
try {
futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//处理捕获的异常
}
}
}
};
Thread t1 = new Thread(() -> {
int c = 1 / 0;
});
threadPoolExecutor.execute(t1);
Callable<Integer> callable = () -> 2 / 0;
threadPoolExecutor.submit(callable);
多线程带来的风险
活跃性问题
1. 死锁
2. 饥饿
- 高优先级吞噬所有低优先级的CPU时间片
- 线程被永久堵塞(block)在一个等待进入同步块的状态
- wait() 等待的线程永远不被唤醒
如何处饥饿与公平
- 设置合理的优先级;
- 设置synchronized
3. 活锁
由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。活锁和死锁的区别在于,处于活锁的实体是在不断的改变状态
活锁指的是线程不断重复执行相同的操作,但每次操作的结果都是失败的。尽管这个问题不会阻塞线程,但是程序也无法继续执行。活锁通常发生在处理事务消息的应用程序中,如果不能成功处理这个事务那么事务将回滚整个操作。解决活锁的办法是在每次重复执行的时候引入随机机制,这样由于出现的可能性不同使得程序可以继续执行其他的任务
性能问题
线程安全性问题
出现线程安全性问题的条件:
- •多线程环境下
- •多个线程共享一个资源
- 对资源进行非原子性操作(同时读OK,同时 读写有问题)
synchronized 的原理与使用
内置锁
/** * 普通方法上,内置锁就是当前类的实例 */ public synchronized int getNext() { return value ++; }
互斥锁
三种修饰方式
- 普通方法,内置锁
当前类的实例
- 静态方法,内置锁
是当前的Class字节码对象
- 代码块,内置锁
指定的类
线程锁
锁的存放
任何对象都可以作为锁,锁信息存放在对象的头中 Mark World
锁的分类
偏向锁
存在原因:
- 每次获取锁和释放锁会浪费资源
- 很多情况下,竞争锁不是有多个线程,而是一个线程在使用
处理方式:
- 如果是同一个线程,判断是否是偏向锁,是否是上一个线程id,直接访问;更多条件如下:
- •线程id
- •Epoch
- •对象的分代年龄信息
- •是否是偏向锁
- •锁标志位
- 其他线程访问会需要重新获取锁;
轻量锁
重量锁(synchronize)
理解自旋锁,死锁与重入锁
- 锁的重入
synchronize 方法访问 内部访问synchronize方法
- 自旋锁,没有拿到锁的线程等待或者在等待线程的状态达到某个条件的时候
while(Thread.activeCount() != 1) {
// 自旋
}
System.out.println("所有的线程执行完毕了...");
死锁
// a b 并行执行,a 方法获取obj1锁, b方法获取obj2锁,第二步相互制约形成死锁 public void a () { synchronized (obj1) { synchronized (obj2) { System.out.println("a"); } } } public void b () { synchronized (obj2) { synchronized (obj1) { System.out.println("b"); } } }
指令重排序(happen-before)
指令重排序,可以分为编译器重排序和处理器重排序
单例
饿汉式
//不存在线程安全性问题 public class Singleton { // 私有化构造方法 private Singleton () {} private static Singleton instance = new Singleton(); public static Singleton getInstance() { return instance; } }
懒汉式,用的时候才加载
public class Singleton2 { private Singleton2() {} // volatile 防止虚拟机为了提高性能在不改变执行结果的前提下指令重排序(原来执行在后面的可能放到前面运行) private static volatile Singleton2 instance; /** * 双重检查加锁 * @return */ public static Singleton2 getInstance () { // 自旋 while(true) if(instance == null) { synchronized (Singleton2.class) { if(instance == null) { instance = new Singleton2(); // 指令重排序 // new Singleton2() 汇编中不是一个指令完成,大致过程如下 // 申请一块内存空间 // 1 在1的时候instance就不为空了 // 在这块空间里实例化对象 // 2 // instance的引用指向这块空间地址 // 3 } } } return instance; } }
Volatile原理与使用
- Volatile称之为轻量级锁,被volatile修饰的变量,在线程之间是可见的。
- 可见:一个线程修改了这个变量的值,在另外一个线程中能够读到这个修改后的值。
- Synchronized除了线程之间互斥意外,还有一个非常大的作用,就是保证可见性
- volatile 只保证了线程的之间的可见性,不能保证线程的原子性
volatile内存语义
- 线程对变量进行修改之后,要立刻回写到主内存。
- 线程对变量读取的时候,要从主内存中读,而不是缓存。
参考:
- Java多线程(一)之volatile深入分析
- 正确使用 Volatile 变量 【重点】 EverNote有笔记
JDK5提供的原子类的操作以及实现原理
jar: rt.jar java.util.concurrent.atomic
核心代码逻辑
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get(); // 获取当前值
next = updateFunction.applyAsInt(prev); //获取共享资源值
} while (!compareAndSet(prev, next)); //如果共享资源没有被改变更新,继续get()
return prev;
}
参考 1.[非阻塞算法简介](https://www.ibm.com/developerworks/cn/java/j-jtp04186/)
Lock接口认识与使用
synchronized 和Lock各自优势
Lock
- 显示地获取和释放锁,繁琐能让代码更灵活
- 使用Lock可以方便的实现公平性
- 非阻塞的获取锁
- 能被中断的获取锁
- 超时获取锁
AbstractQueuedSynchronizer(AQS)详解
ReentrantLock.java
//简单案例
MyLock2.java
公平锁
公平是针对获取而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序。
实质是先进先出
FirLock
- FairSync 顺序执行,高并发性能损耗
- NonFairSync 每个线程获取锁的几率都是相同,减少JVM调度时间
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
读写锁
- ReentrantReadWriteLock 多个读锁不互斥,读锁与写锁互斥,多个写锁互斥
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
锁降级
•锁降级是指写锁降级为读锁。
•在写锁没有释放的时候,获取到读锁,再释放写锁
private volatile boolean isUpdate;
public void readWrite() {
r.lock(); // 为了保证isUpdate能够拿到最新的值
if (isUpdate) {
r.unlock();
w.lock();
map.put("xxx", "xxx");
r.lock(); //锁降级 利用读写锁互斥,在写锁竞争写之前先读锁
w.unlock();
}
Object obj = map.get("xxx");
System.out.println(obj);
r.unlock();
}
锁升级
•把读锁升级为写锁
•在读锁没有释放的时候,获取到写锁,再释放读锁
线程安全性问题简单总结
出现线程安全性问题的条件
•在多线程的环境下
•必须有共享资源
•对共享资源进行非原子性操作
解决线程安全性问题的途径
•synchronized (偏向锁,轻量级锁,重量级锁)
•volatile
•JDK提供的原子类
使用Lock(共享锁,排它锁)
认识的“*锁”
•偏向锁
•轻量级锁
•重量级锁
•重入锁
•自旋锁
•共享锁
•独占锁
•排他锁
•读写锁
•公平锁
•非公平锁
•死锁
•活锁
同步容器与并发容器
- 同步容器,性能比较差
- Vector(线程安全)
- ArrayList(线程不安全)==> 转化成线程安全
Collections.synchronizedCollection(list)
- Hashtable ==> HashMap
- 并发容器
- CopyOnWriteArrayList
- ConcurrentHashMap 分区加锁 来实现
CopyOnWriteArrayList 原理和使用
原理:读不加锁, 写new一个List用于写,写完成List头指针指向新的List
使用: 每次写都copy一份,故写数据量大的时候对内存的消耗是非常大的,读操作比较多使用CopyOnWriteArrayList ,如果读很少写操作很多使用同步容器处理这件事性能会更好一点
非阻塞队列-并发容器ConcurrentLinkedQueu
原理与使用
并发安全的FIFO链表的队列
提示: size() 不是线程安全,导致调用的时候数量不准确容易变
阻塞队列BlockingQueue
- ArrayBlockingQueue
1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的 有届链表队列
3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序. 无届队列
4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.
并发容器ConcurrentHashMap
线程池
线程池概述
什么是线程池
为什么使用线程池
线程池的优势
- 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。
Executors提供四种线程池
- newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
- newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
创建一个线程池并提交线程任务
线程池源码解析
参数认识
corePoolSize : 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
runnableTaskQueue:任务对列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
maximumPoolSize:线程池最大大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
keepAliveTime :线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
TimeUnit:线程活动保持时间的单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
类中其他属性
// 线程池的控制状态:用来表示线程池的运行状态(整型的高3位)和运行的worker数量(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29位的偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大容量(2^29 - 1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
/**
* RUNNING : 接受新任务并且处理已经进入阻塞队列的任务
* SHUTDOWN : 不接受新任务,但是处理已经进入阻塞队列的任务
* STOP : 不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
* TIDYING : 所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
* TERMINATED: terminated钩子函数已经运行完成
**/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池容量
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器
private volatile RejectedExecutionHandler handler;
// 线程等待运行时间
private volatile long keepAliveTime;
// 是否运行核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
// 默认拒绝执行处理器
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || // 核心大小不能小于0
maximumPoolSize <= 0 || // 线程池的初始最大容量不能小于0
maximumPoolSize < corePoolSize || // 初始最大容量不能小于核心大小
keepAliveTime < 0) // keepAliveTime不能小于0
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 初始化相应的域
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
提交任务
/*
* 进行下面三步
*
* 1. 如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程
* 调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应
* 该添加线程时添加了线程
* 2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后
* 该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,
* 若有必要,当停止时还需要回滚入队列操作,或者当线程池没有线程时需要创建一个新线程
* 3. 如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shut
* down或者已经饱和了,所以拒绝任务
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池控制状态
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // worker数量小于corePoolSize
if (addWorker(command, true)) // 添加worker
// 成功则返回
return;
// 不成功则再次获取线程池控制状态
c = ctl.get();
}
// 线程池处于RUNNING状态,将用户自定义的Runnable对象添加进workQueue队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态
int recheck = ctl.get();
// 线程池不处于RUNNING状态,将自定义任务从workQueue队列中移除
if (! isRunning(recheck) && remove(command))
// 拒绝执行命令
reject(command);
else if (workerCountOf(recheck) == 0) // worker数量等于0
// 添加worker
addWorker(null, false);
}
else if (!addWorker(command, false)) // 添加worker失败
// 拒绝执行命令
reject(command);
}
addWorker
原子性的增加workerCount。
将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。
启动worker对应的线程,并启动该线程,运行worker的run方法。
回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 外层无限循环
// 获取线程池控制状态
int c = ctl.get();
// 获取状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN
! (rs == SHUTDOWN && // 状态为SHUTDOWN
firstTask == null && // 第一个任务为null
! workQueue.isEmpty())) // worker队列不为空
// 返回
return false;
for (;;) {
// worker数量
int wc = workerCountOf(c);
if (wc >= CAPACITY || // worker数量大于等于最大容量
wc >= (core ? corePoolSize : maximumPoolSize)) // worker数量大于等于核心线程池大小或者最大线程池大小
return false;
if (compareAndIncrementWorkerCount(c)) // 比较并增加worker的数量
// 跳出外层循环
break retry;
// 获取线程池控制状态
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同
// 跳过剩余部分,继续循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker开始标识
boolean workerStarted = false;
// worker被添加标识
boolean workerAdded = false;
//
Worker w = null;
try {
// 初始化worker
w = new Worker(firstTask);
// 获取worker对应的线程
final Thread t = w.thread;
if (t != null) { // 线程不为null
// 线程池锁
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 线程池的运行状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || // 小于SHUTDOWN
(rs == SHUTDOWN && firstTask == null)) { // 等于SHUTDOWN并且firstTask为null
if (t.isAlive()) // precheck that t is startable // 线程刚添加进来,还未启动就存活
// 抛出线程状态异常
throw new IllegalThreadStateException();
// 将worker添加到worker集合
workers.add(w);
// 获取worker集合的大小
int s = workers.size();
if (s > largestPoolSize) // 队列大小大于largestPoolSize
// 重新设置largestPoolSize
largestPoolSize = s;
// 设置worker已被添加标识
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) { // worker被添加
// 开始执行worker的run方法
t.start();
// 设置worker已开始标识
workerStarted = true;
}
}
} finally {
if (! workerStarted) // worker没有开始
// 添加worker失败
addWorkerFailed(w);
}
return workerStarted;
}
执行任务
runWorker函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取w的firstTask
Runnable task = w.firstTask;
// 设置w的firstTask为null
w.firstTask = null;
// 释放锁(设置state为0,允许中断)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务
// 获取锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // 线程池的运行状态至少应该高于STOP
(Thread.interrupted() && // 线程被中断
runStateAtLeast(ctl.get(), STOP))) && // 再次检查,线程池的运行状态至少应该高于STOP
!wt.isInterrupted()) // wt线程(当前线程)没有被中断
wt.interrupt(); // 中断wt线程(当前线程)
try {
// 在执行之前调用钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 运行给定的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行完后调用钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
// 增加给worker完成的任务数量
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理完成后,调用钩子函数
processWorkerExit(w, completedAbruptly);
}
}
此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) { // 无限循环,确保操作成功
// 获取线程池控制状态
int c = ctl.get();
// 运行的状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空)
// 减少worker的数量
decrementWorkerCount();
// 返回null,不执行任务
return null;
}
// 获取worker数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小
if ((wc > maximumPoolSize || (timed && timedOut)) // worker数量大于maximumPoolSize
&& (wc > 1 || workQueue.isEmpty())) { // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc)
if (compareAndDecrementWorkerCount(c)) // 比较并减少workerCount
// 返回null,不执行任务,该worker会退出
return null;
// 跳过剩余部分,继续循环
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 等待指定时间
workQueue.take(); // 一直等待,直到有元素
if (r != null)
return r;
// 等待指定时间后,没有获取元素,则超时
timedOut = true;
} catch (InterruptedException retry) {
// 抛出了被中断异常,重试,没有超时
timedOut = false;
}
}
}
processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下
阻塞队列已经为空,即没有任务可以运行了。
调用了shutDown或shutDownNow函数
此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果被中断,则需要减少workCount // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 获取可重入锁
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
// 将worker完成的任务添加到总的完成任务中
completedTaskCount += w.completedTasks;
// 从workers集合中移除该worker
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 尝试终止
tryTerminate();
// 获取线程池控制状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
min = 1;
if (workerCountOf(c) >= min) // workerCount大于等于min
// 直接返回
return; // replacement not needed
}
// 添加worker
addWorker(null, false);
}
}
关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查shutdown权限
checkShutdownAccess();
// 设置线程池控制状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲worker
interruptIdleWorkers();
// 调用shutdown钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止
tryTerminate();
}
final void tryTerminate() {
for (;;) { // 无限循环,确保操作成功
// 获取线程池控制状态
int c = ctl.get();
if (isRunning(c) || // 线程池的运行状态为RUNNING
runStateAtLeast(c, TIDYING) || // 线程池的运行状态最小要大于TIDYING
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 线程池的运行状态为SHUTDOWN并且workQueue队列不为null
// 不能终止,直接返回
return;
if (workerCountOf(c) != 0) { // 线程池正在运行的worker数量不为0 // Eligible to terminate
// 仅仅中断一个空闲的worker
interruptIdleWorkers(ONLY_ONE);
return;
}
// 获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比较并设置线程池控制状态为TIDYING
try {
// 终止,钩子函数
terminated();
} finally {
// 设置线程池控制状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 释放在termination条件上等待的所有线程
termination.signalAll();
}
return;
}
} finally {
// 释放锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
// 线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 获取锁
mainLock.lock();
try {
for (Worker w : workers) { // 遍历workers队列
// worker对应的线程
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 线程未被中断并且成功获得锁
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放锁
w.unlock();
}
}
if (onlyOne) // 若只中断一个,则跳出循环
break;
}
} finally {
// 释放锁
mainLock.unlock();
}
}
参考
StampedLock
- 乐观锁(tryOptimisticRead),读写不互斥
- 悲观锁,读写互斥
怎么保证读写同步
读后发现有写重新读取
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
}
else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
阶段二:多线程设计模式详细介绍
七钟单例模式
饿汉式;
懒汉式;
synchronize method
synchronize block
volatile 解决重排序
Holder方式 【推荐】
public class SingletonObject6 { private SingletonObject6() {} private static class InstanceHolder { // static 只执行一次,并严格保证按照顺序执行 private final static SingletonObject6 instance = new SingletonObject6(); } public static SingletonObject6 getInstance() { return InstanceHolder.instance; } }
枚举
public class SingletonObject7 { private SingletonObject7() {} private enum Singleton { INSTANCE; private final SingletonObject7 instance; Singleton() { instance = new SingletonObject7(); } public SingletonObject7 getInstance() { return instance; } } public static SingletonObject7 getInstance() { return Singleton.INSTANCE.getInstance(); } public static void main(String[] args) { IntStream.rangeClosed(1, 100) .forEach(i -> new Thread(String.valueOf(i)) { @Override public void run() { System.out.println(SingletonObject7.getInstance()); } }.start()); } }
WaitSet
- 所有的对象都会有一个wait set,用来存放调用了该对象wait方法之后进入block状态线程
- 线程被notify之后,不一定立即得到执行
- 线程从wait set中被唤醒顺序不一定是FIFO.
- 线程被唤醒后,必须重新获取锁
volatile 最好的例子
缓存不一致问题
内存模型,由于cpu和内存频率不同存在cpu 高速缓存
由于Java优化,检测到线程没有写操作,就不需要去主线程中拿更新变量
解决缓存不一致问题
i = 1;
//两个线程同步执行
i= i+1;
cpu1 -> main memory -> i -> cache i+1 -> cache(2) -> main memory(2)
cpu2 -> main memory -> i -> cache i+1 -> cache(2) -> main memory(2)
两种方案
- 给数据总线(数据总线,地址总线,控制总线)加锁
- CPU高速缓存一致性协议(Intel MESI)
核心思想:
- 当CPU写入数据的时候,如果发现该变量被共享(其他CPU存在该变量的副本),会发出一个信号,通知其他CPU该变量缓存无效;
- 其他的CPU访问该变量的时候,重新到主内存中获取
指令重排&happens-before规则
三个重要概念
原子性,可见性,有序性
原子性: 对基本数据类型的变量读取和赋值是保证原子性的,要么成功,要么失败,这些操作不可中断
a = 10; 原子性 b = a; 不满足, 1. read a ; 2. assign b; c++; 不满足 1. read c; 2. add; 3. assign to c; c = c+1; 不满足 1. read c; 2. add; 3. assign to c;
可见性
有序性
volatile 关键字作用
- 保证重排序循序;
- 强制对缓存的修改立刻更新到主存;
- 如果有写操作,其他CPU中的缓存失效;
volatile实践
- volatile读操作和非volatile一样,写操作开销比读多得多,但是volatile 的总开销仍然要比锁获取低
==> 如果读操作的次数要远远超过写操作,与锁相比,volatile 变量通常能够减少同步的性能开销。
参考
- Java多线程(一)之volatile深入分析
- 正确使用 Volatile 变量 【重点】 EverNote有笔记
观察者模式
design_pattern/ObserverClient.java
一个人通过
共享资源
design_pattern/gate.java
读写锁分离
design_pattern/chapter03_readwrite_lock/ReaderWorker.java
不可变类 Immutable
chapter 07
作用: 多线不用加锁,提高程效率
简单的不可变操作
- 返回List用
Collections.unmodifiableList(list)
- 返回对象用clone
Future Design Pattern
设计逻辑: 在不堵塞主线程的情况下,主线程做其他的事情,然后在去取结果
chapter08
Future ->代表的是未来的一个凭据
FutureTask ->将你的调用逻辑进行了隔离
FutureService ->桥接 Future和 FutureTask
Guarded Suspension design pattern
chapter09
实际生活中的例子
等我一下,我一会就来 ==> 任务太多,把任务缓存到队列中,后面在处理
我正在厨房做饭
快递员敲门,说你的快递来了,要求开门
The Thread-Specific Storage
线程保险箱, chapter11
- ThreadLocal 早期实现原理
Map<Thread,T> storage = new HassMap<>();
应用在同一线程中做上下文设计模式
@Override public void run() { queryAction.execute(); System.out.println("The name query successful"); httpAction.execute(); System.out.println("The card id query successful"); //Context 为ThreadLocal 实现的单例模式用于run内部数据储存 Context context = ActionContext.getActionContext().getContext(); System.out.println("The Name is " + context.getName() + " and CardId " + context.getCardId()); }
Balking pattern
执行到某个状态,终端执行
public synchronized void save() throws IOException {
if (!changed) { // 设计模式核心重点,当发现事情已经被其他线程干完,不在往下执行
return;
}
doSave();
this.changed = false;
}
Producer and Consumer pattern
BlockingQueue 实现
private final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
//生产
blockingQueue.put(1);
//消费
blockingQueue.take();
Semaphore 实现
final Semaphore notFull = new Semaphore(10); //计数
final Semaphore notEmpty = new Semaphore(0); //计数
final Semaphore mutex = new Semaphore(1); // 锁的效果
// Producer.java
try {
notFull.acquire();
mutex.acquire();
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
//Cusomer.java
try {
notEmpty.acquire();
mutex.acquire();
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
synchronized
public synchronized void push () {
count ++;
notifyAll();
}
public synchronized void take () {
count --;
notifyAll();
}
ReentrantLock实现
//创建一个锁对象
private Lock lock = new ReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
//Producer.java
lock.lock();
try{
while (count == FULL) {
notFull.await();
}
count ++;
//唤醒消费者
notEmpty.signal();
}finally {
lock.unlock();
}
//Consumer.java
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
notFull.signal();
} finally {
lock.unlock();
}
public void put(final Message message) throws InterruptedException {
synchronized (queue) { // multi-thread lock
while (queue.size() > limit) { // out over limit wait
queue.wait();
}
queue.addLast(message); // first in last out
queue.notifyAll();
}
}
Count Donw pattern
JDK CountDownLatch
CountDownLatch latch = new CountDownLatch(1); latch.countDown(); latch.await();
模仿
public CountDown(int total) { this.total = total; } public void down() { synchronized (this) { this.counter++; this.notifyAll(); } } public void await() throws InterruptedException { synchronized (this) { while (counter != total) { this.wait(); } } }
Two-Phase Termination Design Pattern
线程执行完毕,在线程中关闭一次,还需要手动清理一下资源
Work Thread Design Pattern
流水线工人,流水线一直开着,有固定工人组装,运输工人负责运输添加流水线的货物
Active Object
(Chapter18)接受异步消息的主动方法
阶段三:并发包详情介绍
1. 原子类型详细讲解
1.1 Atomic*
- volatile 修饰的变量
- CAS算法, 也就是CPU级别的同步命令进程间通信
1.2 AtomicReference ABA问题
缺点:
CAS轻量级锁,带来的一个严重问题,ABA问题, 问题描述,当T2从A回到A状态可以有各种条件的变化,而T1的判断条件可能导致无法感知到T2的多种状态变化而产生问题
T1 T2
A->C A->B->A
解决方案是加版本标签,官方提供的解决类AtomicStampedReference
AtomicLong 如果是64位的情况下可能会有高位低位分别传输的问题导致非原子性;
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
1.2 Atomic*FieldUpdater使用场景
想要类的属性操作具备原子性
- volatile
- 非private,protected(保证可访问)
- 类型一致
不想使用锁(包括显式锁或者重量级锁synchronized)
- 大量需要原子类型修饰的对象,相对比较耗费内存,案例ConcurrentSkipListMap
Node{ pre:Node; next:Node; value:Objecct; }
1.3 Unsafe
Unsafe是Java中一个底层类,包含了很多基础的操作,比如数组操作、对象操作、内存操作、CAS操作、线程(park)操作、栅栏(Fence)操作,JUC包、一些三方框架都使用Unsafe类来保证并发安全。
Java is a safe programming language and prevents programmer from doing a lot of stupid mistakes, most of which based on memory management. But, there is a way to do such mistakes intentionally, using Unsafe class
- park/unpark LockSupport 依赖park/unpark 实现
Java 调用C/C++ 代码
Hello.java
public class Hello {
static {
System.loadLibrary("hello");
}
private native void hi();
public static void main(String[] args) {
new Hello().hi();
}
}
编译Hello.java javac Hello.java
生成Jni头文件 javah -jni Hello
编写C代码
Hello.c
#include <jni.h>
#include <stdio.h>
/* Header for class Hello */
#ifndef _Included_Hello
#define _Included_Hello
#ifdef __cplusplus
extern "C" {
#endif
/*
* Class: Hello
* Method: hi
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_Hello_hi
(JNIEnv * env, jobject o){
print("Hello Jni\n");
}
#ifdef __cplusplus
}
#endif
#endif
编译Cgcc -fPIC -D_REENETRANT -I "$JAVA_HOME/include" -I "$JAVA_HOME/include/linux" -c Hello.c
生成连接库 gcc -shared Hello.o -o libhello.so
运行java Hello 如下异常,需配置LD_LIBRARY_PATH
为当前路径
export LD_LIBRARY_PATH=./lib:$LD_LIBRARY_PATH
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hello in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at Hello.<clinit>(Hello.java:3)
unsafe常用方法
/**
* 绕过初始化
*/
Simple simple2 = (Simple) unsafe.allocateInstance(Simple.class);
System.out.println(simple2.getValue());
Guard guard = new Guard();
guard.work();
/**
* 直接修改guard内存
*/
Field field = guard.getClass().getDeclaredField("ACCESS_ALLOWED");
unsafe.putInt(guard, unsafe.objectFieldOffset(field), 42);
guard.work();
/**
* 定义类
*/
byte[] bytes = loadClassContent();
Class<?> aClass = unsafe.defineClass(null, bytes, 0, bytes.length,null,null);
int v = (int) aClass.getMethod("get").invoke(aClass.newInstance(), null);
System.out.println("defineClass:" + v);
/**
* 获取类的size
*/
System.out.println(sizeOf(new Simple()));
2. Java并发包工具
2.1 CountDownLatch
- 并行转窜行,等到最后结果,离散平行任务增加逻辑层次关系
- 有一个任务执行的时候发现可以交给其他线程执行;
2.2 CyclicBarrier
API
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override public void run() {
//当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了
}
});
barrier.getNumberWaiting(); //获取当前等待数量
barrier.getParties();
barrier.reset(); // 当getNumberWaiting !=0 的时候, reset会导致正在等待的线程报BrokenBarrierException 异常
即reset == initial == finish;
小结
CountDownLatch VS CyclicBarrier
- CountDownLatch 不能reset,而CyclicBarrier是可以循环使用的;
- latch 工作线程互不关系, barrier工作线程必须等到
2.3 Semaphore
官方demo
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
acquireUninterruptibly 不处理中断
acquire(int permits) 同时获取多个中断
release(int permits) 同时释放多个中断
drainPermits() Acquires and returns all permits that are immediately available.
2.4 ReentrantLock
ReentrantLock API
- getOwner() 尝试拿到这个锁,那不到的时候尝试中断
ConditionObject 具体实现类
维护一个单向列表,await向等待队列插入Node,叫醒向等待队列头部移除一个节点,放人同步队列中竞争CPU资源
- 同步队列
- 等待队列
2.5 Exchanger
注意:
- exchanger的值指向的是同一个堆内存,如果修改两边都会变动,如操作某些List => 代码ExchangerExample。
官网Demo
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
博文:【死磕Java并发】—–J.U.C之并发工具类:Exchanger
2.6 ReadWriteLock
2.7 Condition
2.8 StampedLock
读的时候也能写
最简单案例
private static void read() {
long stamped = -1;
try {
stamped = lock.readLock();
} finally {
lock.unlockRead(stamped);
}
}
private static void write() {
long stamped = -1;
try {
stamped = lock.writeLock();
} finally {
lock.unlockWrite(stamped);
}
}
官网demo
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
2.9 Forkjoin
任务分解框架
- RecursiveTask 有返回值
- RecursiveAction 无返回值
3.0 Phaser
通过 phaser.register() 可以动态添加;
可以循环使用
arriveAndAwaitAdvance Arrives at this phaser and awaits others.
arriveAndDeregister 动态的–;我退出你 们不用等我了
/** * Arrives at this phaser and deregisters from it without waiting * for others to arrive. Deregistration reduces the number of * parties required to advance in future phases. If this phaser * has a parent, and deregistration causes this phaser to have * zero parties, this phaser is also deregistered from its parent. * * <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, if ever. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of registered or unarrived parties would become negative */ public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }
arrive
/** * Arrives at this phaser, without waiting for others to arrive. * * <p>It is a usage error for an unregistered party to invoke this * method. However, this error may result in an {@code * IllegalStateException} only upon some subsequent operation on * this phaser, if ever. * * @return the arrival phase number, or a negative value if terminated * @throws IllegalStateException if not terminated and the number * of unarrived parties would become negative */ public int arrive() { return doArrive(ONE_ARRIVAL); }
- bulkRegister 注册多个
状态判断
- getRegisteredParties
- getArrivedParties
- getUnarrivedParties
控制
//根据条件控制是否终止Phaser
final Phaser phaser = new Phaser(2){
@Override protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("phase:" + phase + "\tregisteredParties :" + registeredParties);
//return super.onAdvance(phase, registeredParties);
return true/*终止*/, false /*不终止*/;
}
};
强制终止
phaser.forceTermination();
awaitAdvance(phase phase) 案例
终止等待的两个条件
- current phase is not equal to the given phase value
- this phaser is terminated.
demo: F:\Source\javaBase\src\main\java\com\sample\current\phaser\PhaserExample6.java
带有等待的中断
- awaitAdvanceInterruptibly
- phaser.awaitAdvanceInterruptibly(0,3,TimeUnit.SECONDS); 带有超时时间
附录
1. CountDownLatch、CyclicBarrier和Semaphore
3. Executors框架
ThreadPoolExecutor
3.1 shutdown分析
shutdown
/**
* shutdown
*
* -- condition ---
* 20 threads
* 10 threads work
* 10 idle
*
* shutdown invoked
* -- result --
* 1. 10 waiting to finished the work.
* 2. 10 interrupted the idle works.
* 3. 20 idle threads will exist.
*/
shutdownNow
/**
* shutdownNow
* -- condition ---
* 10 threads queue elements 10
* 10 running
* 10 stored in the blocking queue.
*-- result --
* 1. return list<Runnable> remain 10 un handle runnable in the queue.
* 2. interrupt all of threads in the pool.
*/
调用shutdown后一些状态变化
executorService.isShutdown(); // ture
executorService.isTerminated(); // false 还有任务在执行
((ThreadPoolExecutor) executorService).isTerminating(); // true
3.2 Executors.newWorkStealingPool
- using all available processors
- 线程工作模式为“Work-Stealing Algorithm” ,当任务队列完成后不是进入等待状态,而是主动窃取别的线程任务来做;
- 任务处理完自动停止线程池;
3.3 Schedule 的实现
Timer 存在问题是当任务执行超时时,影响下一个任务执行时间
crontab (linux), 每隔一分钟执行run.sh 脚本
run.sh
#!/bin/sh echo `date "+%Y-%m-%d %H:%M:%S"`
crontab -e
edit user’s crontab#每一分钟执行一次 * * * * * sh //root/document/scripts/run.sh >> /root/document/scripts/run.log
crontab 简单说明 vi
/etc/crontab
SHELL=/bin/bash <==使用哪种 shell 接口 path="/sbin:/bin:/usr/sbin:/usr/bin" <="=执行文件搜寻路径" mailto="root" stdout,以 email 将数据送给谁 # example of job definition: .---------------- minute (0 - 59) | .------------- hour 23) .---------- day month (1 31) .------- 12) or jan,feb,mar,apr ... .---- week 6) (sunday="0" 7) sun,mon,tue,wed,thu,fri,sat * user-name command to be executed
quartz(推荐使用)
石英钟
依赖
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz --> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.1</version> </dependency>
最简单demo
JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class) .withIdentity("job1", "group1") .build(); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger", "group1") .withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); scheduler.start(); scheduler.scheduleJob(jobDetail, trigger);
SimpleJob.java
public class SimpleJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { SimpleDateFormat dt = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss"); System.out.println("======" + dt.format(new Date()) + "======"); } }
ScheduledExecutorService
如果执行任务时长超过周期,那么周期按照任务时长算
3.4 异常处理
方案一:增加守护线程并设置异常捕获类setUncaughtExceptionHandler
,缺点当获取不到详细的线程状态
方案二:通过自定义Runnable捕捉异常,模板如下:
public abstract class MyTask implements Runnable {
protected final String name ;
public MyTask(String name) {
this.name = name;
}
@Override public void run() {
try {
this.doInit();
this.doExecute();
this.done();
} catch (Throwable e) {
this.error(e);
}
}
protected abstract void error(Throwable e);
protected abstract void done();
protected abstract void doExecute();
protected abstract void doInit();
}
3.5 拒绝策略(RejectedExecutionHandler)
3.6 API详解
F:\Source\javaBase\src\main\java\com\sample\current\executor\ExecutorServiceExampleApi.java
Core Thread Timeout 设置核心线程超时时间,超时关闭核心线程池
executorServices.setKeepAliveTime(10, TimeUnit.SECONDS);
executorServices.allowCoreThreadTimeOut(true);
带返回值的方法
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//代超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* return {@code false}
* if all core threads have already been started.
*/
boolean prestartCoreThread();
prestartAllCoreThreads
预启动所有的coreThread
before/after
在每个任务启动开始和结束时调用
@Override protected void beforeExecute(Thread t, Runnable r) {
System.out.println("--before--");
System.out.println("init the " + ((MyRunnable) r).getData());
}
@Override protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
System.out.println("--after--");
if (null == t) {
System.out.println("successful " + ((MyRunnable) r).getData());
}else {
t.printStackTrace();
}
}
3.7 Future&Callable
future.get() interrupt 是谁?
中断的是调用的线程,但是中断后runnable还能继续工作
future timeout 是否会继续执行?
future.get(5, TimeUnit.SECONDS);
future中断Runnable后会继续执行,如果需要终止任务需要额外处理 如kill -9 applicationId
3.8 CompletionService
解决一个问题,如下,多线程futureList.get
的时候不知道随先完成谁后完成(谁先谁后
),导致执行快的等执行慢的。
List<Future<Integer>> futureList = executorService.invokeAll(callableList);
System.out.println(futureList.get(0).get());
System.out.println(futureList.get(1).get());
completionService.take()// 阻塞,知道有任务完成可以获取结果
completionService.poll()//poll直接返回,不阻塞。但是没有完成的任务则返回null
completionService.poll(5,TimeUnit.SECONDS) //阻塞等待指定时间,如果有完成结果返回,没有的直接返回null
获取最先完成的任务
while ((future = completionService.take()) != null) {
System.out.println(future.get());
}
3.9 CompletableFuture
入门体验
ExecutorService + future 当任务执行完成的时候通知调动者
解决future的缺点:
- 多个future不知道谁先执行完
- future需要主动去拿而且future.get() 会阻塞
- future获取的结果在放入线程池需要new callback,达不到极连
解决案例:
F:\Source\javaBase\src\main\java\com\sample\current\executor\CompletableFutureExample.java
知识点:
- CompletableFuture 默认的线程是守护线程当调用者退出,默认直接退出;
多个CompletableFuture的组合
实战经验
1. 任务超时无法shutdown
当有runnable访问某个资源,网络请求,某个db特别慢任务过重的时候,shutdown/shutdownNow 无法使用
//新建线程池时,使用守护线程的方式
new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
}
2. ExecutorService中的陷阱
线程池中执行完一段时间(20s),shutdown线程,然后记录下未完成的任务
答案: F:\Source\javaBase\src\main\java\com\sample\current\executor\ComplexExample.java
使用CompletionService + shutdownNow 返回未完成值的方式
, 容易犯的问题:
- ExecutorCompletionService 把runable重新封装,shutdownNow 返回私有类无法使用
- shutdownNow 没有包括中断的任务
小结
4. 并发集合
常用的并发集合类
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentLinkedQueue
ConcurrentLinkedDeque
CopyOnWriteArraySet
CopyOnWriteArrayList
ArrayBlockingQueue
PriorityBlockingQueue
LinkedBlockingQueue
SynchronousQueue
DelayQueue
LinkedTransferQueue
1. LinkedList 实现
LinkedList Binary search treee
1. 单向LinkedList Stack B+ tree
2. 单项有序 ==> Queue 由树的平衡性 Red Black tree
3. 双向 Binary Tree ==> AVL
4. 双向有序 2-3-4 Tree
Spary's Tree
- 无序 F:\Source\javaBase\src\main\java\com\sample\current\collections\LinkedList.java
- 有序 F:\Source\javaBase\src\main\java\com\sample\current\collections\PriorityLinkedList.java
2. SkipList跳表数据结构实现
F:\Source\javaBase\src\main\java\com\sample\current\collections\SimpleSkipList.java
跳表的实现
跳表的技术特点
- 一种随机的数据结构
- 最底层包含整个跳表的所有元素
- 典型的空间换时间的算法
- 由于对比效率高,查找删除快
3. ArrayBlockingQueue
插入队列方法(主要区分是队列满时应该怎么反应,异常,等待,返回值)
方法名称 | 参数描述 | 返回值 | 异常信息 |
---|---|---|---|
add | 插入对象 | ture代表插入成功,如果队列已满,抛出异常 | IllegalStateException(“Queue full”)异常——AbstractQueue |
offer | 插入对象 | true代表插入成功,队列已满直接返回false | 无 |
offer | 插入对象,等待时间 | true代表插入成功,队列已满等待 一段时间后仍没有空间则返回false |
无 |
put | 插入对象 | true代表插入成功,如果队列已满则阻塞 线程等待队列为空的时候插入 |
获取队列内容
方法名称 | 参数描述 | 返回值 | 异常信息 |
---|---|---|---|
remove | 无 | 返回队首数据并移除,队列已空则抛出异常信息 | NoSuchElementException()异常——AbstractQueue |
poll | 无 | 列不为空时返回队首值 并移除;队列为空时返回null。非阻塞 立即返回。 |
|
poll | 等待时间 | 设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 | |
take | 无 | 队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 |
参考:
JDK 新特性
LongAddr
Atomctic 的不同之处
- 竞争多个资源(拆分为Cell数组)
- 动态扩容
缺点:
LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差java
提供的方法
- add():增加指定的数值;
- increament():增加1;
- decrement():减少1;
- intValue()/floatValue()/doubleValue():得到最终计数后的结果
- sum():求和,得到最终计数结果
- sumThenReset():求和得到最终计数结果,并重置value。
阶段四: 并发深入探讨
第1节 锁
•轻量级锁
•重量级锁
•重入锁
•自旋锁
•共享锁
•独占锁
•排他锁
•读写锁
•公平锁
•非公平锁
•死锁
•活锁
1.1 死锁
Jconsole -> 线程-> 有
检测死锁
的工具
1.1.1 死锁的分类
1 顺序死锁,A调用
leftRight()
,B调用rightLeft()
(厕所内有坑,厕所外有纸,相互缺少)public class LeftRightDeadlock { private final Object left = new Object(); private final Object right = new Object(); public void leftRight() { // 得到left锁 synchronized (left) { // 得到right锁 synchronized (right) { doSomething(); } } } public void rightLeft() { // 得到right锁 synchronized (right) { // 得到left锁 synchronized (left) { doSomethingElse(); } } } }
2 动态死锁,进来锁定自己的账户,相互等待。
// 转账 public static void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount) throws InsufficientFundsException { // 锁定汇账账户 synchronized (fromAccount) { // 锁定来账账户 synchronized (toAccount) { // 判余额是否大于0 if (fromAccount.getBalance().compareTo(amount) < 0) { throw new InsufficientFundsException(); } else { // 汇账账户减钱 fromAccount.debit(amount); // 来账账户增钱 toAccount.credit(amount); } } } }
- 如果两个线程同时调用
transferMoney()
- 线程A从X账户向Y账户转账
- 线程B从账户Y向账户X转账,产生死锁。
- 如果两个线程同时调用
3 协作对象之间
public class CooperatingDeadlock { // Warning: deadlock-prone! class Taxi { @GuardedBy("this") private Point location, destination; private final Dispatcher dispatcher; public Taxi(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public synchronized Point getLocation() { return location; } // setLocation 需要Taxi内置锁 public synchronized void setLocation(Point location) { this.location = location; if (location.equals(destination)) // 调用notifyAvailable()需要Dispatcher内置锁 dispatcher.notifyAvailable(this); } public synchronized Point getDestination() { return destination; } public synchronized void setDestination(Point destination) { this.destination = destination; } } class Dispatcher { @GuardedBy("this") private final Set<Taxi> taxis; @GuardedBy("this") private final Set<Taxi> availableTaxis; public Dispatcher() { taxis = new HashSet<Taxi>(); availableTaxis = new HashSet<Taxi>(); } public synchronized void notifyAvailable(Taxi taxi) { availableTaxis.add(taxi); } // 调用getImage()需要Dispatcher内置锁 public synchronized Image getImage() { Image image = new Image(); for (Taxi t : taxis) // 调用getLocation()需要Taxi内置锁 image.drawMarker(t.getLocation()); return image; } } class Image { public void drawMarker(Point p) { } } }
Dispatcher
的getImage() 及Taxi
的setLocation有相互锁的可能性
1.1.2 避免死锁
- 固定加锁的顺序(针对锁顺序死锁)
- 开放调用(针对对象之间协作造成的死锁)
- 使用定时锁–>
tryLock()
2.1 固定锁顺序避免死锁
用HashCode 固定顺序
public class InduceLockOrder {
// 额外的锁、避免两个对象hash值相等的情况(即使很少)
private static final Object tieLock = new Object();
public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
// 得到锁的hash值
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);
// 根据hash值来上锁
if (fromHash < toHash) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {// 根据hash值来上锁
synchronized (toAcct) {
synchronized (fromAcct) {
new Helper().transfer();
}
}
} else {// 额外的锁、避免两个对象hash值相等的情况(即使很少)
synchronized (tieLock) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
}
}
}
}
2.2 开放调用避免死锁
如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用!
用局部代码块锁代替整个方法的锁
class CooperatingNoDeadlock {
@ThreadSafe
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
public synchronized void setLocation(Point location) {
boolean reachedDestination;
// 加Taxi内置锁
synchronized (this) {
this.location = location;
reachedDestination = location.equals(destination);
}
// 执行同步代码块后完毕,释放锁
if (reachedDestination)
// 加Dispatcher内置锁
dispatcher.notifyAvailable(this);
}
public synchronized Point getDestination() {
return destination;
}
public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}
@ThreadSafe
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public Image getImage() {
Set<Taxi> copy;
// Dispatcher内置锁
synchronized (this) {
copy = new HashSet<Taxi>(taxis);
}
// 执行同步代码块后完毕,释放锁
Image image = new Image();
for (Taxi t : copy)
// 加Taix内置锁
image.drawMarker(t.getLocation());
return image;
}
}
class Image {
public void drawMarker(Point p) {
}
}
}
2.3 使用定时锁
1.2 偏向锁/轻量级锁/重量级锁
1.3 独享锁 & 共享锁
独享锁(互斥锁):同时只能有一个线程获得锁比如,ReentrantLock 是互斥锁,ReadWriteLock 中的写锁是互斥锁。 共享锁:可以有多个线程同时获得锁。比如,Semaphore、CountDownLatch 是共享锁,ReadWriteLock 中的读锁是共享锁。
深度问题
Java的多路复用
实战
问题定位
数据+ 工具+ 经验
操作系统–> JVM 虚拟机(java提供的工具)–> 数据
jstack 使用
建议线程名字给的有意义,在排查问题时很有必要。
jps -v
列出所有Java进程,拿到pid=1523jstack 1523 > 1523.log
将dump文件输出到日志- 上传到http://heaphero.io/index.jsp 或使用MAT分析
Best practice
尽量不要在线程中做大量耗时的网络操作,如查询数据库(可以的话在一开始就将数据从从 DB 中查出准备好)【能在初始化做的耗时完成】
尽可能的减少多线程竞争锁。可以将数据分段,各个线程分别读取。
多利用
CAS+自旋
的方式更新数据,减少锁的使用。【待实践】private AtomicReference<Thread> atomicReference = new AtomicReference<>(); private void lock() { while (atomicReference.compareAndSet(null, currentThread())) ; } private void unlock() { atomicReference.compareAndSet(currentThread(), null); }
应用中加上
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
参数,在内存溢出时至少可以拿到内存日志。线程池监控。如线程池大小、队列大小、最大线程数等数据,可提前做好预估。
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); Thread.sleep(3000); }
JVM 监控,可以看到堆内存的涨幅趋势,GC 曲线等数据,也可以提前做好准备。
- Linux 系统监控 查看(Linux菜鸟私房菜笔记)之 性能监控 【待实践总结】
jstat
JVM监控命令(在Java基础笔记看到使用说明之监控)- jvisualvm 同上
简易web服务器
附录一
推荐书籍
- 《Java 并发编程实战》
- 《Java并发编程的艺术》方腾飞
- 《深入理解Java虚拟机》 周志明
网站
博客
专业名词
- 指令重排序,在不影响程序运行结果的前提下重新排序代码