logo头像
ICQL

230_javase-concurrent

总结

concurrent总结1

并发编程

  • 目的:使程序运行的更快
  • 并发问题:
    • 1)上下文切换:cpu以时间片分配算法循环执行任务,切换时保存上个任务状态,以便下次切回这个任务时恢复,这个过程叫上下分切换;创建线程和上下文切换需要开销,因此应该尽可能减少上下文切换
      • (1)无锁编程:将数据的ID按照hash算法取模分段,不同的线程处理不同段的数据
      • (2)CAS算法:java的Atomic包使用cas算法来更新数据不需要加锁
      • (3)使用最少线程:避免创建不需要的线程
      • (4)使用协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换
    • 2)死锁:多线程互相竞争对方正在持有的锁,造成死锁
    • 3)资源限制:cpu/io等资源较少,只够维持较少线程的运行,过多的线程只会造成等待,增加线程创建和上下文切换的开销

并发编程原理

  • CAS(compare and swap):比较并交换,需要两个值,操作前先比较旧值有没有发生变化,如果没有发生变化才交换成新值
  • CPU–>CPU缓存–>主内存
    • 1)程序以及数据被加载到主内存
    • 2)指令和数据被加载到CPU缓存
    • 3)CPU执行指令,把结果写到高速缓存
    • 4)高速缓存中的数据回写主内存
  • CPU原子操作原理
    • 1)总线锁:传输数据的总线被锁定,其他cpu的请求将发生阻塞,实现原子安全操作;开销较大,现代处理器遇到 lock 指令一般采用的是缓存锁实现
    • 2)缓存锁:检查数据所在的内存区域,如果该数据是在处理器的内部缓存中,则会锁定此缓存区域,处理完后把缓存写回到主存中,并且会利用缓存一致性协议来保证其他处理器中的缓存数据的一致性
  • java原子操作原理
    • 1)CAS自旋:即CAS循环操作,虽然高效但存在3大问题:
      • (1)循环时间长开销很大:如果自旋长时间不成功,则浪费大量cpu资源
      • (2)ABA问题:如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题
      • (3)只能保证一个共享变量的原子操作:jdk的AtomicReference类可以实现引用对象的cas原子操作,解决此问题
    • 2)使用锁机制:偏向锁、轻量级锁、互斥锁(重量级锁)
  • volatile实现原理:基于CPU原子操作原理,保证共享变量的原子性
  • synchronized实现原理:基于java原子操作原理的锁机制
    • 锁对象:任何一个对象都能成为锁对象,对象的对象头中的Markword里存储hashCode,GC分代年龄,锁信息等
    • jdk1.6以后,优化synchronized不再是一开始就是重量级锁,而是偏向锁-轻量级锁(包括自旋锁和自适应自旋锁)-重量级锁逐步升级,不能降级
    • 具体原理
      • (1)创建锁对象LockObject,初始状态时对象头中markword的关键数据(锁标志位:01,是否偏向锁:0),锁标志01代表偏向锁,00代表轻量级锁,10代表重量级锁
      • (2)偏向锁:当第一个线程开始进入synchronized同步块时,利用CAS将线程ID插入markword中,此时markword(线程id,锁标志位:01,是否偏向锁:1),表示偏向锁生效。当有线程来了竞争锁对象时:
        • 先判断当前线程id是否与Markword当中的线程id一致
        • 如果一致,则说明此线程已经成功获得了锁,不需要再进行同步操作
        • 如果不一致,则要检查一下对象是否还是可偏向,即markword中的是否偏向锁是否为0
        • 如果可偏:值为0,执行第一次获取锁的操作
        • 如果不可偏:值为1,存在了线程竞争,大部分情况下就是升级成轻量级锁了
      • (3)轻量级锁(膨胀锁):升级为轻量级锁需要先撤销偏向锁(开销比较大),过程如下:
        • 撤销偏向锁
          • 在一个安全点停止拥有偏向锁的线程
          • 遍历拥有偏向锁线程的线程栈,如果存在锁记录的话,需要修复锁记录和Markword,使其变成无锁状态(初始时的状态)
        • 升级成轻量级锁
          • 线程在自己的栈帧中创建锁记录LockRecord
          • 将锁对象markword赋值到LockRecord中
          • 将锁记录LockRecord的Owner指针指向锁对象
          • 将锁对象的markword增加指向锁记录的指针,此时markword(指向LockRecord的指针,锁标志位:00,是否偏向锁:0)
        • 当有其他线程来了竞争锁对象时(自旋锁或自适应自旋锁):
          • 其他线程会在原地循环等待,而不是把该线程给阻塞,直到那个获得锁的线程释放锁之后,这个线程就可以马上获得锁的;原地循环的时会消耗cpu,所以轻量级锁适用于那些同步代码块执行的很快的场景;默认情况下,自旋的次数为10次,超过则升级为重量级锁;自适应自旋锁会动态着根据实际情况来改变自旋等待的次数
      • (4)重量级锁(互斥锁):轻量级锁经过锁撤销等步骤升级为重量级锁,重量级锁是依赖对象内部的monitor锁来实现的,而monitor又依赖操作系统的MutexLock(互斥锁)来实现的。升级后此时markword(指向MutexLock的指针,锁标志位:10,是否偏向锁:0);当有其他线程来了竞争锁对象时,会阻塞其他想要获得锁的线程直至当前线程释放锁,被阻塞的线程不会消耗cpu,但需要依赖操作系统唤醒等待的线程,开销很大
    • 锁的比较:
      • 偏向锁:乐观锁,加锁和解锁不需要额外消耗,速度块,适用于只有一个线程访问同步块的场景
      • 轻量级锁:乐观锁,竞争的线程不会阻塞,只是在自旋,程序响应速度高,适合追求响应时间,同步块执行时间短
      • 重量级锁:悲观锁,适合同步块执行时间长,追求吞吐量的场景

