Java多线程并发学习笔记

阶段一:多线基础知识

入门思考

为什么要学习并发

  1. 发挥CPU多处理器的强大能力
  2. 更符合现实生活中的多事件同时处理
  3. 简化异步事件的处理
  4. 更加灵敏的响应用户界面

并发的缺点

  1. 安全性
  2. 活跃性(饥饿)问题
  3. 性能问题(并发不一定快)
    1. 线程切换有消耗ccc

学习并发的四个阶段

  1. 熟练掌握API,能够完成并发编程
  2. 熟读API源码,掌握其原理
  3. 理解Java虚拟机的内存模型 【待看】
  4. 操作系统对并发的支持 【待看】

进程与线程基础知识

  • 进程是资源分配的基本单位
  • 进程包含多个线程,线程共享进程
  • 线程是CPU调度的基本单位

线程状态

线程中断及中断处理

多线程-interrupt(),isInterrupted(),interrupted()

  setDaemon(true); //主进程结束,子线程while(true)也结束

优雅的中断线程

  1. 处理不中断线程,在中断中触发其异常
@Override
public void interrupt() {
    try {
        socket.close();
    } catch (IOException e) {
    } finally {
        super.interrupt();
    }
}
  1. 用守护进程,

Daemon Thread: 其优先级特别低(低到甚至可以被JVM自动终止),通常这类线程用于在空闲时做一些资源清理类的工作,比如GC线程

   public void execute(Runnable task) {
       executeThread = new Thread("ThreadService") {
         @Override
         public void run() {
           Thread runner = new Thread(task, "Daemon-thread");
           runner.setDaemon(true);
           runner.start();
           try {
             runner.join();
             finished = true;
           } catch (InterruptedException e) {
             //e.printStackTrace();
           }
         }
       };
       executeThread.start();
    }

线程池中的应用

     public static ThreadFactory threadFactory(final String name, final boolean daemon) {
       return new ThreadFactory() {
         @Override public Thread newThread(Runnable runnable) {
           Thread result = new Thread(runnable, name);
           result.setDaemon(daemon);
           return result;
         }
       };
     }

堆和栈

  1. 栈java.util.Stak是线程的私有Last In First Out顺序的数据结构,堆所有线程都能访问到
  2. 栈内存用来存储局部变量和方法调用 ,堆内存用来存储Java中的对象。 无论是成员变量,局部变量,还是类变量,它们指向的对象都存储在堆内存中。

查看堆和栈的默认值

//堆 其中InitialHeapSize为最开始的堆的大小,MaxHeapSize为堆的最大值。
java -XX:+PrintFlagsFinal -version | grep HeapSize
//栈
java -XX:+PrintFlagsFinal -version | grep ThreadStackSize

线程之间的通信

1. 类synchronized,wait,notify 实现相互通知

共享变量

public class MySignal{

  protected boolean hasDataToProcess = false;

  public synchronized boolean hasDataToProcess(){
    return this.hasDataToProcess;
  }

  public synchronized void setHasDataToProcess(boolean hasData){
    this.hasDataToProcess = hasData;
  }

}
  • Wait解锁
public synchronized int get () {
    wait(); // wait() synchronized 解锁,多线程能直接访问到wait
    return 0;
  }
  • notify. , 会加锁拿到锁,但是会等到synchronize的锁释放成功后才能加锁
    • notify: 随机叫醒一个处于wait状态的线程
    • notifyAll: 叫醒所有的处于wait线程,争夺到时间片的线程只有一个
  public synchronized void set () {
    signal = 1;
    notifyAll(); // notify方法会随机叫醒一个处于wait状态的线程
    // notifyAll叫醒所有的处于wait线程,争夺到时间片的线程只有一个
  }

3. 常用的Lock机制

LockSupport, ReentrantLock

//初始化    
Lock lock = new ReentrantLock();
Condition a = lock.newCondition();
Condition b = lock.newCondition();
//使用
    public void a() {
        lock.lock();
        while(signal != 0 ) {
            a.await();
        }
        System.out.println("a");
        signal ++;
        b.signal();
        lock.unlock();
    }

4. 并发工具类及并发容器

CountDownLatch

5. 管道

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class testPipeConnection {

    public static void main(String[] args) {
        /**
         * 创建管道输出流
         */
        PipedOutputStream pos = new PipedOutputStream();
        /**
         * 创建管道输入流
         */
        PipedInputStream pis = new PipedInputStream();
        try {
            /**
             * 将管道输入流与输出流连接 此过程也可通过重载的构造函数来实现
             */
            pos.connect(pis);
        } catch (IOException e) {
            e.printStackTrace();
        }
        /**
         * 创建生产者线程
         */
        Producer p = new Producer(pos);
        /**
         * 创建消费者线程
         */
        Consumer1 c1 = new Consumer1(pis);
        /**
         * 启动线程
         */
        p.start();
        c1.start();
    }
}

/**
 * 生产者线程(与一个管道输入流相关联)
 * 
 */
class Producer extends Thread {
    private PipedOutputStream pos;

    public Producer(PipedOutputStream pos) {
        this.pos = pos;
    }

