锁的使用以及AQS的原理和使用
锁的原理与使用
要想知道锁的使用就要先了解线程同步的模式,线程同步是一种保护性暂停模式。
保护性暂停模式
定义(Guarded Suspension Design Pattern)
- 某个结果需要在多线程之间传递,则可以让这些线程关联到一个对象 GuardedObject
- 但是如果这个结果需要不断的从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- 我们前面前面说的join、future采用的就是这个模式
如何实现
最简单的实现
1、首先编写一个简单的GuardedObject
package com.shadow.guarded; import lombok.extern.slf4j.Slf4j; @Slf4j(topic = "enjoy") public class GuardedObject { private Object response; Object lock = new Object();
public Object getResponse(){ synchronized (lock) { log.debug("主线程 获取 response 如果为null则wait"); while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } return response; }
public void setResponse(Object response) { synchronized (lock) { this.response = response; lock.notifyAll(); } } }
|
2、编写模拟耗时操作
package com.shadow.guarded;
import java.util.concurrent.TimeUnit;
public class Operate {
public static String dbOprate(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "result"; }
}
|
3、编写测试类
package com.shadow.guarded;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "enjoy") public class Test { public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { String result = Operate.dbOprate(); log.debug("t1 set完毕..."); guardedObject.setResponse(result); },"t1").start();
log.debug("主线程等待t1 set"); Object response = guardedObject.getResponse(); log.debug("response: [{}] lines",response); }
}
|
结果
17:36:15.856 [main] DEBUG enjoy - 主线程等待t1 set 17:36:15.859 [main] DEBUG enjoy - 主线程 获取 response 如果为null则wait 17:36:19.856 [t1] DEBUG enjoy - t1 set完毕... 17:36:19.856 [main] DEBUG enjoy - response: [result] lines
Process finished with exit code 0
|
超时实现
如果想要实现超时,那么在get的时候需要定义一个超时时间
public Object getResponse(long millis)
|
然后wait的不能无限的等待
继而结束while循环
public Object getResponse(long millis){ synchronized (lock) { log.debug("主线程 获取 response 如果为null则wait"); while (response == null) { try { lock.wait(millis); break; } catch (InterruptedException e) { e.printStackTrace(); } } } return response; }
|
分析1
这种做法的问题在于如果主线程被别人叫醒了;就会立马返回;比如超时时间是5s;但是在第2s的时候别人把主线程叫醒了,那么主线程会立马返回没有等足5s
所以需要设计一个经历时间;也就是从他wait到被别人叫醒中间一共经历了多少时间;判断这个时间是否符合超时;如果要计算这个经历时间必须知道开始时间和结束时间;
1、首先定一个开始时间等于当前时间 long begin = System.currentTimeMillis();
2、定一个经历时间 默认为0 long timePassed = 0;
3、判断是否满足条件,满足则返回结果不阻塞;不满足则然后进入while循环 首先计算等待时间(也就是wait的时间) millis-timePassed
4、判断等待时间是否小于0;小于0标识超时了直接结束while循环 返回不等待了
5、如果大于0 进入wait 这样就算提前被别人叫醒 也会在继续wait
最终代码实现
package com.shadow.guarded;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "enjoy") public class GuardedObjectTimeOut { private Object response;
Object lock = new Object();
public Object getResponse(long millis){ synchronized (lock) { long begin = System.currentTimeMillis(); long timePassed = 0; while (response == null) { long waitTime = millis-timePassed; log.debug("主线程 判断如果没有结果则wait{}毫秒",waitTime); if (waitTime <= 0) { log.debug("超时了 直接结束while 不等了"); break; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } timePassed = System.currentTimeMillis() - begin; log.debug("经历了: {}", timePassed); } } return response; }
public void setResponse(Object response) { synchronized (lock) { this.response = response; lock.notifyAll(); } } }
|
死锁
如果线程需要获取多把锁那么就很可能会发现死锁
package com.shadow.lock;
import jdk.nashorn.internal.ir.Block; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "enjoy") public class LockTest {
static Object x = new Object(); static Object y = new Object();
public static void main(String[] args) { new Thread(()->{ synchronized (x){ log.debug("locked x"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (y){ log.debug("locked x"); log.debug("t1---------"); } }
},"t1").start();
new Thread(()->{ synchronized (y){ log.debug("locked y"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (x){ log.debug("locked x"); log.debug("t2---------"); } }
},"t2").start(); }
}
|
活锁
不可避免 但是我可以错开他们的执行时间
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j(topic = "enjoy") public class LockTest1 { static volatile int count = 10; static final Object lock = new Object();
public static void main(String[] args) { new Thread(() -> { while (count > 0) { try { TimeUnit.NANOSECONDS.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } count--; log.debug("count: {}", count); } }, "t1").start();
new Thread(() -> { while (count < 20) { try { TimeUnit.NANOSECONDS.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } count++; log.debug("count: {}", count); } }, "t2").start(); } }
|
Lock–应用
特点:
- 可打断,可重入
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 支持读写锁(单独的篇章来讲)
基本语法
reentrantLock.lock(); try {
} finally {
reentrantLock.unlock(); }
|
重入
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "enjoy") public class LockTest3 {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) { lock1(); }
public static void lock1() { lock.lock(); try { log.debug("执行lock1"); lock2(); } finally { lock.unlock(); } }
public static void lock2() { lock.lock(); try { log.debug("执行lock2"); } finally { lock.unlock(); } }
}
|
可打断
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "enjoy") public class LockTest4 {
public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock();
new Thread(()->{ try { lock.lock(); log.debug("获取锁----"); TimeUnit.SECONDS.sleep(5); log.debug("t2 5s 之后继续执行"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }
},"t2").start();
TimeUnit.SECONDS.sleep(1);
Thread t1 = new Thread(() -> { try { lock.lockInterruptibly(); log.debug("获取了锁--执行代码"); } catch (InterruptedException e) { e.printStackTrace(); log.debug("被打断了没有获取锁"); return; } finally { lock.unlock(); } }, "t1"); t1.start();
try { log.debug("主线程------1s后打断t1"); TimeUnit.SECONDS.sleep(2); t1.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); }
} }
|
t---线程 lock.lockInterruptibly(); 标识可以打断
怎么打断
t.interrupt();
|
超时
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "enjoy") public class LockTest5 {
public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("t1启动---------"); try { if (!lock.tryLock(2,TimeUnit.SECONDS)) { log.debug("拿不到鎖,返回"); return; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("主綫程获得了锁"); t1.start(); try { TimeUnit.SECONDS.sleep(3); } finally { lock.unlock(); }
} }
|
多個條件
synchronized 中也有条件变量,就是以前讲的waitSet 不满足条件的线程进入waitSet;而Lock也有waitSet而且有多个
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "enjoy") public class TestWait5 {
static final ReentrantLock lock = new ReentrantLock(); static boolean isPrettyGril = false; static boolean isMoney = false;
static Condition waitpg = lock.newCondition(); static Condition waitm = lock.newCondition();
public static void main(String[] args) throws InterruptedException { new Thread(() -> { try { lock.lock(); log.debug("有没有女人[{}]", isPrettyGril); while (!isPrettyGril) { log.debug("没有女人!等女人"); try { waitpg.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("男女搭配干活不累;啪啪啪写完了代码"); }finally { lock.unlock(); } }, "jack").start();
new Thread(() -> { try { lock.lock();
log.debug("有没有工资[{}]", isMoney); while (!isMoney) { log.debug("没有工资!等发工资"); try { waitm.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("-----卧槽好多钱;啪啪啪写完了代码");
}finally { lock.unlock(); } }, "rose").start();
Thread.sleep(1000); new Thread(() -> { try { lock.lock(); isMoney = true; log.debug("钱来哦了"); waitm.signal(); isPrettyGril=true; waitpg.signal();
}finally { lock.unlock(); } }, "boss").start(); }
}
|
读写锁
读读并发
读写互斥
写写互斥
读写锁读锁不支持条件
读锁的条件直接调用ReentrantReadWriteLock的 newCondition 会直接exception public Condition newCondition() { throw new UnsupportedOperationException(); }
|
读写锁使用的例子
package com.shadow.lock;
import com.shadow.aqs.CustomSync; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.*;
@Slf4j(topic = "enjoy") public class LockTest10 {
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock();
public static void main(String[] args) throws InterruptedException {
new Thread(()->{ log.debug("read 获取 锁"); r.lock(); try { for (int i = 0; i < 10; i++) { m1(i); } }finally { r.unlock(); }
},"t1").start();
new Thread(()->{ log.debug("write 获取 锁"); w.lock(); try { for (int i = 0; i < 20; i++) { m1(i); } }finally { w.unlock(); }
},"t2");
new Thread(()->{ log.debug("write 获取 锁"); r.lock(); try { for (int i = 0; i < 20; i++) { m1(i); } }finally { r.unlock(); }
},"t3").start();
}
public static void m1(int i){ log.debug("exe"+i); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} }
|
读写支持重入但是只支持降级不支持升级
package com.shadow.lock;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j(topic = "enjoy") public class LockTest11 {
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> { log.debug("read"); w.lock(); try { log.debug("read 已经获取"); r.lock(); log.debug("write 已经获取"); } finally { r.unlock(); w.unlock();
}
}, "t1").start();
} }
|
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = "数据库得到真实数据" cacheValid = true; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } } }
|
AQS框架
定义
1、全称是 AbstractQueuedSynchronizer
2、阻塞式锁和相关的同步器工具的框架;
3、AQS用一个变量(volatile state) 属性来表示锁的状态,子类去维护这个状态
3、getState、compareAndSetState cas改变这个变量
4、独占模式是只有一个线程能够访问资源
5、而共享模式可以允许多个线程访问资源(读写锁)
6、内部维护了一个FIFO等待队列,类似于 synchronized关键字当中的 Monitor 的 EntryList
7、条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
8、内部维护了一个Thread exclusiveOwnerThread 来记录当前持有锁的那个线程
功能
1、实现阻塞获取锁 acquire 拿不到锁就去阻塞 等待锁被释放再次获取锁
2、实现非阻塞尝试获取锁 tryAcquire 拿不到锁则直接放弃
3、实现获取锁超时机制
4、实现通过打断来取消
5、实现独占锁及共享锁
6、实现条件不满足的时候等待
自定义实现AQS框架
继承AQS 实现其主要方法
package com.shadow.aqs;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer; import java.util.concurrent.locks.Condition;
public class CustomSync extends AbstractQueuedLongSynchronizer { @Override public boolean tryAcquire(long acquires) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(long arg) {
if(getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; }
@Override protected boolean isHeldExclusively() { return getState()==1; }
public Condition newCondition() { return new ConditionObject(); } }
|
实现Lock接口实现加锁解锁
package com.shadow.lock;
import com.shadow.aqs.CustomSync; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.*;
@Slf4j(topic = "enjoy") public class LockTest10 implements Lock{ CustomSync customSync = new CustomSync();
@Override public void lock() { customSync.acquire(1); }
@Override public void lockInterruptibly() throws InterruptedException { customSync.acquireInterruptibly(1); }
@Override public boolean tryLock() { return customSync.tryAcquire(1); }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return customSync.tryAcquireNanos(1, unit.toNanos(time)); }
@Override public void unlock() { customSync.release(1); }
@Override public Condition newCondition() { return customSync.newCondition(); }
public static void main(String[] args) throws InterruptedException { LockTest10 l = new LockTest10(); new Thread(()->{ l.lock(); log.debug("xxx"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } l.unlock(); },"t1").start();
TimeUnit.SECONDS.sleep(1); l.lock(); log.debug("main"); l.unlock(); } }
|
ReentrantLock的使用及原理见下文。