Hello World

吞风吻雨葬落日 欺山赶海踏雪径

0%

java.util.concurrent.locks.Lock类

什么是锁

锁的定义在java.util.concurrent.locks.Lock类中已有说明:

A lock is a tool for controlling access to a shared resource by multiple threads.
锁是在多个线程中控制访问共享资源锁的工具。

java.util.concurrent.locks.Lock类

官方说明
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html

LocK提供的方法:

  • void lock() – 阻塞方式获取锁
  • void lockInterruptibly() – 与lock()方法类似,但是阻塞线程可以被中断,抛出java.lang.InterruptedException后继续执行
  • boolean tryLock() – 非阻塞的lock()版本,成功获取锁返回true,否则返回false
  • boolean tryLock(long timeout, TimeUnit timeUnit) – 带有超时时间的尝试获取锁,比tryLock增加了超时等待。
  • void unlock() – 释放锁

Lock的使用方式

1
2
3
4
5
6
7
8
Lock lock = ...; 
lock.lock();
try {
// access to the shared resource
} finally {
lock.unlock();
}

Lock和synchronized的区别

Lock有比synchronized更精确的线程语义和更好的性能,Lock是显示的使用锁,而synchronized是隐式的,synchronized会自动释放锁,Lock要求手工释放(通常在finally中)。这种完全不同的语义使Lock能够满足更多场景的需求,比如保证顺序、不可重入、死锁检测等等,同时Lock也提供了非阻塞情况下尝试获取锁、有超时时间的获取锁等等一些增强的方法。

这里总结的很好

  • A synchronized block is fully contained within a method – we can have Lock API’s lock() and unlock() operation in separate methods(不局限于方法维度)
  • A synchronized block does not support the fairness, any thread can acquire the lock ones released, no preference can be specified. We can achieve fairness within the Lock APIs by specifying the fairness property. It makes sure that longest waiting thread is given access to lock(Lock可以实现公平锁,等待最长时间的获得锁)
  • A thread gets blocked if it can’t get an access to the synchronized block. The Lock API provides tryLock() method. The thread acquires lock only if it’s available and not held by any other thread. This reduces blocking time of thread waiting for the lock(Lock支持不阻塞的尝试获取锁)
  • A thread which is in “waiting” state to acquire the access to synchronized block, can’t be interrupted. The Lock API provides a method lockInterruptibly() which can be used to interrupt the thread when it is waiting for the lock(Lock可以中断的等待锁)

Lock比较典型的实现

  • ReentrantLock 提供与synchronized类似的功能,增加了一些灵活性与功能的扩展
  • ReentrantReadWriteLock 读写锁。
    • Read Lock 在没有写锁的情况下多个线程都可以获取读锁
    • Write Lock 在没有写锁的情况下只有一个线程能获取写锁
  • StampedLock 同样是读写锁,但是在获取锁的同时会返回一个邮戳来校验锁的有消息或释放锁。

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SharedObject {
//...
ReentrantLock lock = new ReentrantLock();
int counter = 0;

public void perform() {
lock.lock();
try {
// Critical section here
count++;
} finally {
lock.unlock();
}
}

public void performTryLock(){
boolean isLockAcquired = lock.tryLock(1, TimeUnit.SECONDS);

if(isLockAcquired) {
try {
//Critical section here
} finally {
lock.unlock();
}
}
}
}

ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SynchronizedHashMapWithReadWriteLock {

Map<String,String> syncHashMap = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
//...
Lock writeLock = lock.writeLock();

public void put(String key, String value) {
try {
writeLock.lock();
syncHashMap.put(key, value);
} finally {
writeLock.unlock();
}
}
...
public String remove(String key){
try {
writeLock.lock();
return syncHashMap.remove(key);
} finally {
writeLock.unlock();
}
}

Lock readLock = lock.readLock();
//...
public String get(String key){
try {
readLock.lock();
return syncHashMap.get(key);
} finally {
readLock.unlock();
}
}

public boolean containsKey(String key) {
try {
readLock.lock();
return syncHashMap.containsKey(key);
} finally {
readLock.unlock();
}
}
}

StampedLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class StampedLockDemo {
Map<String,String> map = new HashMap<>();
private StampedLock lock = new StampedLock();

public void put(String key, String value){
long stamp = lock.writeLock();
try {
map.put(key, value);
} finally {
lock.unlockWrite(stamp);
}
}

public String get(String key) throws InterruptedException {
long stamp = lock.readLock();
try {
return map.get(key);
} finally {
lock.unlockRead(stamp);
}
}
}

StampedLock还提供了乐观锁,通常,读的时候并不需要等待写的完成。乐观锁读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String readWithOptimisticLock(String key) {
long stamp = lock.tryOptimisticRead();
String value = map.get(key); //先直接获取资源

if(!lock.validate(stamp)) { //在获取时已经有新的写入了,有写竞争走悲观锁流程
stamp = lock.readLock(); //重新获取读锁(悲观锁)
try {
return map.get(key);
} finally {
lock.unlock(stamp);
}
}
return value;
}

Conditions

当一个线程获取到了资源的访问权限,但是却没有满足执行的权限时就用到了Condition.进程间的内部通讯机制,同wait(), notify() and notifyAll() 方法,但是允许定义多个Condition。

例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ReentrantLockWithCondition {

Stack<String> stack = new Stack<>();
int CAPACITY = 5;

ReentrantLock lock = new ReentrantLock();
Condition stackEmptyCondition = lock.newCondition(); //栈空的Condition
Condition stackFullCondition = lock.newCondition(); //栈满的Condition

public void pushToStack(String item){
try {
lock.lock();
while(stack.size() == CAPACITY){ //注意这里的while循环,必须while校验条件是否满足
stackFullCondition.await();
}
stack.push(item);
stackEmptyCondition.signalAll();
} finally {
lock.unlock();
}
}

public String popFromStack() {
try {
lock.lock();
while(stack.size() == 0){
stackEmptyCondition.await();
}
return stack.pop();
} finally {
stackFullCondition.signalAll();
lock.unlock();
}
}
}

JUC

JUC是java.util.concurrent包的简称,该包提供了并发编程的解决方案。JUC包有两大核心:CAS和AQS。其中CAS是java.util.concurrent.atomic包的基础,AQS是java.util.concurrent.locks包以及一些常用类比如Semophore等类的基础

CAS

CAS的全称为Compare-And-Swap,它是一条CPU并发原语.它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能。

典型的使用场景java.util.concurrent.atomic.AtomicInteger,参见其源码(1.7源码)

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}

CAS是乐观锁,是一种冲突重试的机制,在并发竞争不是很激烈的情况下(也是大多数情况),他的性能要好于基于锁的并发性能。因为并发竞争激烈的话,冲突重试的过程会很多。

AQS

AQS是AbstractQueuedSynchronizer的简称。它是一个同步框架,负责资源的管理和资源申请者的管理
AQS有两个主要部分

  • volatile int state; 代表共享资源
  • FIFO线程等待队列; 多线程争用资源被阻塞时会进入此队列

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

实现自定义同步器时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

1
2
3
4
5
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

自定义同步器类的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Mutex implements Lock , Serializable{
private static class Syn extends AbstractQueuedSynchronizer{
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
@Override
protected boolean tryAcquire(int arg) {
assert arg==1;
if(compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
assert arg==1;
if(getState()==0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private Syn syn=new Syn();
public void lock() {
syn.acquire(1);
}
public void unlock() {
syn.release(1);
}
public void lockInterruptibly() throws InterruptedException{
syn.acquireInterruptibly(1);
}
public boolean tryLock(){
return syn.tryAcquire(1);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{
return syn.tryAcquireNanos(1,unit.toNanos(time));
}
public Condition newCondition(){
return null;
}
}

acquire()方法是独占模式获取资源的顶级入口

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

流程如下:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

再看release()方法,release()方法是独占模式下释放资源的顶级入口

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}

AQS是许多常用类的基础,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch等等,都是基于AQS构建的.

分布式锁

待补充
https://github.com/redisson/redisson#quick-start
org.redisson.RedissonLock

参考文档

Guide to java.util.concurrent.Locks
http://www.baeldung.com/java-concurrent-locks

JUC之CAS
https://blog.csdn.net/yizhenn/article/details/52384590
JUC之AQS
https://blog.csdn.net/yizhenn/article/details/52384592

AtomicInteger源码分析——基于CAS的乐观锁实现
https://www.aliyun.com/jiaocheng/290273.html?spm=5176.100033.1.24.tnquSx