并发内存模型

  • 并发内存模型(主要是定义各个共享变量的访问规则,这里共享变量指的是实例字段/静态字段/数组元素,其他的方法中的局部变量和参数都是线程私有的,不会被共享,所以无需考虑并发问题)
    • 1)主内存:指的是堆中的共享变量构成的内存部分
    • 2)工作内存:线程使用到共享变量时,会将主内存中的共享变量内存拷贝一份到线程的工作内存中(不真实存在,涵盖了缓存、缓冲区和寄存器等),这部分内存是工作内存
  • 重排序:为提高性能,编译器和处理器会进行指令重排序
    • 源代码-1编译器优化重排序-2指令级并行重排序-3内存系统重排序-最终执行的指令序列
    • 1属于编译器重排序,java编译器会禁止特定类型重排序
    • 2和3属于处理器重排序,java编译器在生成指令时插入特定类型的内存屏障,通过内存屏障来禁止处理器重排序
  • happen-before 8大原则
    happen-before

并发编程基础

  • 线程定义:启动一个java程序就是开启了一个jvm进程(jvm进程启动时默认的创建了几个线程,有main线程,gc线程等),进程间是互相隔离的;线程是”虚拟cpu”,可以异步的执行各种操作,所有线程共享同一个进程中的资源
  • 进程退出:在多线程环境下,程序退出的条件是,所有的用户线程都正常结束或者某个线程条用了system.exit方法,导致进程强行退出
  • 线程分类
    • User Thread(用户线程)
    • Daemon Thread(守护线程)
  • 线程状态
    • 1)初始状态NEW:线程被创建(默认是用户线程),但没有调用start()
    • 2)运行状态RUNNABLE:java线程将操作系统中的就绪和运行统称为运行态,线程调用了start()方法后的状态
    • 3)阻塞状态BLOCKED:表示线程阻塞于锁
    • 4)等待状态WAITING:表示线程进入等待状态,进入该状态表示当前先线程需要等待其他线程做出一些特定动作(通知或中断)
    • 5)超时等待状态TIME_WAITING:该状态不同于WAITING,它可以在指定时间自行返回的
    • 6)终止状态TERMINATED:表示当前线程已经执行完毕

