# Java 并发编程
# 并发基础
# 线程基础
线程的生命周期
- NEW:新建状态
- RUNNABLE:可运行状态
- BLOCKED:阻塞状态
- WAITING:等待状态
- TIMED_WAITING:超时等待状态
- TERMINATED:终止状态
线程的创建方式
- 继承Thread类
- 实现Runnable接口
- 实现Callable接口
- 线程池
# 线程安全
竞态条件
- 多线程访问共享资源
- 结果依赖线程执行顺序
临界区
- 访问共享资源的代码段
- 需要互斥访问
# 并发机制
# synchronized
基本使用
- 修饰实例方法
- 修饰静态方法
- 修饰代码块
实现原理
- 对象头Mark Word
- 锁的升级过程
- 偏向锁、轻量级锁、重量级锁
锁优化技术
- 自旋锁:避免线程切换的开销,适用于锁竞争不激烈且锁持有时间短的场景
- 锁消除:JIT编译时,对不可能存在竞争的锁进行消除
- 锁粗化:将多次连续的加锁、解锁操作合并为一次
- 偏向锁:减少无竞争情况下的同步开销
- 轻量级锁:在无竞争情况下,通过CAS操作避免使用互斥量
锁升级过程详解
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
- 偏向锁:Thread ID记录在Mark Word中,无需同步操作
- 轻量级锁:线程在栈帧中创建锁记录,通过CAS操作尝试获取锁
- 重量级锁:依赖操作系统的互斥量(Mutex)实现
# volatile
可见性
- 保证变量的可见性
- 禁止指令重排序
使用场景
- 状态标志
- 双重检查锁定
内存语义
- 写操作:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存
- 读操作:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,从主内存中读取共享变量
内存屏障实现
- 写操作前插入StoreStore屏障
- 写操作后插入StoreLoad屏障
- 读操作后插入LoadLoad屏障
- 读操作后插入LoadStore屏障
代码示例
public class VolatileExample { private volatile boolean flag = false; public void writer() { // 写入volatile变量 flag = true; } public void reader() { // 读取volatile变量 if (flag) { // 可以保证看到flag的最新值 doSomething(); } } private void doSomething() { // 业务逻辑 } }
# CAS机制
原理
- Compare And Swap
- 原子操作
问题
- ABA问题
- 自旋开销
- 只能保证一个共享变量的原子操作
底层实现
- 依赖处理器的CMPXCHG指令
- JDK中通过Unsafe类实现
ABA问题解决方案
- AtomicStampedReference:通过版本号解决
- AtomicMarkableReference:通过标记解决
代码示例
public class CASExample { private AtomicInteger atomicInt = new AtomicInteger(0); private AtomicStampedReference<Integer> stampedRef = new AtomicStampedReference<>(0, 0); public void casOperation() { // 基本CAS操作 atomicInt.compareAndSet(0, 1); // 解决ABA问题的CAS操作 int stamp = stampedRef.getStamp(); Integer reference = stampedRef.getReference(); stampedRef.compareAndSet(reference, reference + 1, stamp, stamp + 1); } }
# 锁机制
# ReentrantLock
特性
- 可重入性:同一线程可以多次获取同一把锁
- 公平性选择:支持公平锁和非公平锁
- 可中断:等待锁的线程可以选择放弃等待
- 超时等待:在指定时间内获取不到锁可以放弃
- 条件变量:支持多个条件变量
使用方式
- lock()和unlock():基本的获取锁和释放锁方法
- tryLock():尝试获取锁,立即返回结果
- lockInterruptibly():可中断的获取锁方法
- tryLock(long timeout, TimeUnit unit):超时等待获取锁
- newCondition():创建条件变量
源码分析
ReentrantLock的内部结构
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; // 内部抽象类,继承自AQS abstract static class Sync extends AbstractQueuedSynchronizer { // 实现锁的获取和释放 } // 非公平锁实现 static final class NonfairSync extends Sync { // 非公平锁的实现 } // 公平锁实现 static final class FairSync extends Sync { // 公平锁的实现 } // 默认构造函数,创建非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // 根据fair参数创建公平锁或非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }
非公平锁的获取过程
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果锁未被占用 if (c == 0) { // 尝试通过CAS获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果当前线程已经持有锁,实现可重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // 检查溢出 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁的获取过程
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 与非公平锁的区别:先检查是否有线程在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
锁的释放过程
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 检查当前线程是否持有锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果状态为0,表示锁已完全释放 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
# AQS框架
基本原理
- AbstractQueuedSynchronizer(AQS)是Java并发包中锁和同步器的基础框架
- 通过一个int类型的state变量表示同步状态
- 通过一个FIFO队列完成线程的排队工作
- 支持独占模式和共享模式
核心方法
- acquire(int):独占模式获取同步状态
- release(int):独占模式释放同步状态
- acquireShared(int):共享模式获取同步状态
- releaseShared(int):共享模式释放同步状态
- 模板方法设计模式:子类需要实现tryAcquire、tryRelease等方法
源码分析
- acquire方法实现:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
- acquireQueued方法实现:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 如果前驱是头节点,尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否应该阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
AQS实现的同步器
- ReentrantLock:可重入锁
- Semaphore:信号量
- CountDownLatch:倒计时门闩
- ReentrantReadWriteLock:读写锁
- ThreadPoolExecutor:线程池的工作队列
# 读写锁
ReentrantReadWriteLock
- 特点:允许多个读线程同时访问,但只允许一个写线程访问
- 适用场景:读多写少的并发场景
锁降级
- 写锁可以降级为读锁,但读锁不能升级为写锁
- 锁降级流程:获取写锁 -> 获取读锁 -> 释放写锁
源码分析
- 内部维护了一个ReadLock和一个WriteLock
- 使用同一个Sync实现,但读写状态存储在同一个int变量的不同位上
- 高16位表示读锁状态,低16位表示写锁状态
使用示例
public class ReadWriteLockExample { private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); private int data = 0; public int readData() { readLock.lock(); try { return data; } finally { readLock.unlock(); } } public void writeData(int newData) { writeLock.lock(); try { data = newData; } finally { writeLock.unlock()
## 线程池
### ThreadPoolExecutor
1. **核心参数**
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:线程空闲时间
- workQueue:工作队列
- threadFactory:线程工厂
- handler:拒绝策略
2. **工作原理**
- 当提交任务时,如果运行的线程数小于corePoolSize,创建新线程
- 如果运行的线程数大于等于corePoolSize,将任务放入队列
- 如果队列已满,且运行的线程数小于maximumPoolSize,创建新线程
- 如果队列已满,且运行的线程数等于maximumPoolSize,执行拒绝策略
3. **线程池状态**
```java
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
任务队列
- ArrayBlockingQueue:有界队列
- LinkedBlockingQueue:无界队列
- SynchronousQueue:直接交付
- PriorityBlockingQueue:优先级队列
拒绝策略
- AbortPolicy:抛出异常
- CallerRunsPolicy:调用者线程执行
- DiscardPolicy:直接丢弃
- DiscardOldestPolicy:丢弃最旧的任务
源码分析
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
最佳实践
- 根据任务类型设置合适的线程数
- CPU密集型:线程数 = CPU核心数 + 1
- IO密集型:线程数 = CPU核心数 * (1 + IO耗时/CPU耗时)
- 使用有界队列防止OOM
- 根据业务场景选择合适的拒绝策略
常用线程池
// 固定线程数的线程池 ExecutorService fixedPool = Executors.newFixedThreadPool(10); // 单线程的线程池 ExecutorService singlePool = Executors.newSingleThreadExecutor(); // 缓存线程池 ExecutorService cachedPool = Executors.newCachedThreadPool(); // 定时任务线程池 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(10);
线程池监控
public class ThreadPoolMonitor { private ThreadPoolExecutor executor; public ThreadPoolMonitor(ThreadPoolExecutor executor) { this.executor = executor; } public void monitor() { System.out.println("核心线程数:" + executor.getCorePoolSize()); System.out.println("当前线程数:" + executor.getPoolSize()); System.out.println("最大线程数:" + executor.getMaximumPoolSize()); System.out.println("线程池活跃度:" + divide(executor.getActiveCount(), executor.getMaximumPoolSize())); System.out.println("任务完成数:" + executor.getCompletedTaskCount()); System.out.println("队列大小:" + (executor.getQueue().size() + executor.getQueue().remainingCapacity())); System.out.println("当前排队线程数:" + executor.getQueue().size()); System.out.println("队列剩余大小:" + executor.getQueue().remainingCapacity()); } private String divide(int num1, int num2) { return String.format("%.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100); } }
← Java 多线程编程 Java虚拟机 →