    public void run() {
        int i = 0;
        try {
            while(true)
            {
            this.sleep(3000);
            pos.write(i);
            i++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消费者线程(与一个管道输入流相关联)
 * 
 */
class Consumer1 extends Thread {
    private PipedInputStream pis;

    public Consumer1(PipedInputStream pis) {
        this.pis = pis;
    }

    public void run() {
        try {
            while(true)
            {
            System.out.println("consumer1:"+pis.read());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

缺点

  1. 只能两个线程之间专递.
  2. 只能单向流动;

4. volatile 关键字

ThreadLocal原理和使用

案例:javaBase/ThreadLocalDemo
原理:把初始值放到线程中去,并且每个线程单独一份互不影响

使用场景

  • 实现单个线程单例以及单个线程上下文信息存储,比如交易id等
  • 实现线程安全,非线程安全的对象使用ThreadLocal之后就会变得线程安全,因为每个线程都会有一个对应的实例
  • 承载一些线程相关的数据,避免在方法中来回传递参数

间通信之join加赛

JavaBase/JoinDemo.java

CyclicBarrier 开会场景类似,如果出现一个人员没有await或中途异常,导致其他线程一直在等待

Exchanger

tb7.Demo

Callable、Future和FutureTask

tb8.Demo

Future设计模式的实现

tb9 com.sample.design_pattern.chapter04_future_design

Callable 和Runnable的区别

  1. Runnable的run方法是线程调用的,在run方法是异步执行的
  2. Callable的call方法,不是异步执行的,是由Future的run方法调用的

Future源码解析

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

多种创建线程的方式案

  1. 继承Thread类
  2. 实现Runnable接口
  3. 匿名内部类
  4. 带返回值的线程,实现 implements Callable<T> 有返回值的
  5. 定时器,java常用的几种定时器@Scheduled注解、quartz、new Timer().schedule、使用线程控制 【待深入】
  6. 线程池的实现 【待深入】
  7. Lambda表达式实现
  8. Spring是需要多线程
  9. RxJava

线程设置

-Xss10M 设置 tackssize new thread(stacksize)

  • VM调优总结 -Xms -Xmx -Xmn -Xss

  • Thread.currentThread().setPriority(priority);优先级设置

    不要把线程的优先级与运行结果的顺序作为衡量的标准,优先级较高的线程并不一定每一次都先执行完run()方法中的任务,也就是说,线程的优先级与打印顺序无关,不要将这两者的关系相关联,它们的关系具有不确定性和随机性。

关键字

  • join 兄弟你走,我垫后

ThreadGroup 线程组

  • 给线程指定ThreadGroup
  ThreadGroup threadGroup = new ThreadGroup("TG1");
  Thread t1 = new Thread(threadGroup, "t1")
  • main函数默认创建在main线程组中

中断的处理逻辑体会

  1. 不支持线程interrupt的

    可以在interrupt中触发停止条件,如读取文件时在interrupt中关闭读取文件的流触发线程内的异常

   import java.io.IOException;
   import java.io.InputStream;
   import java.net.Socket;

   public class Test32 extends Thread {
       public static final int BUF_SIZE = 512;

       Socket socket;
       InputStream in;

       public Test32(Socket socket) throws IOException {
           this.socket = socket;
           this.in = socket.getInputStream();
       }

       @Override
       public void interrupt() {
           try {
               //中断是关闭socket是线程产生错误
               socket.close();
           } catch (IOException e) {

           } finally {
               super.interrupt();
           }
       }

       @Override
       public void run() {
           try {
               byte[] buf = new byte[BUF_SIZE];
               while (true) {
                   int count = in.read(buf);
                   if (count < 0) {
                       break;
                   } else if (count > 0) {

                   }
               }
           } catch (IOException e) {
           }
       }
   }
  1. 向上抛异常,自己不处理

  2. 自己处理完,在抛异常上面在处理

线程异常问题及处理

知识点: 银行服务会预留应急服务器,定期重启服务器,重置资源保持服务器的稳定性

1. 钩子函数,捕获异常

public class ExitCapture {
  public static void main(String[] args) {
    Runtime.getRuntime().addShutdownHook(new Thread(()->{
      System.out.println("The application will be exit.");
      notifyAndRelease();
    }));
    int i = 0 ;
    while (true) {
      try {
        Thread.sleep(1_000L);
        System.out.println("I am working ...");
      } catch (InterruptedException e) {
          //ignore
      }
      i++;
      // simulate a exception
      if (i>20) throw new RuntimeException("error");
    }
  }
  // 1. try to capture the simulate exception and handler that
  // 2. even this Thread kill by some reason can capture
  // 3. but kill -9 pid (force kill ) can't not capture 
  private static void notifyAndRelease() {
    System.out.println("notify to the admin(the monitor.) by email or mq or some other way");
    try {
      Thread.sleep(1_000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Will release resource(socket,file,connection.)");

    try{
      Thread.sleep(1_000);
    }catch (Exception e){
      //ignore
    }
    System.out.println("Relase and notify Done. ");
  }
}

2. 获取线程运行期间的异常

  threadRun.setUncaughtExceptionHandler((threadError, e) -> {
            System.out.println(e);
            System.out.println(threadError);
        }); 
  1. ThreadFactory设置
   ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory(){
               @Override
               public Thread newThread(Runnable r) {
                   Thread thread = new Thread(r);
                   thread.setUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());
                   return thread;
               }
   });
   exec.execute(new ExceptionThread());

3. 追踪程序中调用的逻辑顺序

    Arrays.asList(Thread.currentThread().getStackTrace()).stream()
                .filter(e -> !e.isNativeMethod())
                .forEach(e -> Optional.of(
                    e.getClassName() + ":" + e.getMethodName() + ":" + e.getLineNumber())
                        .ifPresent(System.out::println)
                );

4. 线程组ThreadGroup

//1.创建线程组
ThreadGroup threadGroup =
        // 这是匿名类写法
        new ThreadGroup("group") {
            // 继承ThreadGroup并重新定义以下方法
            // 在线程成员抛出unchecked exception 会执行此方法
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                //4.处理捕获的线程异常
            }
        };
//2.创建Thread        
Thread thread = new Thread(threadGroup, new Runnable() {
    @Override
    public void run() {
        System.out.println(1 / 0);

    }
}, "my_thread");  
//3.启动线程
thread.start();   

5. 默认的线程异常捕获器

// 设置默认的线程异常捕获处理器
Thread.setDefaultUncaughtExceptionHandler(new MyUnchecckedExceptionhandler());

当线程出现异常时,
如果我们没有指定线程的异常处理器,而且线程组也没有设置,那么就会使用默认的线程异常处理器

6. FetureTask来捕获异常

6.1 基本用法

//1.创建FeatureTask
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 1/0;
    }
});
//2.创建Thread
Thread thread = new Thread(futureTask);
//3.启动线程
thread.start();
try {
    Integer result = futureTask.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    //4.处理捕获的线程异常
}

6.2 线程池使用

//1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//2.创建Callable,有返回值的,你也可以创建一个线程实现Callable接口。
//  如果你不需要返回值,这里也可以创建一个Thread即可,在第3步时submit这个thread。
Callable<Integer> callable = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 1/0;
    }
};
//3.提交待执行的线程
Future<Integer> future = executorService.submit(callable);
try {
     Integer result = future.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    //4.处理捕获的线程异常
}

7. 重写ThreadPoolExecutor的afterExecute

//1.创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (r instanceof Thread) {
            if (t != null) {
                //处理捕获的异常
            }
        } else if (r instanceof FutureTask) {
            FutureTask futureTask = (FutureTask) r;
            try {
                futureTask.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                //处理捕获的异常
            }
        }
    }
};

Thread t1 = new Thread(() -> {
    int c = 1 / 0;
});
threadPoolExecutor.execute(t1);
Callable<Integer> callable = () -> 2 / 0;
threadPoolExecutor.submit(callable);

多线程带来的风险

活跃性问题

1. 死锁

什么是死锁,产生死锁的原因及必要条件

2. 饥饿

  1. 高优先级吞噬所有低优先级的CPU时间片
  2. 线程被永久堵塞(block)在一个等待进入同步块的状态
  3. wait() 等待的线程永远不被唤醒

如何处饥饿与公平

  1. 设置合理的优先级;
  2. 设置synchronized

3. 活锁

由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。活锁和死锁的区别在于,处于活锁的实体是在不断的改变状态

活锁指的是线程不断重复执行相同的操作,但每次操作的结果都是失败的。尽管这个问题不会阻塞线程,但是程序也无法继续执行。活锁通常发生在处理事务消息的应用程序中,如果不能成功处理这个事务那么事务将回滚整个操作。解决活锁的办法是在每次重复执行的时候引入随机机制,这样由于出现的可能性不同使得程序可以继续执行其他的任务

性能问题

线程安全性问题

出现线程安全性问题的条件:

  1. •多线程环境下
  2. •多个线程共享一个资源
  3. 对资源进行非原子性操作(同时读OK,同时 读写有问题)

synchronized 的原理与使用

  • 内置锁

    /**
    * 普通方法上,内置锁就是当前类的实例
    */
    public synchronized int getNext() {
        return value ++;
    }
  • 互斥锁

三种修饰方式

  1. 普通方法,内置锁当前类的实例
  2. 静态方法,内置锁是当前的Class字节码对象
  3. 代码块,内置锁指定的类

线程锁

锁的存放

任何对象都可以作为锁,锁信息存放在对象的头中 Mark World

锁的分类

  1. 偏向锁

    存在原因:

    1. 每次获取锁和释放锁会浪费资源
    2. 很多情况下,竞争锁不是有多个线程,而是一个线程在使用

    处理方式:

    1. 如果是同一个线程,判断是否是偏向锁,是否是上一个线程id,直接访问;更多条件如下:
      1. •线程id
      2. •Epoch
      3. •对象的分代年龄信息
      4. •是否是偏向锁
      5. •锁标志位
    2. 其他线程访问会需要重新获取锁;
  2. 轻量锁

  3. 重量锁(synchronize)

理解自旋锁,死锁与重入锁

  • 锁的重入
  synchronize 方法访问 内部访问synchronize方法
  • 自旋锁,没有拿到锁的线程等待或者在等待线程的状态达到某个条件的时候
          while(Thread.activeCount() != 1) {
              // 自旋
          }
          System.out.println("所有的线程执行完毕了...");
  • 死锁

    // a b 并行执行,a 方法获取obj1锁, b方法获取obj2锁,第二步相互制约形成死锁
    public void a () {
            synchronized (obj1) {
                synchronized (obj2) {
                    System.out.println("a");
                }
            }
        }
        public void b () {
            synchronized (obj2) {
                synchronized (obj1) {
                    System.out.println("b");
                }
            }
        }

指令重排序(happen-before)

指令重排序,可以分为编译器重排序和处理器重排序

单例

  • 饿汉式

    //不存在线程安全性问题
    public class Singleton {
        // 私有化构造方法
        private Singleton () {}
    
        private static Singleton instance = new Singleton();
    
        public static Singleton getInstance() {
            return instance;
        }
    }
  • 懒汉式,用的时候才加载

    public class Singleton2 {
    
        private Singleton2() {}
        // volatile 防止虚拟机为了提高性能在不改变执行结果的前提下指令重排序(原来执行在后面的可能放到前面运行)
        private static volatile Singleton2 instance;
    
        /**
         * 双重检查加锁
         * @return
         */
        public static Singleton2 getInstance () {
            // 自旋   while(true)
            if(instance == null) {
                synchronized (Singleton2.class) {
                    if(instance == null) {
    
                        instance = new Singleton2();  // 指令重排序
                        // new Singleton2() 汇编中不是一个指令完成,大致过程如下
                        // 申请一块内存空间   // 1 在1的时候instance就不为空了
                        // 在这块空间里实例化对象  // 2
                        // instance的引用指向这块空间地址   // 3
                    }
                }
            }
            return instance;
        }
    }

Volatile原理与使用

  1. Volatile称之为轻量级锁,被volatile修饰的变量,在线程之间是可见的。
    1. 可见:一个线程修改了这个变量的值,在另外一个线程中能够读到这个修改后的值。
  2. Synchronized除了线程之间互斥意外,还有一个非常大的作用,就是保证可见性
  3. volatile 只保证了线程的之间的可见性,不能保证线程的原子性

