分布式锁(2)-基于Redis实现

Posted by zhengguohuang on April 21, 2021

使用场景

分布式应用进行逻辑处理时经常会遇到并发问题。

比如一个操作要修改用户的状态,修改状态需要先读取用户的状态,在内存里进行修改,改完了再存回去。如果这样的操作同时进行了,就会出现并发问题,因为读取和保存状态这两个操作不是原子的。(所谓原子操作是指不会被线程调度机制打断的操作:这种操作一旦开始,就一直运行到结束,中间不会有任何线程切换)

分布式锁

在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。实现高效的分布式锁有三个属性要考虑:

  • 安全属性:互斥,不管什么时候,只有一个客户端持有锁
  • 效率属性A:不会死锁
  • 效率属性B:容错,只要大多数redis节点能够正常工作,客户端就能获取和释放锁。

Redis分布式锁本质上要实现的目标就是在Redis里面占一个”坑”,当别的进程也要来占时,发现已经有人在了,就只好放弃或者稍后再试。

占坑一般用setnx指令,(意为set if not exists),只允许被一个客户端占坑。先来先占,用完了再调用del指令释放坑。

setnx lock:codehole true
...do something critical...
del lock:codehole

但是上面的处理方式有个问题,如果逻辑执行到中间出现了异常,可能导致del指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

于是我们在拿到锁之后,再给锁加上一个过期时间,比如5s,这样即使中间处理过程出现异常也可以保证5秒之后锁会自动释放。

setnx lock:codehole true
expire lock:codehole 5
... do something critical ...
del lock:codehole

但是以上逻辑还是有问题。如果setnx和expire之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致expire得不到执行,也会造成死锁。

image-20210421224635032

这种问题的根源就在于setnxexpire是两条指令而不是原子指令。如果这两条指令可以一起执行就不会出现问题。也许你会想用到Redis事务来解决。但是这里不行,因为expire是依赖于setnx的执行结果的,如果setnx没抢到锁,expire是不应该执行的。事务里没有if-else分支,事务的特点是一口气执行,要么全部执行要么一个都不执行。

Redis2.6.12开始,加入了set指令的扩展参数,使得setnxexpire指令可以一起执行。

SET key value [EX seconds] [PX milliseconds] [NX|XX]
set lock:codehole true ex 5 nx
... do something critical ...
del lock:codehole

超时问题

Redis的分布式锁不能解决超时问题,如果第一个线程在加锁和释放锁之间的执行的太长,以至于超出了锁的超时限制,就会出问题。因为这个时候锁过期了,第二个线程重新持有这把锁,但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程执行完之前拿到锁。

解决方案是为set指令的value参数设置一个随机数,释放时先匹配随机数是否一致,然后再删除key。但是匹配value和删除key不是一个原子操作,Redis也没有提供类似于delifequals这样的指令,这就需要Lua脚本来处理,因为Lua脚本可以保证连续多个指令的原子性执行。

1176050-20190408094346807-659916317

// 获取锁
// NX是指如果key不存在就执行成功key存在返回false
// SET key value NX EX 5
tag = random.nextint() + thread_id
if redis.set(key, tag, nx=True, ex=5):
	do_something()
	redis.delifequals(key, tag)

// 释放锁通过执行一段Lua脚本
// 释放锁涉及到两条命令这两条指令不能原子执行
// 需要Lua脚本执行Lua脚本是原子性的
# delifequals
if redis.call("get", KEYS[1])==ARGV[1] then
	return redis.call("del", KEYS[1])
else
	return 0
end

这种实现方式有2大要点

  1. set命令要用set key value px milliseconds nx

  2. value要具有唯一性,这个是为了在解锁的时候,需要验证value是和加锁的一致才删除key。

除了要考虑客户端要怎么实现分布式锁之外,还需要考虑Redis的部署问题。

