# Java 并发编程

# 并发基础

# 线程基础

  1. 线程的生命周期

    • NEW:新建状态
    • RUNNABLE:可运行状态
    • BLOCKED:阻塞状态
    • WAITING:等待状态
    • TIMED_WAITING:超时等待状态
    • TERMINATED:终止状态
  2. 线程的创建方式

    • 继承Thread类
    • 实现Runnable接口
    • 实现Callable接口
    • 线程池

# 线程安全

  1. 竞态条件

    • 多线程访问共享资源
    • 结果依赖线程执行顺序
  2. 临界区

    • 访问共享资源的代码段
    • 需要互斥访问

# 并发机制

# synchronized

  1. 基本使用

    • 修饰实例方法
    • 修饰静态方法
    • 修饰代码块
  2. 实现原理

    • 对象头Mark Word
    • 锁的升级过程
    • 偏向锁、轻量级锁、重量级锁
  3. 锁优化技术

    • 自旋锁:避免线程切换的开销,适用于锁竞争不激烈且锁持有时间短的场景
    • 锁消除:JIT编译时,对不可能存在竞争的锁进行消除
    • 锁粗化:将多次连续的加锁、解锁操作合并为一次
    • 偏向锁:减少无竞争情况下的同步开销
    • 轻量级锁:在无竞争情况下,通过CAS操作避免使用互斥量
  4. 锁升级过程详解

    无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
    
    • 偏向锁:Thread ID记录在Mark Word中,无需同步操作
    • 轻量级锁:线程在栈帧中创建锁记录,通过CAS操作尝试获取锁
    • 重量级锁:依赖操作系统的互斥量(Mutex)实现

# volatile

  1. 可见性

    • 保证变量的可见性
    • 禁止指令重排序
  2. 使用场景

    • 状态标志
    • 双重检查锁定
  3. 内存语义

    • 写操作:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存
    • 读操作:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,从主内存中读取共享变量
  4. 内存屏障实现

    • 写操作前插入StoreStore屏障
    • 写操作后插入StoreLoad屏障
    • 读操作后插入LoadLoad屏障
    • 读操作后插入LoadStore屏障
  5. 代码示例

    public class VolatileExample {
        private volatile boolean flag = false;
        
        public void writer() {
            // 写入volatile变量
            flag = true;
        }
        
        public void reader() {
            // 读取volatile变量
            if (flag) {
                // 可以保证看到flag的最新值
                doSomething();
            }
        }
        
        private void doSomething() {
            // 业务逻辑
        }
    }
    

# CAS机制

  1. 原理

    • Compare And Swap
    • 原子操作
  2. 问题

    • ABA问题
    • 自旋开销
    • 只能保证一个共享变量的原子操作
  3. 底层实现

    • 依赖处理器的CMPXCHG指令
    • JDK中通过Unsafe类实现
  4. ABA问题解决方案

    • AtomicStampedReference:通过版本号解决
    • AtomicMarkableReference:通过标记解决
  5. 代码示例

    public class CASExample {
        private AtomicInteger atomicInt = new AtomicInteger(0);
        private AtomicStampedReference<Integer> stampedRef = 
            new AtomicStampedReference<>(0, 0);
        
        public void casOperation() {
            // 基本CAS操作
            atomicInt.compareAndSet(0, 1);
            
            // 解决ABA问题的CAS操作
            int stamp = stampedRef.getStamp();
            Integer reference = stampedRef.getReference();
            stampedRef.compareAndSet(reference, reference + 1, stamp, stamp + 1);
        }
    }
    

# 锁机制

