Redisson
首先是来自Redis官网的分布式锁的说明:
在不同进程必须以互斥的方式使用共享资源的环境中,分布式锁是一个非常有用的元素。
有许多库和博客文章描述如何使用Redis实现DLM(分布式锁管理器),但每个库都使用不同的方法,并且许多库使用简单的方法。与稍微复杂的方法相比,保证较低的设计。
我们提出了一种名为Redlock的算法,它实现了一个DLM,我们认为比普通的单实例方法更安全。我们希望社区能够对其进行分析、提供反馈,并将其用作实现更复杂或替代设计的起点。
为什么基于故障转移的实现是不够的?
在《Redis实现分布式锁》这篇文章中,自己实现了一个分布式锁,但是还是存在一定的问题。Redis官网是这样说的。
让我们分析一下大多数基于Redis的分布式锁库的当前情况:
使用Redis锁定资源的最简单方法是在实例中创建一个键。通常使用Redis过期功能创建带有有限存活时间的键,以便最终会释放它。当客户端需要释放资源时,它会删除该键。
表面上这很有效,但存在一个问题:就是架构中的单点故障。如果Redis主服务器崩溃会怎样呢?那就添加一个副本!并在主服务器不可用时使用它。但不幸的是,通过这样做,我们无法实现互斥的安全属性,因为Redis复制是异步的。
这个模型存在竞争条件:
- 客户端A在主服务器上获取锁。
- 写入键的过程在传输到副本之前,主服务器崩溃了。
- 副本被提升为主服务器。
- 客户端B获取了与客户端A持有的同一资源的锁。安全性被破坏!
有时,在特殊情况下,例如在出现故障时,多个客户端同时持有锁是完全可以接受的。如果是这种情况,则可以使用基于复制的解决方案。否则,我们建议实现本文档中描述的解决方案。
简单说就是:线程A首先获取锁成功了,将键写入了Redis的Master,但是在Master同步数据到Slave之前,Master宕机了。这个时候触发了故障转移,Slave成为新的Master(但是这个Slave并没有之前线程A写入的键),此时线程B来了,尝试获取锁,因为Slave中没有之前线程A写入的键,所以这里线程B理所当然的获取到了锁。那么这时问题出现了,线程A和线程B同时获取到了锁。
RedLock的算法设计理念
Redis提供了RedLock算法,用来实现基于多个实例的分布式锁。锁变量由多个实例维护,即使有实例发生了故障,锁变量任然时存在的,客户端还是可以完成锁操作。RedLock算法是实现高可靠分布式锁的一种有效解决方案,可以在实际开发中使用。
官网的介绍是:
在算法的分布式版本中,我们假设有N个Redis主节点。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。我们已经描述了如何在单个实例中安全地获取和释放锁。我们认为算法将使用此方法在单个实例中获取和释放锁。在我们的示例中,我们将N设置为5,这是一个合理的值,因此我们需要在不同的计算机或虚拟机上运行5个Redis主节点,以确保它们以几乎独立的方式发生故障。
设计理念
该方案也是基于(SET
加锁、Lua脚本解锁)进行改良的,所以大致方案如下:
假设有N个Redis主节点,例如N = 5,这些节点都是完全独立的,不使用复制或者任何其他隐式协调系统,为了获取到锁,客户端执行以下操作:
- 以毫秒为单位获取当前时间。
- 顺序尝试在所有N个实例中获取锁,使用相同的键名和随机值。在第2步中,设置每个实例中的锁时,客户端使用一个相对于总锁自动释放时间很小的超时时间来获取锁。例如,如果自动释放时间为10秒,则超时时间可以在 [5,50]毫秒范围内。这可以防止客户端尝试与已经宕机的Redis节点通信时长时间被阻塞:如果一个实例不可用,我们应该尽快尝试与下一个实例通信。
- 客户端通过从步骤1中获取的时间戳中减去当前时间来计算获得锁所用的时间。当且仅当客户端能够在大多数实例(至少3个)中获得锁时,并且获得锁所用的总时间少于锁的有效时间时,才认为已经获得了锁。
- 如果成功获得了锁,则其有效时间被视为初始有效时间减去步骤3中计算出的时间。
- 如果客户端由于某种原因未能获得锁(无法锁定N/2+1实例或有效时间为负),则将尝试解锁所有实例(即使客户端认为自己无法锁定某些实例)。
该方案是为了解决数据不一致的问题,直接舍弃了异步复制而只是用Master节点,同时因为舍弃了Slave,又为了保证可用性,引入了N个节点。
Redisson介绍
Redisson是Java的Redis客户端之一,提供了一些API来方便的操作。它是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。
简单使用Redisson的分布式锁
创建一个配置类
@Configuration
public class RedissionConfig {
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().
setAddress("redis://127.0.0.1:6371").
setPassword("123456");
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
在业务代码中使用
@Resource
private RedissonClient redissonClient;
@Override
public void deduct(Long id) {
RLock lock = redisson.getLock("LockName");
try{
lock.lock();
//抢到锁则执行业务
}catch (Exception exception){
exception.printStackTrace();
}finally {
lock.unlock();
}
}
Redisson分布式锁源码分析
这里可以先看看自己基于Redis实现一个简单的分布式锁。
自动续期
如果Redis上的分布式锁过期了,但是业务逻辑还没有处理完怎么办呢?
Redisson是这样做的:额外开了一个线程,定期检查业务线程是否还持有锁,如果还持有的话则延长过期时间。Redisson使用了Watch Dog来定期检查(每1/3的锁时间检查一次)。
Redisson的GitHub是这样描述的:
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
tryAcquireAsync
加锁的逻辑会进入到org.redisson.RedissonLock#tryAcquireAsync
方法中,在获取锁成功之后会进入org.redisson.RedissonBaseLock#scheduleExpirationRenewal
:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}