Redis有三种部署方式:

  • 单机模式

    • 存在单点问题,只要Redis故障了,就无法加锁了。
  • master-slave + sentinel

    • 加锁的时候只对一个节点加锁,即便通过sentinel做了高可用,但是如果master节点故障了,由于Redis复制是异步的,还没来得及同步到从节点,发生主从切换,此时就会出现锁丢失问题。
    • 在Sentinel集群中,主节点挂掉时,从节点会取而代之。客户端1在主节点成功申请了一把锁,但是这把锁还没来得及同步到从节点,主节点突然挂掉了。然后从节点变成了主节点,这个新的节点内部没有这个锁,所以当另一个客户端过来请求加锁的时候,立即就批准了。这样就会导致同一个锁被两个客户端同时持有。
    • 这个失败的原因是因为从Redis立刻升级为主Redis,如果能够过TTL时间再升级为主Redis(延迟升级),或者立刻升级但是过了TTL时间之后再执行获取锁的任务,就能产生互斥效果。

    image-20210421152336447

  • Cluster模式

Redlock算法

针对单机模式和sentinel模式存在的问题,Redis的作者提出了Redlock算法。https://github.com/redis/redis-doc/blob/master/topics/distlock.md

为了使用Redlock,需要提供多个Redis实例,这些实例之间完全互相独立,不存在主从复制或者其他集群协调机制