# ReentrantLock

  1. 特性

    • 可重入性:同一线程可以多次获取同一把锁
    • 公平性选择:支持公平锁和非公平锁
    • 可中断:等待锁的线程可以选择放弃等待
    • 超时等待:在指定时间内获取不到锁可以放弃
    • 条件变量:支持多个条件变量
  2. 使用方式

    • lock()和unlock():基本的获取锁和释放锁方法
    • tryLock():尝试获取锁,立即返回结果
    • lockInterruptibly():可中断的获取锁方法
    • tryLock(long timeout, TimeUnit unit):超时等待获取锁
    • newCondition():创建条件变量
  3. 源码分析

    ReentrantLock的内部结构

    public class ReentrantLock implements Lock, java.io.Serializable {
        private final Sync sync;
        
        // 内部抽象类,继承自AQS
        abstract static class Sync extends AbstractQueuedSynchronizer {
            // 实现锁的获取和释放
        }
        
        // 非公平锁实现
        static final class NonfairSync extends Sync {
            // 非公平锁的实现
        }
        
        // 公平锁实现
        static final class FairSync extends Sync {
            // 公平锁的实现
        }
        
        // 默认构造函数,创建非公平锁
        public ReentrantLock() {
            sync = new NonfairSync();
        }
        
        // 根据fair参数创建公平锁或非公平锁
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    }
    

    非公平锁的获取过程

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 如果锁未被占用
        if (c == 0) {
            // 尝试通过CAS获取锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前线程已经持有锁,实现可重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // 检查溢出
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    公平锁的获取过程

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 与非公平锁的区别:先检查是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    锁的释放过程

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 检查当前线程是否持有锁
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 如果状态为0,表示锁已完全释放
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

# AQS框架

  1. 基本原理

    • AbstractQueuedSynchronizer(AQS)是Java并发包中锁和同步器的基础框架
    • 通过一个int类型的state变量表示同步状态
    • 通过一个FIFO队列完成线程的排队工作
    • 支持独占模式和共享模式
  2. 核心方法

    • acquire(int):独占模式获取同步状态
    • release(int):独占模式释放同步状态
    • acquireShared(int):共享模式获取同步状态
    • releaseShared(int):共享模式释放同步状态
    • 模板方法设计模式:子类需要实现tryAcquire、tryRelease等方法
  3. 源码分析

    • acquire方法实现:
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    • acquireQueued方法实现:
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 如果前驱是头节点,尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断是否应该阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
  4. AQS实现的同步器

    • ReentrantLock:可重入锁
    • Semaphore:信号量
    • CountDownLatch:倒计时门闩
    • ReentrantReadWriteLock:读写锁
    • ThreadPoolExecutor:线程池的工作队列

# 读写锁

  1. ReentrantReadWriteLock

    • 特点:允许多个读线程同时访问,但只允许一个写线程访问
    • 适用场景:读多写少的并发场景
  2. 锁降级

    • 写锁可以降级为读锁,但读锁不能升级为写锁
    • 锁降级流程:获取写锁 -> 获取读锁 -> 释放写锁
  3. 源码分析

    • 内部维护了一个ReadLock和一个WriteLock
    • 使用同一个Sync实现,但读写状态存储在同一个int变量的不同位上
    • 高16位表示读锁状态,低16位表示写锁状态
  4. 使用示例

    public class ReadWriteLockExample {
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final Lock readLock = rwLock.readLock();
        private final Lock writeLock = rwLock.writeLock();
        private int data = 0;
        
        public int readData() {
            readLock.lock();
            try {
                return data;
            } finally {
                readLock.unlock();
            }
        }
        
        public void writeData(int newData) {
            writeLock.lock();
            try {
                data = newData;
            } finally {
                writeLock.unlock()
    

## 线程池

### ThreadPoolExecutor

1. **核心参数**
   - corePoolSize:核心线程数
   - maximumPoolSize:最大线程数
   - keepAliveTime:线程空闲时间
   - workQueue:工作队列
   - threadFactory:线程工厂
   - handler:拒绝策略

2. **工作原理**
   - 当提交任务时,如果运行的线程数小于corePoolSize,创建新线程
   - 如果运行的线程数大于等于corePoolSize,将任务放入队列
   - 如果队列已满,且运行的线程数小于maximumPoolSize,创建新线程
   - 如果队列已满,且运行的线程数等于maximumPoolSize,执行拒绝策略

3. **线程池状态**
   ```java
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;
  1. 任务队列

    • ArrayBlockingQueue:有界队列
    • LinkedBlockingQueue:无界队列
    • SynchronousQueue:直接交付
    • PriorityBlockingQueue:优先级队列
  2. 拒绝策略

    • AbortPolicy:抛出异常
    • CallerRunsPolicy:调用者线程执行
    • DiscardPolicy:直接丢弃
    • DiscardOldestPolicy:丢弃最旧的任务
  3. 源码分析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    
  4. 最佳实践

    • 根据任务类型设置合适的线程数
    • CPU密集型:线程数 = CPU核心数 + 1
    • IO密集型:线程数 = CPU核心数 * (1 + IO耗时/CPU耗时)
    • 使用有界队列防止OOM
    • 根据业务场景选择合适的拒绝策略
  5. 常用线程池

    // 固定线程数的线程池
    ExecutorService fixedPool = Executors.newFixedThreadPool(10);
    
    // 单线程的线程池
    ExecutorService singlePool = Executors.newSingleThreadExecutor();
    
    // 缓存线程池
    ExecutorService cachedPool = Executors.newCachedThreadPool();
    
    // 定时任务线程池
    ScheduledExecutorService scheduledPool = 
        Executors.newScheduledThreadPool(10);
    
  6. 线程池监控

    public class ThreadPoolMonitor {
        private ThreadPoolExecutor executor;
        
        public ThreadPoolMonitor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }
        
        public void monitor() {
            System.out.println("核心线程数:" + executor.getCorePoolSize());
            System.out.println("当前线程数:" + executor.getPoolSize());
            System.out.println("最大线程数:" + executor.getMaximumPoolSize());
            System.out.println("线程池活跃度:" + 
                divide(executor.getActiveCount(), executor.getMaximumPoolSize()));
            System.out.println("任务完成数:" + executor.getCompletedTaskCount());
            System.out.println("队列大小:" + 
                (executor.getQueue().size() + executor.getQueue().remainingCapacity()));
            System.out.println("当前排队线程数:" + executor.getQueue().size());
            System.out.println("队列剩余大小:" + executor.getQueue().remainingCapacity());
        }
        
        private String divide(int num1, int num2) {
            return String.format("%.2f%%", 
                Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
        }
    }