volatile内存语义

  1. 线程对变量进行修改之后,要立刻回写到主内存。
  2. 线程对变量读取的时候,要从主内存中读,而不是缓存。

参考:

  1. Java多线程(一)之volatile深入分析
  2. 正确使用 Volatile 变量 【重点】 EverNote有笔记

JDK5提供的原子类的操作以及实现原理

jar: rt.jar java.util.concurrent.atomic 核心代码逻辑

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get(); // 获取当前值
            next = updateFunction.applyAsInt(prev); //获取共享资源值
        } while (!compareAndSet(prev, next)); //如果共享资源没有被改变更新,继续get()
        return prev;
    }

参考 1.[非阻塞算法简介](https://www.ibm.com/developerworks/cn/java/j-jtp04186/)

Lock接口认识与使用

synchronized 和Lock各自优势

Lock

  1. 显示地获取和释放锁,繁琐能让代码更灵活
  2. 使用Lock可以方便的实现公平性
  3. 非阻塞的获取锁
  4. 能被中断的获取锁
  5. 超时获取锁

AbstractQueuedSynchronizer(AQS)详解

ReentrantLock.java
//简单案例
MyLock2.java

公平锁

公平是针对获取而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序。

实质是先进先出

FirLock

  • FairSync 顺序执行,高并发性能损耗
  • NonFairSync 每个线程获取锁的几率都是相同,减少JVM调度时间
static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

读写锁

  • ReentrantReadWriteLock 多个读锁不互斥,读锁与写锁互斥,多个写锁互斥
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

锁降级

•锁降级是指写锁降级为读锁。

•在写锁没有释放的时候,获取到读锁,再释放写锁

    private volatile boolean isUpdate;
    public void readWrite() {
        r.lock(); // 为了保证isUpdate能够拿到最新的值
        if (isUpdate) {
            r.unlock();
            w.lock();
            map.put("xxx", "xxx");
            r.lock();  //锁降级 利用读写锁互斥,在写锁竞争写之前先读锁
            w.unlock();
        }

        Object obj = map.get("xxx");

        System.out.println(obj);
        r.unlock();

    }

锁升级

•把读锁升级为写锁

•在读锁没有释放的时候,获取到写锁,再释放读锁

线程安全性问题简单总结

  1. 出现线程安全性问题的条件

    •在多线程的环境下

    •必须有共享资源

    •对共享资源进行非原子性操作

  2. 解决线程安全性问题的途径

    •synchronized (偏向锁,轻量级锁,重量级锁)

    •volatile

    •JDK提供的原子类

  3. 使用Lock(共享锁,排它锁)

  4. 认识的“*锁”

    •偏向锁

    •轻量级锁

    •重量级锁

    •重入锁

    •自旋锁

    •共享锁

    •独占锁

    •排他锁

    •读写锁

    •公平锁

    •非公平锁

    •死锁

    •活锁

同步容器与并发容器

  • 同步容器,性能比较差
    • Vector(线程安全)
    • ArrayList(线程不安全)==> 转化成线程安全Collections.synchronizedCollection(list)
    • Hashtable ==> HashMap
  • 并发容器
    • CopyOnWriteArrayList
    • ConcurrentHashMap 分区加锁 来实现

CopyOnWriteArrayList 原理和使用

原理:读不加锁, 写new一个List用于写,写完成List头指针指向新的List

使用: 每次写都copy一份,故写数据量大的时候对内存的消耗是非常大的,读操作比较多使用CopyOnWriteArrayList ,如果读很少写操作很多使用同步容器处理这件事性能会更好一点

非阻塞队列-并发容器ConcurrentLinkedQueu

原理与使用

并发安全的FIFO链表的队列
提示: size() 不是线程安全,导致调用的时候数量不准确容易变

阻塞队列BlockingQueue

  • ArrayBlockingQueue

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的 有届链表队列

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序. 无届队列

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

并发容器ConcurrentHashMap

线程池

线程池概述

  • 什么是线程池

  • 为什么使用线程池

  • 线程池的优势

    • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

Executors提供四种线程池

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

创建一个线程池并提交线程任务

线程池源码解析

参数认识

  1. corePoolSize : 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

  2. runnableTaskQueue:任务对列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

  • PriorityBlockingQueue:一个具有优先级得无限阻塞队列。

  1. maximumPoolSize:线程池最大大小,线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

  2. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。

  3. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

  • CallerRunsPolicy:只用调用者所在线程来运行任务。

  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

  • DiscardPolicy:不处理,丢弃掉。

  • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

  1. keepAliveTime :线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

  2. TimeUnit:线程活动保持时间的单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

  3. 类中其他属性


    // 线程池的控制状态:用来表示线程池的运行状态(整型的高3位)和运行的worker数量(低29位)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29位的偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大容量(2^29 - 1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
   /**
    * RUNNING    :    接受新任务并且处理已经进入阻塞队列的任务
    * SHUTDOWN    :    不接受新任务,但是处理已经进入阻塞队列的任务
    * STOP        :    不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
    * TIDYING    :    所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
    * TERMINATED:    terminated钩子函数已经运行完成
    **/
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 可重入锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 存放工作线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 终止条件
    private final Condition termination = mainLock.newCondition();
    // 最大线程池容量
    private int largestPoolSize;
    // 已完成任务数量
    private long completedTaskCount;
    // 线程工厂
    private volatile ThreadFactory threadFactory;
    // 拒绝执行处理器
    private volatile RejectedExecutionHandler handler;
    // 线程等待运行时间
    private volatile long keepAliveTime;
    // 是否运行核心线程超时
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小
    private volatile int corePoolSize;
    // 最大线程池大小
    private volatile int maximumPoolSize;
    // 默认拒绝执行处理器
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||                                                // 核心大小不能小于0
            maximumPoolSize <= 0 ||                                            // 线程池的初始最大容量不能小于0
            maximumPoolSize < corePoolSize ||                                // 初始最大容量不能小于核心大小
            keepAliveTime < 0)                                                // keepAliveTime不能小于0
            throw new IllegalArgumentException();                                
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 初始化相应的域
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
提交任务
/*
* 进行下面三步
*
* 1. 如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程
*     调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应
*     该添加线程时添加了线程
* 2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后
*     该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,
*    若有必要,当停止时还需要回滚入队列操作,或者当线程池没有线程时需要创建一个新线程
* 3. 如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shut
*     down或者已经饱和了,所以拒绝任务
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取线程池控制状态
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // worker数量小于corePoolSize
        if (addWorker(command, true)) // 添加worker
            // 成功则返回
            return;
        // 不成功则再次获取线程池控制状态
        c = ctl.get();
    }
    // 线程池处于RUNNING状态,将用户自定义的Runnable对象添加进workQueue队列
    if (isRunning(c) && workQueue.offer(command)) { 
        // 再次检查,获取线程池控制状态
        int recheck = ctl.get();
        // 线程池不处于RUNNING状态,将自定义任务从workQueue队列中移除
        if (! isRunning(recheck) && remove(command)) 
            // 拒绝执行命令
            reject(command);
        else if (workerCountOf(recheck) == 0) // worker数量等于0
            // 添加worker
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 添加worker失败
        // 拒绝执行命令
        reject(command);
}
addWorker
  1. 原子性的增加workerCount。

  2. 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。

  3. 启动worker对应的线程,并启动该线程,运行worker的run方法。

  4. 回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) { // 外层无限循环
        // 获取线程池控制状态
        int c = ctl.get();
        // 获取状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&            // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN
            ! (rs == SHUTDOWN &&        // 状态为SHUTDOWN
               firstTask == null &&        // 第一个任务为null
               ! workQueue.isEmpty()))     // worker队列不为空
            // 返回
            return false;

        for (;;) {
            // worker数量
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||                                // worker数量大于等于最大容量
                wc >= (core ? corePoolSize : maximumPoolSize))    // worker数量大于等于核心线程池大小或者最大线程池大小
                return false;
            if (compareAndIncrementWorkerCount(c))                 // 比较并增加worker的数量
                // 跳出外层循环
                break retry;
            // 获取线程池控制状态
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同
                // 跳过剩余部分,继续循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // worker开始标识
    boolean workerStarted = false;
    // worker被添加标识
    boolean workerAdded = false;
    // 
    Worker w = null;
    try {
        // 初始化worker
        w = new Worker(firstTask);
        // 获取worker对应的线程
        final Thread t = w.thread;
        if (t != null) { // 线程不为null
            // 线程池锁
            final ReentrantLock mainLock = this.mainLock;
            // 获取锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 线程池的运行状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||                                    // 小于SHUTDOWN
                    (rs == SHUTDOWN && firstTask == null)) {            // 等于SHUTDOWN并且firstTask为null
                    if (t.isAlive()) // precheck that t is startable    // 线程刚添加进来,还未启动就存活
                        // 抛出线程状态异常
                        throw new IllegalThreadStateException();
                    // 将worker添加到worker集合
                    workers.add(w);
                    // 获取worker集合的大小
                    int s = workers.size();
                    if (s > largestPoolSize) // 队列大小大于largestPoolSize
                        // 重新设置largestPoolSize
                        largestPoolSize = s;
                    // 设置worker已被添加标识
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            if (workerAdded) { // worker被添加
                // 开始执行worker的run方法
                t.start();
                // 设置worker已开始标识
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted) // worker没有开始
            // 添加worker失败
            addWorkerFailed(w);
    }
    return workerStarted;
}

执行任务

runWorker函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数

final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取w的firstTask
    Runnable task = w.firstTask;
    // 设置w的firstTask为null
    w.firstTask = null;
    // 释放锁(设置state为0,允许中断)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务
            // 获取锁
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||    // 线程池的运行状态至少应该高于STOP
                 (Thread.interrupted() &&                // 线程被中断
                  runStateAtLeast(ctl.get(), STOP))) &&    // 再次检查,线程池的运行状态至少应该高于STOP
                !wt.isInterrupted())                    // wt线程(当前线程)没有被中断
                wt.interrupt();                            // 中断wt线程(当前线程)
            try {
                // 在执行之前调用钩子函数
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 运行给定的任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行完后调用钩子函数
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 增加给worker完成的任务数量
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 处理完成后,调用钩子函数
        processWorkerExit(w, completedAbruptly);
    }
}

此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) { // 无限循环,确保操作成功
            // 获取线程池控制状态
            int c = ctl.get();
            // 运行的状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空)
                // 减少worker的数量
                decrementWorkerCount();
                // 返回null,不执行任务
                return null;
            }
            // 获取worker数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小

            if ((wc > maximumPoolSize || (timed && timedOut))     // worker数量大于maximumPoolSize
                && (wc > 1 || workQueue.isEmpty())) {            // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc)
                if (compareAndDecrementWorkerCount(c))            // 比较并减少workerCount
                    // 返回null,不执行任务,该worker会退出
                    return null;
                // 跳过剩余部分,继续循环
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    // 等待指定时间
                    workQueue.take();                                        // 一直等待,直到有元素
                if (r != null)
                    return r;
                // 等待指定时间后,没有获取元素,则超时
                timedOut = true;
            } catch (InterruptedException retry) {
                // 抛出了被中断异常,重试,没有超时
                timedOut = false;
            }
        }
    }

processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下

  1. 阻塞队列已经为空,即没有任务可以运行了。

  2. 调用了shutDown或shutDownNow函数

此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果被中断,则需要减少workCount    // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        // 获取可重入锁
        final ReentrantLock mainLock = this.mainLock;
        // 获取锁
        mainLock.lock();
        try {
            // 将worker完成的任务添加到总的完成任务中
            completedTaskCount += w.completedTasks;
            // 从workers集合中移除该worker
            workers.remove(w);
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
        // 获取线程池控制状态
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
                    min = 1;
                if (workerCountOf(c) >= min) // workerCount大于等于min
                    // 直接返回
                    return; // replacement not needed
            }
            // 添加worker
            addWorker(null, false);
        }
    }

关闭线程池

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查shutdown权限
            checkShutdownAccess();
            // 设置线程池控制状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲worker
            interruptIdleWorkers();
            // 调用shutdown钩子函数
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
    }
    final void tryTerminate() {
        for (;;) { // 无限循环,确保操作成功
            // 获取线程池控制状态
            int c = ctl.get();
            if (isRunning(c) ||                                            // 线程池的运行状态为RUNNING
                runStateAtLeast(c, TIDYING) ||                            // 线程池的运行状态最小要大于TIDYING
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))    // 线程池的运行状态为SHUTDOWN并且workQueue队列不为null
                // 不能终止,直接返回
                return;
            if (workerCountOf(c) != 0) { // 线程池正在运行的worker数量不为0    // Eligible to terminate
                // 仅仅中断一个空闲的worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // 获取线程池的锁
            final ReentrantLock mainLock = this.mainLock;
            // 获取锁
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比较并设置线程池控制状态为TIDYING
                    try {
                        // 终止,钩子函数
                        terminated();
                    } finally {
                        // 设置线程池控制状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 释放在termination条件上等待的所有线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    private void interruptIdleWorkers(boolean onlyOne) {
        // 线程池的锁
        final ReentrantLock mainLock = this.mainLock;
        // 获取锁
        mainLock.lock();
        try {
            for (Worker w : workers) { // 遍历workers队列
                // worker对应的线程
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { // 线程未被中断并且成功获得锁
                    try {
                        // 中断线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // 释放锁
                        w.unlock();
                    }
                }
                if (onlyOne) // 若只中断一个,则跳出循环
                    break;
            }
        } finally {
            // 释放锁
            mainLock.unlock();
        }
    }

参考

StampedLock

  • 乐观锁(tryOptimisticRead),读写不互斥
  • 悲观锁,读写互斥

怎么保证读写同步

读后发现有写重新读取

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();
   void move(double deltaX, double deltaY) { // an exclusively locked method
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }
  //下面看看乐观读锁案例
   double distanceFromOrigin() { // A read-only method
     long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
     double currentX = x, currentY = y; //将两个字段读入本地局部变量
     if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
        stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
        try {
          currentX = x; // 将两个字段读入本地局部变量
          currentY = y; // 将两个字段读入本地局部变量
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }
//下面是悲观读锁案例
   void moveIfAtOrigin(double newX, double newY) { // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
         long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
         if (ws != 0L) { //这是确认转为写锁是否成功
           stamp = ws; //如果成功 替换票据
           x = newX; //进行状态改变
           y = newY; //进行状态改变
           break;
         }
         else { //如果不能成功转换为写锁
           sl.unlockRead(stamp); //我们显式释放读锁
           stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
         }
       }
     } finally {
       sl.unlock(stamp); //释放读锁或写锁
     }
   }
 }

阶段二:多线程设计模式详细介绍

七钟单例模式

  1. 饿汉式;

  2. 懒汉式;

  3. synchronize method

  4. synchronize block

  5. volatile 解决重排序

  6. Holder方式 【推荐】

    public class SingletonObject6 {
    
        private SingletonObject6() {}
    
        private static class InstanceHolder {
            // static 只执行一次,并严格保证按照顺序执行
            private final static SingletonObject6 instance = new SingletonObject6(); 
        }
    
        public static SingletonObject6 getInstance() {
            return InstanceHolder.instance;
        }
    }
  7. 枚举

    public class SingletonObject7 {
        private SingletonObject7() {}
    
        private enum Singleton {
            INSTANCE;
            private final SingletonObject7 instance;
            Singleton() {
                instance = new SingletonObject7();
            }
            public SingletonObject7 getInstance() {
                return instance;
            }
        }
    
        public static SingletonObject7 getInstance() {
            return Singleton.INSTANCE.getInstance();
        }
    
        public static void main(String[] args) {
            IntStream.rangeClosed(1, 100)
                    .forEach(i -> new Thread(String.valueOf(i)) {
                        @Override
                        public void run() {
                            System.out.println(SingletonObject7.getInstance());
                        }
                    }.start());
        }
    }

WaitSet

  1. 所有的对象都会有一个wait set,用来存放调用了该对象wait方法之后进入block状态线程
  2. 线程被notify之后,不一定立即得到执行
  3. 线程从wait set中被唤醒顺序不一定是FIFO.
  4. 线程被唤醒后,必须重新获取锁

volatile 最好的例子

缓存不一致问题

  1. 内存模型,由于cpu和内存频率不同存在cpu 高速缓存

    volatile

  2. 由于Java优化,检测到线程没有写操作,就不需要去主线程中拿更新变量

解决缓存不一致问题

i = 1;
//两个线程同步执行 
i= i+1;
cpu1 -> main memory -> i -> cache i+1 -> cache(2) -> main memory(2)
cpu2 -> main memory -> i -> cache i+1 -> cache(2) -> main memory(2)

两种方案

  1. 给数据总线(数据总线,地址总线,控制总线)加锁
  2. CPU高速缓存一致性协议(Intel MESI)

核心思想:

  1. 当CPU写入数据的时候,如果发现该变量被共享(其他CPU存在该变量的副本),会发出一个信号,通知其他CPU该变量缓存无效;
  2. 其他的CPU访问该变量的时候,重新到主内存中获取

指令重排&happens-before规则

三个重要概念

原子性,可见性,有序性

  1. 原子性: 对基本数据类型的变量读取和赋值是保证原子性的,要么成功,要么失败,这些操作不可中断

    a = 10; 原子性
    b = a; 不满足, 1. read a ; 2. assign b;
    c++;   不满足    1. read c; 2. add; 3. assign to c;
    c = c+1; 不满足 1. read c; 2. add; 3. assign to c;
  2. 可见性

  3. 有序性
    有序性

volatile 关键字作用

  1. 保证重排序循序;
  2. 强制对缓存的修改立刻更新到主存;
  3. 如果有写操作,其他CPU中的缓存失效;

volatile实践

  1. volatile读操作和非volatile一样,写操作开销比读多得多,但是volatile 的总开销仍然要比锁获取低
    ==> 如果读操作的次数要远远超过写操作,与锁相比,volatile 变量通常能够减少同步的性能开销。

参考

  1. Java多线程(一)之volatile深入分析
  2. 正确使用 Volatile 变量 【重点】 EverNote有笔记

观察者模式

design_pattern/ObserverClient.java

一个人通过

共享资源

design_pattern/gate.java

读写锁分离

design_pattern/chapter03_readwrite_lock/ReaderWorker.java

不可变类 Immutable

chapter 07

作用: 多线不用加锁,提高程效率

简单的不可变操作

  1. 返回List用Collections.unmodifiableList(list)
  2. 返回对象用clone

Future Design Pattern

设计逻辑: 在不堵塞主线程的情况下,主线程做其他的事情,然后在去取结果

chapter08

Future        ->代表的是未来的一个凭据
FutureTask    ->将你的调用逻辑进行了隔离
FutureService ->桥接 Future和 FutureTask

Guarded Suspension design pattern

chapter09

实际生活中的例子

等我一下,我一会就来 ==> 任务太多,把任务缓存到队列中,后面在处理

我正在厨房做饭
快递员敲门,说你的快递来了,要求开门

The Thread-Specific Storage

线程保险箱, chapter11

  1. ThreadLocal 早期实现原理
Map<Thread,T> storage = new HassMap<>();
  1. 应用在同一线程中做上下文设计模式

     @Override
        public void run() {
            queryAction.execute();
            System.out.println("The name query successful");
            httpAction.execute();
            System.out.println("The card id query successful");
            //Context 为ThreadLocal 实现的单例模式用于run内部数据储存
            Context context = ActionContext.getActionContext().getContext();
            System.out.println("The Name is " + context.getName()
                               + " and CardId " +         context.getCardId());
        }

Balking pattern

执行到某个状态,终端执行

public synchronized void save() throws IOException {
    if (!changed) { // 设计模式核心重点,当发现事情已经被其他线程干完,不在往下执行
        return;
    }
    doSave();
    this.changed = false;
}

Producer and Consumer pattern

BlockingQueue 实现

private final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
//生产
blockingQueue.put(1);
//消费
blockingQueue.take();

Semaphore 实现

  final Semaphore notFull = new Semaphore(10); //计数
  final Semaphore notEmpty = new Semaphore(0); //计数
  final Semaphore mutex = new Semaphore(1); // 锁的效果

// Producer.java
 try {
     notFull.acquire();
     mutex.acquire();
     count++;
     System.out.println(Thread.currentThread().getName()
                        + "生产者生产,目前总共有" + count);
 } catch (InterruptedException e) {
     e.printStackTrace();
 } finally {
     mutex.release();
     notEmpty.release();
 }

//Cusomer.java
 try {
     notEmpty.acquire();
     mutex.acquire();
     count--;
     System.out.println(Thread.currentThread().getName()
                        + "消费者消费,目前总共有" + count);
 } catch (InterruptedException e) {
     e.printStackTrace();
 } finally {
     mutex.release();
     notFull.release();
 }

synchronized

public synchronized void push () {
    count ++;
    notifyAll();
}

public synchronized void take () {
    count --;
    notifyAll();
}

ReentrantLock实现

//创建一个锁对象
private Lock lock = new ReentrantLock();
//创建两个条件变量,一个为缓冲区非满,一个为缓冲区非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

//Producer.java
  lock.lock();
    try{
        while (count == FULL) {
            notFull.await();
        }
        count ++;
        //唤醒消费者
         notEmpty.signal();
    }finally {
        lock.unlock();
    }
//Consumer.java
lock.lock();
try {
    while (count == 0) {
        notEmpty.await();
    }
    notFull.signal();
} finally {
    lock.unlock();
}
public void put(final Message message) throws InterruptedException {
    synchronized (queue) { // multi-thread lock 
        while (queue.size() > limit) {  // out over limit wait
            queue.wait();
        }
        queue.addLast(message); // first in last out 
        queue.notifyAll();
    }
}

Count Donw pattern

  1. JDK CountDownLatch

     CountDownLatch latch = new CountDownLatch(1);
          latch.countDown();
          latch.await();
  2. 模仿

     public CountDown(int total) {
            this.total = total;
        }
        public void down() {
            synchronized (this) {
                this.counter++;
                this.notifyAll();
            }
        }
        public void await() throws InterruptedException {
            synchronized (this) {
                while (counter != total) {
                    this.wait();
                }
            }
        }

Two-Phase Termination Design Pattern

线程执行完毕,在线程中关闭一次,还需要手动清理一下资源

Work Thread Design Pattern

流水线工人,流水线一直开着,有固定工人组装,运输工人负责运输添加流水线的货物

Active Object

(Chapter18)接受异步消息的主动方法

阶段三:并发包详情介绍

1. 原子类型详细讲解

1.1 Atomic*

  1. volatile 修饰的变量
  2. CAS算法, 也就是CPU级别的同步命令进程间通信

1.2 AtomicReference ABA问题

缺点:

CAS轻量级锁,带来的一个严重问题,ABA问题, 问题描述,当T2从A回到A状态可以有各种条件的变化,而T1的判断条件可能导致无法感知到T2的多种状态变化而产生问题

T1            T2
A->C        A->B->A

解决方案是加版本标签,官方提供的解决类AtomicStampedReference

AtomicLong 如果是64位的情况下可能会有高位低位分别传输的问题导致非原子性;

public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

1.2 Atomic*FieldUpdater使用场景

  1. 想要类的属性操作具备原子性

    • volatile
    • 非private,protected(保证可访问)
    • 类型一致
  2. 不想使用锁(包括显式锁或者重量级锁synchronized)

    1. 大量需要原子类型修饰的对象,相对比较耗费内存,案例ConcurrentSkipListMap
    Node{
        pre:Node;
        next:Node;
        value:Objecct;
    }

1.3 Unsafe

Unsafe是Java中一个底层类,包含了很多基础的操作,比如数组操作、对象操作、内存操作、CAS操作、线程(park)操作、栅栏(Fence)操作,JUC包、一些三方框架都使用Unsafe类来保证并发安全。

Java is a safe programming language and prevents programmer from doing
a lot of stupid mistakes, most of which based on memory management.
But, there is a way to do such mistakes intentionally, using Unsafe class
  • park/unpark LockSupport 依赖park/unpark 实现

Java 调用C/C++ 代码

Hello.java

public class Hello {
  static {
    System.loadLibrary("hello");
  }

  private native void hi();

  public static void main(String[] args) {
    new Hello().hi();
  }
}

编译Hello.java javac Hello.java

生成Jni头文件 javah -jni Hello

编写C代码
Hello.c

#include <jni.h>
#include <stdio.h>
/* Header for class Hello */

