博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——ReentrantReadWriteLock源码分析
阅读量:4181 次
发布时间:2019-05-26

本文共 7984 字,大约阅读时间需要 26 分钟。

ReadWriteLock 读写锁,它维护了一个读锁和一个写锁。一个线程持有写锁,其他线程的读写操作全部阻塞;一个线程持有读锁,其它线程也可以持有写锁。ReadWriteLock 的实现类需要保证,成功获取读锁的线程能够看到写锁之前版本所做的更新。

和互斥锁 ReentrantLock 相比,在多处理器上并且访问共享数据的情况多于修改共享数据时,使用读写锁能够带来更大的性能提升。

ReentrantReadWriteLock (implements ReadWriteLock) 可重入的读写锁,它支持以下特性:

  • 支持可选的公平策略,默认非公平;
  • 支持重入,已经获取锁(不管是读锁还是写锁)的线程能够重新获取相同的锁。且获取了写锁能够获取读锁,反过来不行;
  • 支持写锁降级为读锁,实现方式为:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
  • 锁中断
  • 支持 Condition
  • 监视锁获取的状态

看下面这个缓存的例子:

用一个非线程安全的 TreeMap 和 ReentrantReadWriteLock 来实现缓存。从 TreeMap 中获取数据需要持有读锁,并发读不会被阻塞;修改或者清空 TreeMap 中数据需要持有写锁,其他读写锁均被阻塞。

public class Cache {
private final Map
m = new TreeMap<>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Object get(String key) {
r.lock(); try {
return m.get(key); } finally {
r.unlock(); } } public Object put(String key, Object value) {
w.lock(); try {
return m.put(key, value); } finally {
w.unlock(); } } public void clear() {
w.lock(); try {
m.clear(); } finally {
w.unlock(); } } public void getCount() {
int readHoldCount = rwl.getReadHoldCount(); int readLockCount = rwl.getReadLockCount(); int writeHoldCount = rwl.getWriteHoldCount(); }}

锁获取顺序:

非公平:与重入锁类似
公平:线程利用一个近似到达顺序的策略来争夺进入。如果写锁被持有,或者有一个等待获取写锁的线程,则试图获得公平读取锁(非重入)的线程将会阻塞。试图获得公平写入锁的(非重入地)的线程将会阻塞,除非读取锁和写入锁都自由(这意味着没有等待线程)。

锁的状态设计:读锁和写锁的同步状态维护在同一个整形变量中,高16位表示读锁获取的次数,低16位表示写锁的获取次数。

/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. *///移位数量static final int SHARED_SHIFT   = 16;//共享读每增加一个,状态增加的单位static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//此锁最多支持 65535 个递归写入锁和 65535 个读取锁static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;/** Returns the number of shared holds represented in count  *///无符号右移16位即可获取读锁的同步状态static int sharedCount(int c)    {
return c >>> SHARED_SHIFT; }/** Returns the number of exclusive holds represented in count *///低16位 & 1111111111111111 即可获取写锁的同步状态static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK; }

读锁的获取

r.lock();protected final int tryAcquireShared(int unused) {
/* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //写锁被其他线程占用,失败 return -1; //获取读锁状态 int r = sharedCount(c); if (!readerShouldBlock() && //判断读锁是否应该被阻塞 r < MAX_COUNT && //已获取的读锁总数限制 compareAndSetState(c, c + SHARED_UNIT)) {
//设置读锁同步状态 if (r == 0) {
//读锁第一次获取,保存线程以及其持有数量 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) {
//本线程之前已获取,计数+1 firstReaderHoldCount++; } else {
//非第一个获取读锁的其他线程获取读锁 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //rh 设置为当前线程 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //本线程持有的计数+1 rh.count++; } return 1; } //处理读锁没有被获取到的情况 return fullTryAcquireShared(current);}final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ //同步队列第一个节点是否明显是独占节点(获取写锁被阻塞的线程) //当前持有读锁,不仅新加入的读锁不会阻塞,且读锁被释放后同步队列从头开始的读锁都会被释放 //直到独占节点成为队头,防止写锁无限制被延后 return apparentlyFirstQueuedIsExclusive();}/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */final int fullTryAcquireShared(Thread current) {
/* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ //删掉了计数的代码,这部分比较简单,而且不影响读写锁的控制,只用作监视,有兴趣再去研究 for (;;) {
int c = getState(); //获取写锁重入数量 if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current) //写锁被占有且写锁的持有者不是自己,失败 return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } if (sharedCount(c) == MAX_COUNT) //已达到最大支持读并发重入数,抛异常 throw new Error("Maximum lock count exceeded"); //CAS成功则退出,失败继续循环 if (compareAndSetState(c, c + SHARED_UNIT)) {
return 1; } }}

读锁的释放,唤醒同步队列中下个节点

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread(); //删掉了监视的代码 for (;;) {
int c = getState(); //高16位才是读锁状态 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //读锁全部释放才算是释放成功,不影响读锁 return nextc == 0; }}

写锁的获取与释放

w.lock();protected final boolean tryAcquire(int acquires) {
/* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); //写锁的重入数量 int w = exclusiveCount(c); if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0) //写锁为0,但是总的锁不为0,即存在读锁,阻塞 //自己不为写锁的拥有者,也阻塞 if (w == 0 || current != getExclusiveOwnerThread()) return false; //写锁重入 //写锁重入数量是否超过限制 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } //读写锁状态为初始化的情况下 if (writerShouldBlock() || //写锁不能被阻塞 !compareAndSetState(c, c + acquires)) //设置状态失败,获取写锁失败 return false; //设置状态失败,获取写锁成功 setExclusiveOwnerThread(current); return true;}//释放protected final boolean tryRelease(int releases) {
//释放的线程一定要是写锁的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //重入的写锁全部释放完才算是释放成功 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free;}

转载地址:http://wqrai.baihongyu.com/

你可能感兴趣的文章
hbase shell出现ERROR: org.apache.hadoop.hbase.ipc.ServerNotRunningYetException
查看>>
让代码变得更优雅-Lombok
查看>>
解决Rhythmbox乱码
查看>>
豆瓣爱问共享资料插件发布啦
查看>>
kermit的安装和配置
查看>>
vim 配置
查看>>
openocd zylin
查看>>
进程创建时文件系统处理
查看>>
内核线程创建
查看>>
linux中cat命令使用详解
查看>>
java中的异常机制
查看>>
商务智能-基本方法-数据钻取
查看>>
C++程序员技术需求规划(发展方向)
查看>>
JNI
查看>>
Cardboard虚拟现实开发初步(二)
查看>>
60个优秀的免费3D模型下载网站
查看>>
Cardboard虚拟现实开发初步(三)
查看>>
Android native和h5混合开发几种常见的hybrid通信方式
查看>>
Vista/Win7 UAC兼容程序开发指南
查看>>
IOS程序开发框架
查看>>