线程状态变迁

  • 线程的启动和终止

    • 1)构造
      • (1)继承Thread类,重写run方法(线程的主方法),创建实例
      • (2)实现Runnable接口,重写run方法,然后再利用Thread的构造方法创建Thread实例,new Thread(() -> {System.out.println(“子线程”);});
    • 2)启动:调用线程对象的thread.start()方法
    • 3)停止
      • (1)安全停止:使用实例字段标志判断,推荐使用
        1
        2
        3
        4
        5
        6
        7
        8
        9
        new Thread(new Runnable() {
        private volatile boolean flag = true;
        @Override
        public void run() {
        while (flag) {
        //do something
        }
        }
        }).start();
      • (2)停止(已过时,不要使用)thread.stop()
      • (3)中断(不建议使用):提前中断结束线程,如果遇到抛出InterruptedException的方法,则发生异常
        • 设置中断线程标志:调用线程对象的thread.interrupt()方法,不会立即中断线程,只是在线程的中断线程标志打上标记
        • 判断线程是否中断:
          • 调用线程对象的thread.isInterrupted()方法,不会复位中断标记
          • 调用线程类Thread.interrupted()静态方法,会复位中断标记为false
    • 4)暂停/恢复(已过时,不要使用):thread.suspend(),thread.resume()
  • 线程优先级:作用不大,且操作系统大多会忽略我们的优先级设置

  • 线程间通信

    • volatile:修饰的变量,保证此变量对所有线程的可见性,任何线程读到的都是最新的变量值,写完变量后立刻刷新到主内存,读写单个volatile变量是线程安全的
    • synchronized
      • 同步方法:是同步代码块的一种特殊情况,实例方法相当于synchronized(this),静态方法相当于synchronized(当前类Class对象)
      • 同步代码块
        • synchronized(obj)锁住其他对象,一般是锁住方法的参数对象,推荐使用
        • synchronized(this)锁住的是当前实例对象,尽量少用,因为当一个线程占用这个锁时,其他线程访问的其他锁当前实例对象的同步代码块也将阻塞
        • synchronized(Class对象),锁住的是Class对象,对此class的所有实例对象生效,不推荐使用
      • 锁住的对象:即使锁住的对象自己的属性发生变化,不影响同步效果
    • 等待/通知机制(synchronized块中使用)
      • Object为锁对象:Objcet.wait(),Objcet.notify(),Objcet.notifyAll()
      • 伪代码实现生产者消费者
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        //消费者
        synchronized(锁对象){
        while(条件不满足){
        锁对象.wait();
        //执行wait()方法后,该线程将释放锁,释放资源,处于WAITING状态
        //必须得等到得到其他线程调用锁对象.notify()后才有机会竞争锁
        //竞争到锁以后继续执行后面的代码
        }
        条件满足时的处理逻辑
        }
        //生产者
        synchronized(锁对象){
        改变条件
        锁对象.notifyAll();
        //执行notify()方法,通知其他同一个锁对象的处于WAITTING状态的线程在此同步块结束后可以竞争这个锁对象
        }
      • thread.join(),当前线程等待thead线程执行完成
        • 原理:join是Thread的synchronized方法,锁对象是thread对象,join方法里面调用了wait()方法,在线程结束的时候会唤醒锁对象notify,当前线程接受到了通知,继续执行
        • 调用thread.join()相当于当前主线程对thread对象加锁,里面调用了wait()方法,导致主线程阻塞,直到thread线程结束时,jvm会自动调用这个thread锁对象的notify唤醒当前主线程
  • ThreadLocal:单线程中的全局变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //每个线程都有一个ThreadLocalMap(thread.threadLocals)
    //每个map的key都是ThreadLocal对象,value是一个需要线程安全的值
    //一般ThreadLocal变量是用作共享变量,其实是数据隔离,每个线程都有一个单独的value数据
    //使用场景:一份代码实现的多线程,避免参数传递,每个线程需要隔离数据,如获取数据库连接,数据源
    public class ThreadLocalTest {
    ThreadLocal<Object> threadLocal = new ThreadLocal<>();
    public void test(){
    Object obj = new Object();//value是在方法中,所以每次都是新的,线程隔离
    threadLocal.set(obj);
    }
    }
  • 实例:数据库连接池,线程池,基于线程池的web服务器


原子操作类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//只能保证一个变量的原子更新,即代替类似 i++;这种非原子操作,实际开发使用意义不大

//原子更新基本类型
AtomicBoolean
AtomicInteger
AtomicLong

//原子更新数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray

//原子更新引用类型
AtomicReference
AtomicReferenceFieldUpdater
AtomicMarkableReference

//原子更新字段类
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicStampedReference

并发显示锁

  • Lock接口java.util.concurrent.locks.Lock

    • synchronized关键字将会隐式地获取锁,系统自动管理同步;而Lock拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性
  • Lock API

  • 队列同步器AQS-AbstractQueuedSynchronizer:是一个双向队列(入队放到尾结点,头节点第二个出队,复杂度O(1)),用来构建Lock锁或者其他同步组件的基础抽象类

  • 重入锁 ReentrantLock:可以代替synchronizer关键字

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    private int num;
    private ReentrantLock reentrantLock = new ReentrantLock();//所有并发线程必须共享此锁
    @Test
    public void mainTest() throws Exception {
    int threadCount = 1000;
    //计数器,并发工具类,是为了让主线程等待所有子线程结束时再执行
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    Runnable runnable = () -> {
    //为了效果明显,实现类似的wait()的效果,不释放锁/资源 等待
    LockSupport.parkNanos(1000000);

    //获取锁操作不需要放入try块是因为如果获取过程中失败会自动释放锁
    reentrantLock.lock();
    try {
    //需要线程同步的操作
    num++;
    } finally {
    //必须安全释放锁,避免死锁
    reentrantLock.unlock();
    }

    //每个线程执行完要将计数器减一,计数器为0时主线程的await()方法将不再等待
    countDownLatch.countDown();
    };

    for (int i = 0; i < threadCount; i++) {
    new Thread(runnable).start();
    }

    countDownLatch.await();
    System.out.println("结束:" + num);
    }
  • 读写锁 ReentrantReadWriteLock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    private int threadCount = 1000;
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private Lock r = rwl.readLock();
    private Lock w = rwl.writeLock();
    @Test
    public void mainTest() throws Exception {
    int threadCount = 1000;
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    Runnable runnable = () -> {
    LockSupport.parkNanos(1000000);

    r.lock();
    try {
    //读操作,共享锁
    System.out.println(num);
    } finally {
    r.unlock();
    }

    w.lock();
    try {
    //写操作,互斥锁,当有线程进入此块时,一定没有进入读操作块的线程和写操作块的线程(除了它自己)
    num++;
    } finally {
    w.unlock();
    }

    countDownLatch.countDown();
    };

    for (int i = 0; i < threadCount; i++) {
    new Thread(runnable).start();
    }

    countDownLatch.await();
    System.out.println("结束:" + num);
    }
  • Condition,实现类似synchronized的等待通知机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //等待
    public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
    condition.await();
    } finally {
    lock.unlock();
    }
    }

    //通知
    public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
    condition.signal();
    } finally {
    lock.unlock();
    }
    }

并发工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//LockSupport
LockSupport.park();//阻塞当前线程,直至调用unpark()
LockSupport.parkNanos(long nanos);//加入纳秒超时
LockSupport.parkUntil(long deadline);//超时,直至1970开始到deadline毫秒数
LockSupport.unpark(Thread thread);//唤醒阻塞的线程

//CountDownLatch:用于多个子线程执行完成后主线程才开始继续执行,代替join的工具类
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
countDownLatch.countDown();//每个线程里面执行完将计数器减一
countDownLatch.await();//阻塞当前线程直至计数器为0

//CyclicBarrier:同步屏障,用于多个子线程内部到达一定点时再继续各自执行
CyclicBarrier c = new CyclicBarrier(2);
run(){
c.await();
//第一个线程需要到达同步点后执行的操作
}
run(){
c.await();
//第二个线程需要到达同步点后执行的操作
}

//Semaphore:信号量,控制并发线程数
Semaphore s = new Semaphore(10);
run(){
s.acquire();
//需要执行的操作
s.release();
}

//Exchanger:交换两个线程的数据,相当于两个格子,当两个格子都被填满时交换
Exchanger<String> exchanger = new Exchanger<>();
run(){
String re = exchanger.exchange("第一个线程");
//re="第二个线程";
}
run(){
String re = exchanger.exchange("第二个线程");
//re="第一个线程";
}

并发容器

  • ConcurrentHashMap-并发map
  • ConcurrentLinkedQueue-并发队列
  • BlockingQueue-阻塞队列
    Executor框架的类与接口
    1
    2
    3
    4
    5
    6
    7
    8
    9
    //有界无界:容量是否有上限
    //是否阻塞:容器达到容量上限时
    ArrayBlockingQueue:一个由数组结构组成的 有界阻塞 队列
    LinkedBlockingQueue:一个由链表结构组成的 有界阻塞 队列
    PriorityBlockingQueue:一个支持优先级排序的 无界阻塞 队列
    DelayQueue:一个使用优先级队列实现的 无界阻塞 队列
    SynchronousQueue:一个不存储元素的 阻塞队列
    LinkedTransferQueue:一个由链表结构组成的 无界阻塞 队列
    LinkedBlockingDeque:一个由链表结构组成的 双向阻塞 队列

Executor框架(线程池相关)

Executor框架的类与接口

  • Executor框架的主要成员

    • (1)ThreadPoolExecutor:线程池通用类
      • 1)SingleThreadExecutor:创建只有一个线程的线程池,可保证执行顺序
      • 2)FixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
      • 3)CachedThreadPool:创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
    • (2)ScheduledThreadPoolExecutor:通常使用工厂类Executors来创建
      • 1)ScheduledThreadPoolExecutor:创建一个定长线程池,支持定时及周期性任务执行
      • 2)SingleThreadScheduledExecutor:创建一个线程的线程池,支持定时及周期性任务执行,可保证执行顺序
    • (3)Future接口/FutureTask:返回值处理
  • ThreadPoolExecutor线程池详解

微信打赏

赞赏是不耍流氓的鼓励