#ifndef _Included_Hello
#define _Included_Hello
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Class:     Hello
 * Method:    hi
 * Signature: ()V
 */
JNIEXPORT void JNICALL Java_Hello_hi
  (JNIEnv * env, jobject o){
        print("Hello Jni\n");
}

#ifdef __cplusplus
}
#endif
#endif

编译Cgcc -fPIC -D_REENETRANT -I "$JAVA_HOME/include" -I "$JAVA_HOME/include/linux" -c Hello.c

生成连接库 gcc -shared Hello.o -o libhello.so

运行java Hello 如下异常,需配置LD_LIBRARY_PATH为当前路径

export LD_LIBRARY_PATH=./lib:$LD_LIBRARY_PATH

Exception in thread "main" java.lang.UnsatisfiedLinkError: no hello in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
        at java.lang.Runtime.loadLibrary0(Runtime.java:870)
        at java.lang.System.loadLibrary(System.java:1122)
        at Hello.<clinit>(Hello.java:3)

unsafe常用方法

    /**
     * 绕过初始化
     */
    Simple simple2 = (Simple) unsafe.allocateInstance(Simple.class);
    System.out.println(simple2.getValue());

    Guard guard = new Guard();
    guard.work();
    /**
     * 直接修改guard内存
     */
    Field field = guard.getClass().getDeclaredField("ACCESS_ALLOWED");
    unsafe.putInt(guard, unsafe.objectFieldOffset(field), 42);
    guard.work();
    /**
     * 定义类
     */
    byte[] bytes = loadClassContent();
    Class<?> aClass = unsafe.defineClass(null, bytes, 0, bytes.length,null,null);
    int v = (int) aClass.getMethod("get").invoke(aClass.newInstance(), null);
    System.out.println("defineClass:" + v);
    /**
     * 获取类的size
     */
    System.out.println(sizeOf(new Simple()));