假设Redis的部署模式是Cluster,共有5个节点,通过以下步骤获取一把锁:

  1. 获取当前时间戳(毫秒)
  2. 依次尝试从5个实例,使用相同的key和具有唯一性的value获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间,这样可以可以避免客户端死等。轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那么每个节点锁请求的超时时间可能是5-10毫秒的范围,这个超时时间可以防止一个客户端在某个宕掉的master主节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。
  3. 客户端使用当前时间减去开始获取锁时间就得到获取锁使用的时间。当且仅当半数以上的Redis节点取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果获取到了锁,key真正的有效时间等于有效时间减去获取锁使用的时间。
  5. 如果因为某些原因,获取锁失败(没有在半数以上实例上获取到锁,或者获取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,无论Redis实例是否加锁成功,因为可能服务端响应消息丢失了但是实际成功了,毕竟多释放一次也不会有问题。

image-20210421224757764

1176050-20190409171037788-1969443095

RedLock算法是否是异步算法??

可以看成是同步算法;因为 即使进程间(多个电脑间)没有同步时钟,但是每个进程时间流速大致相同;并且时钟漂移相对于TTL叫小,可以忽略,所以可以看成同步算法;(不够严谨,算法上要算上时钟漂移,因为如果两个电脑在地球两端,则时钟漂移非常大)

RedLock失败重试

当client不能获取锁时,应该在随机时间后重试获取锁;并且最好在同一时刻并发的把set命令发送给所有redis实例;而且对于已经获取锁的client在完成任务后要及时释放锁,这是为了节省时间;

RedLock释放锁

由于释放锁时会判断这个锁的value是不是自己设置的,如果是才删除;所以在释放锁时非常简单,只要向所有实例都发出释放锁的命令,不用考虑能否成功释放锁;

RedLock注意点(Safety arguments):

1.先假设client获取所有实例,所有实例包含相同的key和过期时间(TTL) ,但每个实例set命令时间不同导致不能同时过期,第一个set命令之前是T1,最后一个set命令后为T2,则此client有效获取锁的最小时间为TTL-(T2-T1)-时钟漂移;

2.对于以N/2+ 1(也就是一半以 上)的方式判断获取锁成功,是因为如果小于一半判断为成功的话,有可能出现多个client都成功获取锁的情况, 从而使锁失效

3.一个client锁定大多数事例耗费的时间大于或接近锁的过期时间,就认为锁无效,并且解锁这个redis实例(不执行业务) ;只要在TTL时间内成功获取一半以上的锁便是有效锁;否则无效

系统有活性的三个特征

1.能够自动释放锁

2.在获取锁失败(不到一半以上),或任务完成后 能够自动释放锁,不用等到其自动过期

3.在client重试获取哦锁前(第一次失败到第二次重试时间间隔)大于第一次获取锁消耗的时间;

4.重试获取锁要有一定次数限制

RedLock性能及崩溃恢复的相关解决方法

1.如果redis没有持久化功能,在clientA获取锁成功后,所有redis重启,clientB能够再次获取到锁,这样违法了锁的排他互斥性;

2.如果启动AOF永久化存储,事情会好些, 举例:当我们重启redis后,由于redis过期机制是按照unix时间戳走的,所以在重启后,然后会按照规定的时间过期,不影响业务;但是由于AOF同步到磁盘的方式默认是每秒-次,如果在一秒内断电,会导致数据丢失,立即重启会造成锁互斥性失效;但如果同步磁盘方式使用Always(每一个写命令都同步到硬盘)造成性能急剧下降;所以在锁完全有效性和性能方面要有所取舍;

3.有效解决既保证锁完全有效性及性能高效及即使断电情况的方法是redis同步到磁盘方式保持默认的每秒,在redis无论因为什么原因停掉后要等待TTL时间后再重启(学名:延迟重启) ;缺点是 在TTL时间内服务相当于暂停状态;

总结:

1.TTL时长 要大于正常业务执行的时间+获取所有redis服务消耗时间+时钟漂移

2.获取redis所有服务消耗时间要 远小于TTL时间,并且获取成功的锁个数要 在总数的一般以上:N/2+1

3.尝试获取每个redis实例锁时的时间要 远小于TTL时间

4.尝试获取所有锁失败后 重新尝试一定要有一定次数限制

5.在redis崩溃后(无论一个还是所有),要延迟TTL时间重启redis

6.在实现多redis节点时要结合单节点分布式锁算法 共同实现

另一种方式:Redisson

此外,实现Redis的分布式锁,除了自己基于Redis client原生API来实现之外,还可以使用开源框架:Redisson https://github.com/redisson/redisson

Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持

回想一下上面说的,如果自己写代码来通过Redis设置一个值,是通过下面这个命令设置的

SET key value NX PX30000

这里设置的超时时间是30秒,假如一个客户端超过30秒都没有完成业务逻辑(网络延迟、GC、IO堵塞等等原因),key会过期,其他线程就有可能获取到锁。这样一来的话,第一个线程还没执行完业务逻辑,第二个线程进来了会出现线程安全问题。所以我们还需要额外去维护这个过期时间,太麻烦了。如下所示

image-20210421224846584

为了解决由于系统宕机带来的锁失效而给锁强加了一个失效时间,异常情况下,程序(业务)执行的时间大于锁失效时间从而造成的一系列的问题,我们能否从这方面去考虑,从而用程序来解决这个样一个死局呢?

既然是因为锁的失效时间小于业务时间,那么我们想办法保证业务程序执行时间绝对小于锁超时时间不久解决了?

Java语言中Redisson实现了一种保证锁失效时间绝对大于业务程序执行时间的机制。官方叫做看门狗机制(Watchdog)。

我们来看看Redisson是怎么实现的?先感受一下使用Redisson的爽:

导入依赖

Maven

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.15.4</version>
</dependency>  

Gradle

compile 'org.redisson:redisson:3.15.4'  

Java

Redis based distributed reentrant Lock object for Java and implements Lock interface.

If Redisson instance which acquired lock crashes then such lock could hang forever in acquired state. To avoid this Redisson maintains lock watchdog, it prolongs lock expiration while lock holder Redisson instance is alive. By default lock watchdog timeout is 30 seconds and can be changed through Config.lockWatchdogTimeout setting.

Also Redisson allow to specify leaseTime parameter during lock acquisition. After specified time interval locked lock will be released automatically.

RLock object behaves according to the Java Lock specification. It means only lock owner thread can unlock it otherwise IllegalMonitorStateException would be thrown. Otherwise consider to use RSemaphore object.

Code example:

// 1. Create config object
Config config = new Config();
config.useClusterServers()
       // use "rediss://" for SSL connection
      .addNodeAddress("redis://127.0.0.1:7181");

// or read config from file
config = Config.fromYAML(new File("config-file.yaml")); 

// 2. Get Redis based implementation of java.util.concurrent.locks.Lock
RLock lock = redisson.getLock("myLock");

// traditional lock method
lock.lock();

// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
  • Redisson所有指令都通过Lua脚本执行,Redis支持Lua脚本原子性执行。

  • Redisson设置一个key的默认过期时间为30s,如果某个客户端持有一个锁超过30s怎么办?

    Redisson中有一个watchdog的概念,在程序成功获取锁之后,会fork一条子线程去不断的给该锁续期每隔10秒帮你把key的过期时间设为30s,直至该锁释放为止!Redisson使用守护线程来进行锁的续期,(守护线程的作用:当主线程销毁,会和主线程一起销毁。)防止程序宕机后,线程依旧不断续命,造成死锁!

  • Redisson的“看门狗”逻辑保证了没有死锁发生

    (如果机器宕机了,看门狗就没了。因此就不会延长key的过期时间了,30s自动过期后,其他线程就可以获取到锁了。)

image-20210421225501596

3c207eb0e58a6b68add657a0b069244bdf50163e

Redisson的实现源码

        // 加锁逻辑
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL\_LONG);
    }
    // 调用一段lua脚本,设置一些key、过期时间
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL\_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
            // 看门狗逻辑
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS\[1\]) == 0) then " +
                    "redis.call('hset', KEYS\[1\], ARGV\[2\], 1); " +
                    "redis.call('pexpire', KEYS\[1\], ARGV\[1\]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS\[1\], ARGV\[2\]) == 1) then " +
                    "redis.call('hincrby', KEYS\[1\], ARGV\[2\], 1); " +
                    "redis.call('pexpire', KEYS\[1\], ARGV\[1\]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS\[1\]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}


