lettuce 洋葱工厂官网
在Lettuce研究报告(二)中主要讲了RedisAsyncCommandsImpl中路由redis命令前,构造命令的构造器,以及AsyncCommand和TransactionalCommand的处理逻辑。
今天接着从dispatch开始看(隐藏不相关的代码,下同)。上有一节里
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
。。。
}
这里的connection.dispatch再点进去,就到了RedisChannelHandler,看到handler就大概知道这里已经到来关键处理类了。
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
。。。
return channelWriter.write(cmd);
}
在众多实现类里直接看默认实现DefaultEndpoint。
private final SharedLock sharedLock = new SharedLock();
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
...
try {
// 计数
sharedLock.incrementWriters();
...
if (autoFlushCommands) {
if (isConnected()) {
writeToChannelAndFlush(command);
} else {
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
} finally {
sharedLock.decrementWriters();
}
return command;
}
在写命令之前用了一个自定义的共享锁的计数器。
共享锁
为什么要用这个锁呢?
由于底层是netty,如果网络连接突然断开,lettuce需要先将已经进入到代码块里的请求处理完,也就是DefaultEndpoint里的write里把command写到buffer,再做断开后处理,否则command还没有加入到buffer里的话,就会丢失命令。
所以这里就需要记录已经进入到待加入buffer的命令数量,当数量为0时,再开启独占锁,等自己处理完成后,再开启进入的阀门。
看lettuce怎么实现的。
看其成员变量
// 更新器
private static final AtomicLongFieldUpdater<SharedLock> WRITERS = AtomicLongFieldUpdater.newUpdater(SharedLock.class,
&34;writers&34;);
// 非公平锁
private final Lock lock = new ReentrantLock();
// 记录进入待处理的写入线程的个数
private volatile long writers = 0;
// 拥有排他锁的线程
private volatile Thread exclusiveLockOwner;
核心逻辑都在代码里了,先看计数器加1的逻辑:只需要关注1和5,在5里计数器自增了。
void incrementWriters() {
// 1 当前线程已经有排他锁了,就不处理
if (exclusiveLockOwner == Thread.currentThread()) {
return;
}
// 2
lock.lock();
try {
// 3
for (;;) {
// 4
if (WRITERS.get(this) >= 0) {
// 5 计数
WRITERS.incrementAndGet(this);
return;
}
}
} finally {
// 6
lock.unlock();
}
}
自减操作就不看,直接看抢排他锁的部分,都是通过调用doExclusive来实现获取锁的:
<T> T doExclusive(Supplier<T> supplier) {
lock.lock();
try {
try {
// 等待获取排他锁
lockWritersExclusive();
// 执行获取排他锁后的逻辑
return supplier.get();
} finally {
unlockWritersExclusive();
}
} finally {
lock.unlock();
}
}
// 核心逻辑
private void lockWritersExclusive() {
if (exclusiveLockOwner == Thread.currentThread()) {
WRITERS.decrementAndGet(this);
return;
}
lock.lock();
try {
for (;;) {
// 如果计数器是0,则将计数器置为-1
if (WRITERS.compareAndSet(this, 0, -1)) {
exclusiveLockOwner = Thread.currentThread();
return;
}
}
} finally {
lock.unlock();
}
}
// 需要排他锁
private void unlockWritersExclusive() {
if (exclusiveLockOwner == Thread.currentThread()) {
if (WRITERS.incrementAndGet(this) == 0) {
exclusiveLockOwner = null;
}
}
}
想要获取排他锁,就要等待计数器减为0,所以在lockWritersExclusive一个死循环,一直到writes为0时,才能跳出循环。
在把writes置为-1,则是为了incrementWriters里的4部分的代码同时进入死循环,避免writes计数器变更。而在2处的代码又保证了其他线程进入不了代码块里,这样就全面保证了安全性。
当处理完业务后,unlockWritersExclusive修改计数器为0同时清空掉排他锁线程,使得incrementWriters 开始工作。
总结
本节主要看了DefaultEndpoint类的write部分,里面的共享锁,设计还是很精妙的,逻辑虽然不复杂,但是却有点绕,前后关联度很高,所以看代码更直观些。
代码用到了ReentrantLock,既然来了,就再复习下吧。
番外
先看类图
Sync是ReentrantLock的一个成员变量,锁操作都由这个类来实现。继承自AbstractQueuedSynchronizer,它有2个子类FairSync公平锁和NonfairSync非公平锁。
ReentrantLock类有两个构造函数,默认是非公平锁。当传入的参数是true为公平锁,为false为非公平锁,这2个实现有什么区别?
先看默认的非公平锁:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
先cas尝试获取state的变更,如果失败了就排队。
在看公平锁:
final void lock() {
acquire(1);
}
直接排队,没有多余的操作。这个之间的差异体现在哪里?
非公平锁效率更高!
因为在恢复一个被挂起的线程与该线程真正运行之间存在着一定的延迟,从而吞吐量提高了。
写在最后
俗话说,用剑最高境界就是无剑胜有剑。在并发开发里,最高的境界是知道所有的锁而不用锁,同时还能达到同样的效果。