2. Java并发包工具

2.1 CountDownLatch

  1. 并行转窜行,等到最后结果,离散平行任务增加逻辑层次关系
  2. 有一个任务执行的时候发现可以交给其他线程执行;

2.2 CyclicBarrier

API

 CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
      @Override public void run() {
          //当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了
      }
    });
barrier.getNumberWaiting(); //获取当前等待数量
barrier.getParties();
barrier.reset();  // 当getNumberWaiting !=0 的时候, reset会导致正在等待的线程报BrokenBarrierException 异常

即reset == initial == finish;

小结

CountDownLatch VS CyclicBarrier

  1. CountDownLatch 不能reset,而CyclicBarrier是可以循环使用的;
  2. latch 工作线程互不关系, barrier工作线程必须等到

2.3 Semaphore

Semaphore Api

官方demo

 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo
   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }
  • acquireUninterruptibly 不处理中断

  • acquire(int permits) 同时获取多个中断

  • release(int permits) 同时释放多个中断

  • drainPermits() Acquires and returns all permits that are immediately available.

2.4 ReentrantLock

ReentrantLock API

  1. getOwner() 尝试拿到这个锁,那不到的时候尝试中断
  • ConditionObject 具体实现类

    维护一个单向列表,await向等待队列插入Node,叫醒向等待队列头部移除一个节点,放人同步队列中竞争CPU资源

    • 同步队列
    • 等待队列