// 看门狗最终会调用了这里
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    // 这个任务会延迟10s执行
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            // 这个操作会将key的过期时间重新设置为30s
            RFuture<Boolean> future = renewExpirationAsync(threadId);

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                    // reschedule itself
                    // 通过递归调用本方法,无限循环延长过期时间
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }

    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
        task.cancel();
    }
}

使用Redisson的RedLock

RLock lock1 = redisson1.getLock("myLock");
RLock lock2 = redisson2.getLock("myLock");
RLock lock2 = redisson2.getLock("myLock");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
lock.lock();
System.out.println("RedLock");
lock.unlock();

Redisson部分源码

唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId

RedissonLock的父类RedissonBaseLockgetLockName方法

UUID id = UUID.randomUUID();
protected String getLockName(long threadId) {
	return id + ":" + threadId;
}

获取锁

获取锁的代码为redLock.tryLock()

核心代码为:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    	// 获取锁时向N个Redis实例发送的命令
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                 //首先如果分布式锁的KEY不存在,那么执行hset命令(hincrby REDLOCK_KEY uuid+threadId 1)
                 //并通过pexpire设置失效时间(也是锁的租约时间)
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        // 如果锁的key已经存在,并且value也匹配,表示是当前线程持有锁,那么重入次数加1,并且设置失效时间
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        // 获取分布式锁的失效时间(毫秒)
                        // TTL(Time to Live)过期时间或有效生存时间
                        "return redis.call('pttl', KEYS[1]);",
                // 这三个参数分别对应KEY[1], ARGV[1]和ARGV[2]
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

释放锁

redLock.unlock();核心代码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        		// 如果分布式锁不存在或者分布式锁存在,但是value不匹配,表示锁已经被占用了,那么直接返回
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        // 如果就是当前线程占用分布式锁,那么重入次数减1
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        // 如果重入次数减1后的值为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                        // KEY[1], KEY[2], ARGV[1],ARGV[2], ARGV[3]
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

巨人的肩膀

  • Redis深度历险
  • http://doc.redisfans.com/string/set.html
  • https://juejin.cn/post/6844904039218429960
  • https://segmentfault.com/a/1190000022935064?utm_source=sf-similar-article
  • http://redis.cn/topics/distlock.html
  • https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8#84-%E7%BA%A2%E9%94%81redlock
  • https://zhuanlan.zhihu.com/p/59256821
  • https://www.cnblogs.com/rgcLOVEyaya/p/RGC_LOVE_YAYA_1003days.html