Redis实现分布式锁
Redis除了用来作为缓存,还有哪些其他的用途呢?
-
分布式锁:在分布式系统中,为了保证数据的正确性,经常需要使用分布式锁。Redis提供了
setnx
和expire
等操作,可以实现分布式锁的功能。使用Redis作为分布式锁可以有效地解决多个进程或者多台机器之间的竞争问题,从而提高系统的稳定性和可靠性。 -
分布式Session:在Web应用中,为了保证用户的登录状态,经常需要使用会话管理。Redis提供了String和Hash等数据结构,可以方便地实现会话管理的功能。使用Redis作为会话管理存储可以有效地提高系统的性能和可扩展性。
-
全局唯一ID:Redis可以通过
Incr
等操作实现全局唯一Id的生成。具体实现方式是在Redis中创建一个计数器,每当需要生成新的全局唯一Id时,对计数器进行自增,将自增后的值作为新的全局唯一Id返回。 -
计数器:在很多应用场景中,需要对某些数据进行计数。Redis提供了
Incr
和Decr
等操作,可以方便地实现计数器的功能。同时,Redis还提供了HyperLogLog数据结构,可以实现一些基数统计的功能。使用Redis作为计数器可以有效地提高系统的性能和可扩展性。 -
地理位置:Redis提供了Geo数据结构,可以实现一些基于地理位置的应用,比如附近的人等。使用Redis作为地理位置存储可以有效地提高系统的可扩展性和可维护性。
除此之外,Redis还提供了很多其他的数据结构和操作,如Set、Sorted Set、Hash等。可以根据具体的需求选择合适的数据结构和操作。
-
轻量级的消息队列:在分布式系统中,消息队列是一种常见的解决方案。Redis提供了List数据结构,我们可以通过LeftPush和RightPop等操作来实现消息队列的功能。此外,Redis还提供了pub/sub模式,可以实现消息的订阅和发布。使用Redis作为消息队列可以有效地解耦系统各个模块之间的依赖关系,从而提高系统的可靠性和可维护性。
除此之外,Redis还提供了很多其他的数据结构和操作,如Set、Sorted Set、Hash等。可以根据具体的需求选择合适的数据结构和操作。
什么是分布式锁
要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。
- 线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
- 进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
- 分布式锁:当多个进程不在同一个系统中(比如分布式系统中控制共享资源访问),用分布式锁控制多个进程对资源的访问。
分布式锁需要具备的条件
一个靠谱的分布式锁需要具备以下条件:
- 互斥性:同一时刻只能有一个客户端持有锁,其他客户端无法获取锁。
- 可重入性:允许同一个客户端在持有锁的情况下再次获取锁,避免死锁。
- 高可用性:当锁服务器出现故障时,需要有备用服务器来提供锁服务。
- 容错性:当一个客户端持有锁的时间超过一定阈值时,需要自动释放锁,避免死锁。
除此之外,分布式锁的设计中还可以/需要考虑:
- 加锁解锁的同源性:A加的锁,不能被B解锁。
- 获取锁是非阻塞的:如果获取不到锁,不能无限期等待。
- 高性能:加锁解锁是高性能的。
分布式锁的实现方案
- 基于数据库的实现方案:
- 基于数据库表(锁表,很少使用)。
- 乐观锁(基于版本号)。
- 悲观锁(基于排它锁)。
- 基于Redis实现分布式锁:
- 单个Redis实例:
setnx
(key,当前时间+过期时间)+ Lua。 - Redis集群模式:Redlock。
- 单个Redis实例:
- 基于ZooKeeper的实现方案:当一个进程或线程需要访问共享资源时,先在ZooKeeper中创建一个临时节点,并将该节点作为锁。其他进程或线程尝试创建同名节点时,由于该节点已存在,创建操作将失败,从而实现了对共享资源的访问控制。当进程或线程完成对共享资源的访问后,删除该节点,释放锁。
- 基于 Consul 实现分布式锁
实现一个简单的分布式锁
业务场景
假设一个商品扣减库存的场景,库存数据就存放在Redis了。首先是Service的实现:
@Override
public void deduct(Long id) {
String key = "storage:" + id;
//查询Redis里面的库存
int storage = Optional.ofNullable(redisTemplate.opsForValue().get(key)).map(Integer::parseInt).orElse(0);
//判断库存是否足够
if(storage>0){
//扣减库存
redisTemplate.opsForValue().set(key,String.valueOf(storage-1));
}
}
具体业务就是根据一个库存Id,从Redis查询到剩余库存然后判断是否大于0,最后扣除库存保存到Redis。
使用setnx
实现简单分布式锁
这里是存在线程安全问题的,高并发下极有可能出现”超卖“现象,这里我们使用分布式锁来解决,从上文得知,Redis可以使用setnx
来实现分布式锁,那么代码可以改成如下:
@Override
public void deduct(Long id) {
String key = "storage:" + id;
String redisLockKey = "redisLock:storage";
String uuId = UUID.randomUUID().toString();
String lockId = uuId + Thread.currentThread().getId();
try{
//尝试抢占锁
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent(redisLockKey,lockId);
//这里如果没有抢到锁则重试
if(Boolean.FALSE.equals(hasLock)){
//暂停20ms,然后递归重试
TimeUnit.MICROSECONDS.sleep(20);
deduct(id);
}else{
//抢到锁则执行业务
//查询Redis里面的库存
int storage = Optional.ofNullable(redisTemplate.opsForValue().get(key)).map(Integer::parseInt).orElse(0);
//判断库存是否足够
if(storage>0){
//扣减库存
redisTemplate.opsForValue().set(key,String.valueOf(storage-1));
}
}
}catch (Exception exception){
exception.printStackTrace();
}finally {
//关闭资源,释放锁
redisTemplate.delete(redisLockKey);
}
}
测试上面的代码后并没有出现”超卖“,但是上面的代码还是存在一定的问题的:
- 递归的次数:这里使用递归在高并发的场景下很容易导致SOF。
使用自旋来重试获取锁
@Override
public void deduct(Long id) {
String key = "storage:" + id;
String redisLockKey = "redisLock:storage";
String uuId = UUID.randomUUID().toString();
String lockId = uuId + Thread.currentThread().getId();
try{
//尝试抢占锁,自旋
while(Boolean.FALSE.equals(redisTemplate.opsForValue().setIfAbsent(redisLockKey, lockId))) {
//暂停20ms
TimeUnit.MICROSECONDS.sleep(20);
}
//抢到锁则执行业务
//查询Redis里面的库存
int storage = Optional.ofNullable(redisTemplate.opsForValue().get(key)).map(Integer::parseInt).orElse(0);
//判断库存是否足够
if(storage>0){
//扣减库存
redisTemplate.opsForValue().set(key,String.valueOf(storage-1));
}
}catch (Exception exception){
exception.printStackTrace();
}finally {
//关闭资源,释放锁
redisTemplate.delete(redisLockKey);
}
}
这里我们使用自旋来代替了上面的递归,避免了上面的SOF的出现,但是这里还是存在一些问题的,想想如果线程的关闭资源之前,服务器挂掉了,那么这个key将会一直存在,其他线程永远获取不了锁。那么怎样才能使这个key不一直存在呢,这里我们可以给这个key加上过期时间,这样就算没有关闭资源,那这个key还是会自动过期删除的。
redisTemplate.opsForValue().setIfAbsent(redisLockKey,lockId,10L,TimeUnit.SECONDS)
这里不能分开使用
expire
方法,因为如果不保证原子性的话,还是会出现上述问题。
防止误删Key
上面我们设置了key的过期时间来保证在没有释放锁的情况下使key过期来自动删除,但是上面过期时间是写死的,实际这个具体时间是没有办法保证的。
假设这里两个线程A和B,A先抢到了锁并开始执行业务,但是A执行的时间没法保证,所以当某一次A的执行时间超过了设置的过期时间时,这里一直在自旋重试的B则会马上获取到锁,这里当B还在执行业务的同时,A线程执行完了然后释放锁(实际上释放的时B加的锁)。根据上面提到的分布式锁需要具备锁的同源性,所以这里违背了分布式锁的设计原则。
那么如何解决这个问题呢?首先我们就需要保证每个线程只能删除自己的key,可以在删除key的同时增加一个判断,判断当前key所对应的value是否是自己的。
//关闭资源,释放锁
if(lockId.equalsIgnoreCase(redisTemplate.opsForValue().get(redisLockKey))){
redisTemplate.delete(redisLockKey);
}
使用Lua保证原子性
上面我们使用Redis的
setnx
实现了一个简单的分布式锁,但是实际还存在很多的问题,比如在最后释放锁的时候是不能保证原子性的。所以这里采用Lua来保证原子性。
什么是Lua脚本
Lua是一种轻量小巧的脚本语言,用标准的C语言编写并以源代码形式开放,设计目的时为了嵌入到程序之中,从而为应用程序提供灵活的扩展和定制功能。
Lua的特性
- 轻量级:使用标准C语言编写,编译后仅仅一百多k,可以很方便的嵌入到别的程序里面。
- 可扩展:Lua提供了非常易于使用的扩展接口和机制:由宿主语言(通常时C或者C++)提供这些功能,Lua可以就像时本来内置的功能一样来使用它们。
Redis中使用Lua
Redis中执行Lua可以通过两种方式:
EVAL
:将Lua脚本或命令直接使用Redis执行。EVALSHA
:相当于把脚本或命令保存到redis中,然后使用一串sha码调用(可以理解为调用函数)
eval
命令的使用:
EVAL script numkeys [key [key ...]] [arg [arg ...]]
script
:是一段Lua5.1脚本程序。脚本内容就是要执行的lua脚本内容。numkeys
:指定后续参数有几个key,即:key [key …]中key的个数。如没有key,则为0。key [key …]
:key列表,作为参数传递给Lua语言,lua中是用KEYS[n]
来获取对应的参数表示在脚本中所用到的那些Redis键(key)。在Lua脚本中通过KEYS[1], KEYS[2]获取。arg [arg …]
:参数列表是传递给Lua语言,可填可不填,lua中使用ARGV[n]
来获取对应的参数。
比如:
> EVAL "return" 0 hello
"hello"
使用Lua脚本解决不能保证原子性的问题
修改上面代码中的finally代码块为即可保证释放锁时候的原子性:
finally {
//关闭资源,释放锁,使用Lua脚本来保证原子性
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else " +
"return 0" +
"end";
List<String> keys = new ArrayList<>() {{
add(redisLockKey);
}};
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(luaScript, Boolean.class);
Boolean result = redisTemplate.execute(redisScript, keys, lockId);
}
可重入的分布式锁
什么是可重入锁
可重入锁还可以叫做递归锁,它是指同一个线程在外层方法获取锁的时候,再次进入该方法内层调用的方法时可以自动获取锁(这里前提是同一把锁),不会因为之前已经获取过该锁还没释放而阻塞。可重入锁的一个优点就是在一定程度上防止了死锁。
简单说就是一个线程中的多个流程可以获取在不释放锁的情况下可以获取同一把锁。
可重入锁的种类
- 隐式锁:synchronized,synchronized实现可重入锁是通过使每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针:
- 当执行monitor enter的时候,如果目标锁对象的计数器为0,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。
- 在目标锁对象的计数器不为0的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器再加1,否则需要等待其他线程释放锁。
- 当执行monitor exit时,Java虚拟机则需要将锁对象的计数器减1。计数器为0则代表锁已经被释放。
- 显式锁:比如Lock中的ReentrantLock。
简单来说实现一个可重入锁(可以参考synchronized以及JUC中的AQS),当线程再次获取同一把锁时判断当前锁是否被同一个线程获取了,如果是则当前线程便可以直接获得该锁,否则需要等待其他线程释放锁才行。
基于Redis实现可重入锁
在实现之前,还有一个计数器的问题,显然继续使用
setnx
并不能满足这一条件,那么要基于Redis实现一个可重入的分布式锁还能用Redis的什么数据类型呢?
我们可以使用Hash这个数据类型来实现:
HSET RedisLock xxxxxxxx:1 1
也就是Hset 锁名称 全局唯一Id:线程Id 计数器(线程加锁的次数)
。
修改业务代码为:
@Override
public void deduct(Long id) {
String key = "storage:" + id;
Lock lock = distributedLockFactory.createLock(DistributedLockFactory.LockType.REDIS);
try{
lock.lock();
//抢到锁则执行业务
//查询Redis里面的库存
int storage = Optional.ofNullable(redisTemplate.opsForValue().get(key)).map(Integer::parseInt).orElse(0);
//判断库存是否足够
if(storage>0){
//扣减库存
redisTemplate.opsForValue().set(key,String.valueOf(storage-1));
}
testReentrant();
}catch (Exception exception){
exception.printStackTrace();
}finally {
lock.unlock();
}
}
@Override
public void testReentrant() {
Lock lock = distributedLockFactory.createLock(DistributedLockFactory.LockType.REDIS);
try {
lock.lock();
}finally {
lock.unlock();
}
}
基于JUC的Lock接口来实现RedisLock:
public class RedisLock extends AbstractDistributedLock {
private final RedisTemplate<String,Object> redisTemplate;
private final long expireTime;
private final String LOCK_NAME = "RedisLock";
public RedisLock(RedisTemplate<String, Object> redisTemplate,String uuid) {
this.redisTemplate = redisTemplate;
this.expireTime = 50L;
super.lockId = uuid+":"+Thread.currentThread().getId();
}
@Override
public void lock() {
while(Boolean.FALSE.equals(tryLock())){
try {
TimeUnit.MILLISECONDS.sleep(60);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Override
public boolean tryLock() {
String luaScript = "if redis.call('EXISTS',KEYS[1]) == 0 or redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('HINCRBY',KEYS[1],ARGV[1],1)"+
"redis.call('EXPIRE',KEYS[1],ARGV[2])"+
"return 1"+
"else"+
"return 0"+
"end";
return Boolean.TRUE.equals(redisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), List.of(LOCK_NAME), lockId, String.valueOf(expireTime)));
}
@Override
public void unlock() {
String luaScript = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then" +
"return nil"+
"else if redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then"+
"return redis.call('DEL',KEYS[1])"+
"else"+
"return 0"+
"end";
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), List.of(LOCK_NAME), lockId);
}
}