2.5 Exchanger

Exchanger API

注意:

  1. exchanger的值指向的是同一个堆内存,如果修改两边都会变动,如操作某些List => 代码ExchangerExample。

官网Demo

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }

博文:【死磕Java并发】—–J.U.C之并发工具类:Exchanger

2.6 ReadWriteLock

2.7 Condition

2.8 StampedLock

读的时候也能写

最简单案例

  private static void read() {
    long stamped = -1;
    try {
      stamped = lock.readLock();
    } finally {
      lock.unlockRead(stamped);
    }
  }

  private static void write() {
    long stamped = -1;
    try {
      stamped = lock.writeLock();
    } finally {
      lock.unlockWrite(stamped);
    }
  }

官网demo

  class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) { // an exclusively locked method
      long stamp = sl.writeLock();
      try {
        x += deltaX;
        y += deltaY;
      } finally {
        sl.unlockWrite(stamp);
      }
    }

    double distanceFromOrigin() { // A read-only method
      long stamp = sl.tryOptimisticRead();
      double currentX = x, currentY = y;
      if (!sl.validate(stamp)) {
         stamp = sl.readLock();
         try {
           currentX = x;
           currentY = y;
         } finally {
            sl.unlockRead(stamp);
         }
      }
      return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) { // upgrade
      // Could instead start with optimistic, not read mode
      long stamp = sl.readLock();
      try {
        while (x == 0.0 && y == 0.0) {
          long ws = sl.tryConvertToWriteLock(stamp);
          if (ws != 0L) {
            stamp = ws;
            x = newX;
            y = newY;
            break;
          }
          else {
            sl.unlockRead(stamp);
            stamp = sl.writeLock();
          }
        }
      } finally {
        sl.unlock(stamp);
      }
    }
  }

2.9 Forkjoin

任务分解框架

  1. RecursiveTask 有返回值
  2. RecursiveAction 无返回值

3.0 Phaser

Phaser API

  1. 通过 phaser.register() 可以动态添加;

  2. 可以循环使用

  3. arriveAndAwaitAdvance Arrives at this phaser and awaits others.

  4. arriveAndDeregister 动态的–;我退出你 们不用等我了

      /**
         * Arrives at this phaser and deregisters from it without waiting
         * for others to arrive. Deregistration reduces the number of
         * parties required to advance in future phases.  If this phaser
         * has a parent, and deregistration causes this phaser to have
         * zero parties, this phaser is also deregistered from its parent.
         *
         * <p>It is a usage error for an unregistered party to invoke this
         * method.  However, this error may result in an {@code
         * IllegalStateException} only upon some subsequent operation on
         * this phaser, if ever.
         *
         * @return the arrival phase number, or a negative value if terminated
         * @throws IllegalStateException if not terminated and the number
         * of registered or unarrived parties would become negative
         */
        public int arriveAndDeregister() {
            return doArrive(ONE_DEREGISTER);
        }
  1. arrive

     /**
         * Arrives at this phaser, without waiting for others to arrive.
         *
         * <p>It is a usage error for an unregistered party to invoke this
         * method.  However, this error may result in an {@code
         * IllegalStateException} only upon some subsequent operation on
         * this phaser, if ever.
         *
         * @return the arrival phase number, or a negative value if terminated
         * @throws IllegalStateException if not terminated and the number
         * of unarrived parties would become negative
         */
        public int arrive() {
            return doArrive(ONE_ARRIVAL);
        }
  1. bulkRegister 注册多个

状态判断

  1. getRegisteredParties
  2. getArrivedParties
  3. getUnarrivedParties

控制

//根据条件控制是否终止Phaser   
final Phaser phaser = new Phaser(2){
      @Override protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("phase:" + phase + "\tregisteredParties :" + registeredParties);
        //return super.onAdvance(phase, registeredParties);
        return true/*终止*/, false /*不终止*/;
      }
    };

强制终止

phaser.forceTermination();

awaitAdvance(phase phase) 案例

终止等待的两个条件

  • current phase is not equal to the given phase value
  • this phaser is terminated.

demo: F:\Source\javaBase\src\main\java\com\sample\current\phaser\PhaserExample6.java

带有等待的中断

  1. awaitAdvanceInterruptibly
  2. phaser.awaitAdvanceInterruptibly(0,3,TimeUnit.SECONDS); 带有超时时间

附录

1. CountDownLatch、CyclicBarrier和Semaphore

3. Executors框架

ThreadPoolExecutor

3.1 shutdown分析

shutdown

 /**
     * shutdown
     *
     * -- condition ---
     * 20 threads
     *   10 threads work
     *   10 idle
     *
     * shutdown invoked
     * -- result -- 
     * 1. 10 waiting to finished the work.
     * 2. 10 interrupted the idle works.
     * 3. 20 idle threads will exist.
     */

shutdownNow

    /**
     * shutdownNow
     * -- condition ---
     * 10 threads queue elements 10
     * 10 running
     * 10 stored in the blocking queue.
     *-- result --
     * 1. return list<Runnable> remain 10 un handle runnable in the queue.
     * 2. interrupt all of threads in the pool.
     */

调用shutdown后一些状态变化

executorService.isShutdown();  // ture
executorService.isTerminated(); // false 还有任务在执行
((ThreadPoolExecutor) executorService).isTerminating(); // true

3.2 Executors.newWorkStealingPool

  1. using all available processors
  2. 线程工作模式为“Work-Stealing Algorithm” ,当任务队列完成后不是进入等待状态,而是主动窃取别的线程任务来做;
  3. 任务处理完自动停止线程池;

3.3 Schedule 的实现

  1. Timer 存在问题是当任务执行超时时,影响下一个任务执行时间

  2. crontab (linux), 每隔一分钟执行run.sh 脚本

    run.sh

    #!/bin/sh
    echo `date "+%Y-%m-%d %H:%M:%S"`

    crontab -e edit user’s crontab

    #每一分钟执行一次
    * * * * * sh //root/document/scripts/run.sh >> /root/document/scripts/run.log

    crontab 简单说明 vi /etc/crontab

    SHELL=/bin/bash                     <==使用哪种 shell 接口 path="/sbin:/bin:/usr/sbin:/usr/bin" <="=执行文件搜寻路径" mailto="root" stdout,以 email 将数据送给谁 # example of job definition: .---------------- minute (0 - 59) | .------------- hour 23) .---------- day month (1 31) .------- 12) or jan,feb,mar,apr ... .---- week 6) (sunday="0" 7) sun,mon,tue,wed,thu,fri,sat * user-name command to be executed 
  3. quartz(推荐使用)

    石英钟

    相关教程精进 Quartz—Quartz大致介绍(一)

    依赖

    <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
        <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz</artifactId>
          <version>2.2.1</version>
        </dependency>

    最简单demo

        JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class)
            .withIdentity("job1", "group1")
            .build();
        Trigger trigger = TriggerBuilder.newTrigger()
            .withIdentity("trigger", "group1")
            .withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?"))
            .build();
    
        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        scheduler.start();
        scheduler.scheduleJob(jobDetail, trigger);

    SimpleJob.java

    public class SimpleJob implements Job {
      @Override public void execute(JobExecutionContext jobExecutionContext)
          throws JobExecutionException {
        SimpleDateFormat dt = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
        System.out.println("======" + dt.format(new Date()) + "======");
      }
    }
  4. ScheduledExecutorService

    如果执行任务时长超过周期,那么周期按照任务时长算

