本文共 7984 字,大约阅读时间需要 26 分钟。
ReadWriteLock 读写锁,它维护了一个读锁和一个写锁。一个线程持有写锁,其他线程的读写操作全部阻塞;一个线程持有读锁,其它线程也可以持有写锁。ReadWriteLock 的实现类需要保证,成功获取读锁的线程能够看到写锁之前版本所做的更新。
和互斥锁 ReentrantLock 相比,在多处理器上并且访问共享数据的情况多于修改共享数据时,使用读写锁能够带来更大的性能提升。
ReentrantReadWriteLock (implements ReadWriteLock) 可重入的读写锁,它支持以下特性:
看下面这个缓存的例子:
用一个非线程安全的 TreeMap 和 ReentrantReadWriteLock 来实现缓存。从 TreeMap 中获取数据需要持有读锁,并发读不会被阻塞;修改或者清空 TreeMap 中数据需要持有写锁,其他读写锁均被阻塞。public class Cache { private final Mapm = new TreeMap<>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Object get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public Object put(String key, Object value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } public void getCount() { int readHoldCount = rwl.getReadHoldCount(); int readLockCount = rwl.getReadLockCount(); int writeHoldCount = rwl.getWriteHoldCount(); }}
锁获取顺序:
非公平:与重入锁类似 公平:线程利用一个近似到达顺序的策略来争夺进入。如果写锁被持有,或者有一个等待获取写锁的线程,则试图获得公平读取锁(非重入)的线程将会阻塞。试图获得公平写入锁的(非重入地)的线程将会阻塞,除非读取锁和写入锁都自由(这意味着没有等待线程)。锁的状态设计:读锁和写锁的同步状态维护在同一个整形变量中,高16位表示读锁获取的次数,低16位表示写锁的获取次数。
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. *///移位数量static final int SHARED_SHIFT = 16;//共享读每增加一个,状态增加的单位static final int SHARED_UNIT = (1 << SHARED_SHIFT);//此锁最多支持 65535 个递归写入锁和 65535 个读取锁static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;/** Returns the number of shared holds represented in count *///无符号右移16位即可获取读锁的同步状态static int sharedCount(int c) { return c >>> SHARED_SHIFT; }/** Returns the number of exclusive holds represented in count *///低16位 & 1111111111111111 即可获取写锁的同步状态static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读锁的获取
r.lock();protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //写锁被其他线程占用,失败 return -1; //获取读锁状态 int r = sharedCount(c); if (!readerShouldBlock() && //判断读锁是否应该被阻塞 r < MAX_COUNT && //已获取的读锁总数限制 compareAndSetState(c, c + SHARED_UNIT)) { //设置读锁同步状态 if (r == 0) { //读锁第一次获取,保存线程以及其持有数量 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //本线程之前已获取,计数+1 firstReaderHoldCount++; } else { //非第一个获取读锁的其他线程获取读锁 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //rh 设置为当前线程 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //本线程持有的计数+1 rh.count++; } return 1; } //处理读锁没有被获取到的情况 return fullTryAcquireShared(current);}final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ //同步队列第一个节点是否明显是独占节点(获取写锁被阻塞的线程) //当前持有读锁,不仅新加入的读锁不会阻塞,且读锁被释放后同步队列从头开始的读锁都会被释放 //直到独占节点成为队头,防止写锁无限制被延后 return apparentlyFirstQueuedIsExclusive();}/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ //删掉了计数的代码,这部分比较简单,而且不影响读写锁的控制,只用作监视,有兴趣再去研究 for (;;) { int c = getState(); //获取写锁重入数量 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) //写锁被占有且写锁的持有者不是自己,失败 return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } if (sharedCount(c) == MAX_COUNT) //已达到最大支持读并发重入数,抛异常 throw new Error("Maximum lock count exceeded"); //CAS成功则退出,失败继续循环 if (compareAndSetState(c, c + SHARED_UNIT)) { return 1; } }}
读锁的释放,唤醒同步队列中下个节点
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //删掉了监视的代码 for (;;) { int c = getState(); //高16位才是读锁状态 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. //读锁全部释放才算是释放成功,不影响读锁 return nextc == 0; }}
写锁的获取与释放
w.lock();protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); //写锁的重入数量 int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //写锁为0,但是总的锁不为0,即存在读锁,阻塞 //自己不为写锁的拥有者,也阻塞 if (w == 0 || current != getExclusiveOwnerThread()) return false; //写锁重入 //写锁重入数量是否超过限制 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } //读写锁状态为初始化的情况下 if (writerShouldBlock() || //写锁不能被阻塞 !compareAndSetState(c, c + acquires)) //设置状态失败,获取写锁失败 return false; //设置状态失败,获取写锁成功 setExclusiveOwnerThread(current); return true;}//释放protected final boolean tryRelease(int releases) { //释放的线程一定要是写锁的持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //重入的写锁全部释放完才算是释放成功 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free;}
转载地址:http://wqrai.baihongyu.com/