3.4 异常处理

方案一:增加守护线程并设置异常捕获类setUncaughtExceptionHandler,缺点当获取不到详细的线程状态
方案二:通过自定义Runnable捕捉异常,模板如下:

   public abstract class MyTask implements Runnable {

     protected final String name ;

     public MyTask(String name) {
       this.name = name;
     }

     @Override public void run() {
       try {
         this.doInit();
         this.doExecute();
         this.done();
       } catch (Throwable e) {
         this.error(e);
       }
     }

     protected abstract void error(Throwable e);

     protected abstract void done();

     protected abstract void doExecute();

     protected abstract void doInit();
   }

3.5 拒绝策略(RejectedExecutionHandler)

3.6 API详解

F:\Source\javaBase\src\main\java\com\sample\current\executor\ExecutorServiceExampleApi.java

Core Thread Timeout 设置核心线程超时时间,超时关闭核心线程池

executorServices.setKeepAliveTime(10, TimeUnit.SECONDS);
executorServices.allowCoreThreadTimeOut(true);

带返回值的方法

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//代超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
/**
* return {@code false}
* if all core threads have already been started.
*/
boolean prestartCoreThread();

prestartAllCoreThreads 预启动所有的coreThread

before/after 在每个任务启动开始和结束时调用

    @Override protected void beforeExecute(Thread t, Runnable r) {
      System.out.println("--before--");
      System.out.println("init the " + ((MyRunnable) r).getData());
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);
      System.out.println("--after--");
      if (null == t) {
        System.out.println("successful " + ((MyRunnable) r).getData());
      }else {
        t.printStackTrace();
      }
    }

3.7 Future&Callable

  1. future.get() interrupt 是谁?

    中断的是调用的线程,但是中断后runnable还能继续工作
  1. future timeout 是否会继续执行? future.get(5, TimeUnit.SECONDS);

    future中断Runnable后会继续执行,如果需要终止任务需要额外处理 如kill -9 applicationId

3.8 CompletionService

解决一个问题,如下,多线程futureList.get的时候不知道随先完成谁后完成(谁先谁后),导致执行快的等执行慢的。

List<Future<Integer>> futureList = executorService.invokeAll(callableList);
System.out.println(futureList.get(0).get());
System.out.println(futureList.get(1).get());
completionService.take()// 阻塞,知道有任务完成可以获取结果
completionService.poll()//poll直接返回,不阻塞。但是没有完成的任务则返回null
completionService.poll(5,TimeUnit.SECONDS) //阻塞等待指定时间,如果有完成结果返回,没有的直接返回null

获取最先完成的任务

while ((future = completionService.take()) != null) {
  System.out.println(future.get());
}

3.9 CompletableFuture

入门体验

Java Doc Api

ExecutorService + future 当任务执行完成的时候通知调动者

解决future的缺点:

  1. 多个future不知道谁先执行完
  2. future需要主动去拿而且future.get() 会阻塞
  3. future获取的结果在放入线程池需要new callback,达不到极连

解决案例:
F:\Source\javaBase\src\main\java\com\sample\current\executor\CompletableFutureExample.java

知识点:

  1. CompletableFuture 默认的线程是守护线程当调用者退出,默认直接退出;

多个CompletableFuture的组合

实战经验

1. 任务超时无法shutdown

当有runnable访问某个资源,网络请求,某个db特别慢任务过重的时候,shutdown/shutdownNow 无法使用

   //新建线程池时,使用守护线程的方式
   new ThreadFactory() {
         @Override public Thread newThread(Runnable r) { 
           Thread thread = new Thread(r);
           thread.setDaemon(true);
           return thread;
         }
       }

2. ExecutorService中的陷阱

线程池中执行完一段时间(20s),shutdown线程,然后记录下未完成的任务
答案: F:\Source\javaBase\src\main\java\com\sample\current\executor\ComplexExample.java

使用CompletionService + shutdownNow 返回未完成值的方式, 容易犯的问题:

  1. ExecutorCompletionService 把runable重新封装,shutdownNow 返回私有类无法使用
  2. shutdownNow 没有包括中断的任务

小结

1559003770862

4. 并发集合

常用的并发集合类

 ConcurrentHashMap    
 ConcurrentSkipListMap
 ConcurrentLinkedQueue
 ConcurrentLinkedDeque
 CopyOnWriteArraySet  
 CopyOnWriteArrayList 

 ArrayBlockingQueue   
 PriorityBlockingQueue
 LinkedBlockingQueue  
 SynchronousQueue     
 DelayQueue           
 LinkedTransferQueue  

1. LinkedList 实现

LinkedList                                            Binary search treee
1. 单向LinkedList            Stack                    B+ tree
2. 单项有序          ==>      Queue     由树的平衡性    Red Black tree
3. 双向                      Binary Tree ==>          AVL
4. 双向有序                                           2-3-4 Tree 
                                                     Spary's Tree
  • 无序 F:\Source\javaBase\src\main\java\com\sample\current\collections\LinkedList.java
  • 有序 F:\Source\javaBase\src\main\java\com\sample\current\collections\PriorityLinkedList.java

2. SkipList跳表数据结构实现

F:\Source\javaBase\src\main\java\com\sample\current\collections\SimpleSkipList.java

跳表的实现

跳表的技术特点

  1. 一种随机的数据结构
  2. 最底层包含整个跳表的所有元素
  3. 典型的空间换时间的算法
  4. 由于对比效率高,查找删除快

3. ArrayBlockingQueue

插入队列方法(主要区分是队列满时应该怎么反应,异常,等待,返回值)

方法名称 参数描述 返回值 异常信息
add 插入对象 ture代表插入成功,如果队列已满,抛出异常 IllegalStateException(“Queue full”)异常——AbstractQueue
offer 插入对象 true代表插入成功,队列已满直接返回false
offer 插入对象,等待时间 true代表插入成功,队列已满等待一段时间后仍没有空间则返回false
put 插入对象 true代表插入成功,如果队列已满则阻塞线程等待队列为空的时候插入

获取队列内容

方法名称 参数描述 返回值 异常信息
remove 返回队首数据并移除,队列已空则抛出异常信息 NoSuchElementException()异常——AbstractQueue
poll 列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。
poll 等待时间 设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值
take 队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

参考:

ArrayBlockingQueue原理详解

JDK 新特性

LongAddr

Atomctic 的不同之处

  • 竞争多个资源(拆分为Cell数组)
  • 动态扩容

缺点:

LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差java

提供的方法

  1. add():增加指定的数值;
  2. increament():增加1;
  3. decrement():减少1;
  4. intValue()/floatValue()/doubleValue():得到最终计数后的结果
  5. sum():求和,得到最终计数结果
  6. sumThenReset():求和得到最终计数结果,并重置value。

阶段四: 并发深入探讨

第1节 锁

•轻量级锁

•重量级锁

•重入锁

•自旋锁

•共享锁

•独占锁

•排他锁

•读写锁

•公平锁

•非公平锁

•死锁

•活锁

1.1 死锁

死锁面试题(什么是死锁,产生死锁的原因及必要条件)

Jconsole -> 线程-> 有检测死锁的工具

1.1.1 死锁的分类

  1. 1 顺序死锁,A调用leftRight(),B调用rightLeft() (厕所内有坑,厕所外有纸,相互缺少)

    public class LeftRightDeadlock {
        private final Object left = new Object();
        private final Object right = new Object();
        public void leftRight() {
            // 得到left锁
            synchronized (left) {
                // 得到right锁
                synchronized (right) {
                    doSomething();
                }
            }
        }
        public void rightLeft() {
            // 得到right锁
            synchronized (right) {
                // 得到left锁
                synchronized (left) {
                    doSomethingElse();
                }
            }
        }
    }
  2. 2 动态死锁,进来锁定自己的账户,相互等待。

     // 转账
        public static void transferMoney(Account fromAccount,
                                         Account toAccount,
                                         DollarAmount amount)
                throws InsufficientFundsException {
            // 锁定汇账账户
            synchronized (fromAccount) {
                // 锁定来账账户
                synchronized (toAccount) {
                    // 判余额是否大于0
                    if (fromAccount.getBalance().compareTo(amount) < 0) {
                        throw new InsufficientFundsException();
                    } else {
                        // 汇账账户减钱
                        fromAccount.debit(amount);
                        // 来账账户增钱
                        toAccount.credit(amount);
                    }
                }
            }
        }
    • 如果两个线程同时调用transferMoney()
    • 线程A从X账户向Y账户转账
    • 线程B从账户Y向账户X转账,产生死锁。
  3. 3 协作对象之间

    public class CooperatingDeadlock {
        // Warning: deadlock-prone!
        class Taxi {
            @GuardedBy("this") private Point location, destination;
            private final Dispatcher dispatcher;
    
            public Taxi(Dispatcher dispatcher) {
                this.dispatcher = dispatcher;
            }
    
            public synchronized Point getLocation() {
                return location;
            }
    
            // setLocation 需要Taxi内置锁
            public synchronized void setLocation(Point location) {
                this.location = location;
                if (location.equals(destination))
                    // 调用notifyAvailable()需要Dispatcher内置锁
                    dispatcher.notifyAvailable(this);
            }
    
            public synchronized Point getDestination() {
                return destination;
            }
    
            public synchronized void setDestination(Point destination) {
                this.destination = destination;
            }
        }
    
        class Dispatcher {
            @GuardedBy("this") private final Set<Taxi> taxis;
            @GuardedBy("this") private final Set<Taxi> availableTaxis;
    
            public Dispatcher() {
                taxis = new HashSet<Taxi>();
                availableTaxis = new HashSet<Taxi>();
            }
    
            public synchronized void notifyAvailable(Taxi taxi) {
                availableTaxis.add(taxi);
            }
    
            // 调用getImage()需要Dispatcher内置锁
            public synchronized Image getImage() {
                Image image = new Image();
                for (Taxi t : taxis)
                    // 调用getLocation()需要Taxi内置锁
                    image.drawMarker(t.getLocation());
                return image;
            }
        }
    
        class Image {
            public void drawMarker(Point p) {
            }
        }
    }

    Dispatcher的getImage() 及Taxi的setLocation有相互锁的可能性

1.1.2 避免死锁

  • 固定加锁的顺序(针对锁顺序死锁)
  • 开放调用(针对对象之间协作造成的死锁)
  • 使用定时锁–>tryLock()
2.1 固定锁顺序避免死锁

用HashCode 固定顺序

public class InduceLockOrder {

    // 额外的锁、避免两个对象hash值相等的情况(即使很少)
    private static final Object tieLock = new Object();

    public void transferMoney(final Account fromAcct,
                              final Account toAcct,
                              final DollarAmount amount)
            throws InsufficientFundsException {
        class Helper {
            public void transfer() throws InsufficientFundsException {
                if (fromAcct.getBalance().compareTo(amount) < 0)
                    throw new InsufficientFundsException();
                else {
                    fromAcct.debit(amount);
                    toAcct.credit(amount);
                }
            }
        }
        // 得到锁的hash值
        int fromHash = System.identityHashCode(fromAcct);
        int toHash = System.identityHashCode(toAcct);

        // 根据hash值来上锁
        if (fromHash < toHash) {
            synchronized (fromAcct) {
                synchronized (toAcct) {
                    new Helper().transfer();
                }
            }

        } else if (fromHash > toHash) {// 根据hash值来上锁
            synchronized (toAcct) {
                synchronized (fromAcct) {
                    new Helper().transfer();
                }
            }
        } else {// 额外的锁、避免两个对象hash值相等的情况(即使很少)
            synchronized (tieLock) {
                synchronized (fromAcct) {
                    synchronized (toAcct) {
                        new Helper().transfer();
                    }
                }
            }
        }
    }
}
2.2 开放调用避免死锁

如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用

用局部代码块锁代替整个方法的锁

class CooperatingNoDeadlock {
    @ThreadSafe
    class Taxi {
        @GuardedBy("this") private Point location, destination;
        private final Dispatcher dispatcher;

        public Taxi(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        public synchronized Point getLocation() {
            return location;
        }

        public synchronized void setLocation(Point location) {
            boolean reachedDestination;

            // 加Taxi内置锁
            synchronized (this) {
                this.location = location;
                reachedDestination = location.equals(destination);
            }
            // 执行同步代码块后完毕,释放锁

            if (reachedDestination)
                // 加Dispatcher内置锁
                dispatcher.notifyAvailable(this);
        }

        public synchronized Point getDestination() {
            return destination;
        }

        public synchronized void setDestination(Point destination) {
            this.destination = destination;
        }
    }

    @ThreadSafe
    class Dispatcher {
        @GuardedBy("this") private final Set<Taxi> taxis;
        @GuardedBy("this") private final Set<Taxi> availableTaxis;

        public Dispatcher() {
            taxis = new HashSet<Taxi>();
            availableTaxis = new HashSet<Taxi>();
        }

        public synchronized void notifyAvailable(Taxi taxi) {
            availableTaxis.add(taxi);
        }

        public Image getImage() {
            Set<Taxi> copy;

            // Dispatcher内置锁
            synchronized (this) {
                copy = new HashSet<Taxi>(taxis);
            }
            // 执行同步代码块后完毕,释放锁

            Image image = new Image();
            for (Taxi t : copy)
                // 加Taix内置锁
                image.drawMarker(t.getLocation());
            return image;
        }
    }

    class Image {
        public void drawMarker(Point p) {
        }
    }
}
2.3 使用定时锁

1.2 偏向锁/轻量级锁/重量级锁

1.3 独享锁 & 共享锁

独享锁(互斥锁):同时只能有一个线程获得锁比如,ReentrantLock 是互斥锁,ReadWriteLock 中的写锁是互斥锁。 共享锁:可以有多个线程同时获得锁。比如,Semaphore、CountDownLatch 是共享锁,ReadWriteLock 中的读锁是共享锁。

深度问题

Java的多路复用

  1. 常用4种IO模型(同步/异步/阻塞/非阻塞的概念)

实战

问题定位

数据+ 工具+ 经验

操作系统–> JVM 虚拟机(java提供的工具)–> 数据

jstack 使用

这里写图片描述

建议线程名字给的有意义,在排查问题时很有必要。

  1. jps -v 列出所有Java进程,拿到pid=1523
  2. jstack 1523 > 1523.log 将dump文件输出到日志
  3. 上传到http://heaphero.io/index.jsp 或使用MAT分析

Best practice

  1. 尽量不要在线程中做大量耗时的网络操作,如查询数据库(可以的话在一开始就将数据从从 DB 中查出准备好)【能在初始化做的耗时完成】

  2. 尽可能的减少多线程竞争锁。可以将数据分段,各个线程分别读取。

  3. 多利用 CAS+自旋 的方式更新数据,减少锁的使用。【待实践】

    private AtomicReference<Thread> atomicReference = new AtomicReference<>();
    private void lock() {
        while (atomicReference.compareAndSet(null, currentThread())) ;
    }
    private void unlock() {
        atomicReference.compareAndSet(currentThread(), null);
    }
  4. 应用中加上 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp 参数,在内存溢出时至少可以拿到内存日志。

  5. 线程池监控。如线程池大小、队列大小、最大线程数等数据,可提前做好预估。

     ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);
    
        while (true) {
            System.out.println();
    
            int queueSize = tpe.getQueue().size();
            System.out.println("当前排队线程数:" + queueSize);
    
            int activeCount = tpe.getActiveCount();
            System.out.println("当前活动线程数:" + activeCount);
    
            long completedTaskCount = tpe.getCompletedTaskCount();
            System.out.println("执行完成线程数:" + completedTaskCount);
    
            long taskCount = tpe.getTaskCount();
            System.out.println("总线程数:" + taskCount);
    
            Thread.sleep(3000);
        }
  6. JVM 监控,可以看到堆内存的涨幅趋势,GC 曲线等数据,也可以提前做好准备。

    1. Linux 系统监控 查看(Linux菜鸟私房菜笔记)之 性能监控 【待实践总结】
    2. jstat JVM监控命令(在Java基础笔记看到使用说明之监控)
    3. jvisualvm 同上

简易web服务器

附录一

推荐书籍

  • 《Java 并发编程实战》
  • 《Java并发编程的艺术》方腾飞
  • 《深入理解Java虚拟机》 周志明

网站

博客

  1. Java 理论与实践

专业名词

  • 指令重排序,在不影响程序运行结果的前提下重新排序代码

附录二

常用的消息队列