为什么需要分布式锁
提到分布式锁,那么就需要先讲一下单机锁。以目前互联网上比较常见的商品秒杀为例子,将该场景分为以下几个演化阶段:
- 服务单机部署,没啥并发量,商品表的库存字段设置为非负数即可。
- 用户量增多或者出现爆款商品,在并发的情况下商品出现超卖的情况,此时需要加锁(接口单机部署)。
- 随着用户量再次增大,单机加锁导致其它线程/协程需要等待锁的持有者处理完业务之后,才能获取资源,单机性能出现瓶颈,针对该类业务需要做分布式部署。此时就会遇到一个问题,不同服务器上的接口共享的不是同一个资源,单机锁失去作用,那么就需要中间者来提供锁的服务,这个就是分布式锁的由来。
分布式锁的常见使用场景
分布式锁的常见解决方案
主动轮询型
该模型类似于单机锁中的主动轮询 + cas 乐观锁模型,取锁方会持续对分布式锁发出尝试获取动作,如果锁已被占用则会不断发起重试,直到取锁成功为止。这种解决方案以Redis和Mysql为典型。
watch 回调型
在取锁方发现锁已被他人占用时,会创建 watcher 监视器订阅锁的释放事件,随后不再发起主动取锁的尝试;当锁被释放后,取锁方能通过之前创建的 watcher 感知到这一变化,然后再重新发起取锁的尝试动作。这种解决方案以Zookeeper和ETCD为典型。
基于Redis的分布式锁
Redis的setnx命令针对key赋值的时候会判断redis中是否存在这个key,如果有返回-1, 如果没有的话,他会直接set键值。
那他跟直接set键值有啥区别?setnx是原子操作,而set不能保证原子性。
实现思路
- 使用SETNX操作尝试创建一个全局唯一的锁键,如果能够正常创建,则标识该线程/协程获取到锁,进行业务操作,然后释放锁。
- 如果锁键已存在,则表示有其他客户端已经获取到锁,自身定时进行轮询。
- 使用EXPIRE设置锁键的过期时间,避免锁一直被某个客户端持有导致死锁。
- 客户端在释放锁时,通过DEL操作删除锁键。
参考逻辑图
代码实现
Locker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// Locker 可以通过它得到一把锁
// 它主要是用于配置锁
type Locker struct {
client *redis.Client // Redis客户端
script *redis.Script // 解锁脚本
ttl time.Duration // 过期时间
tryLockInterval time.Duration // 重新获取锁间隔
}
func NewLocker(client *redis.Client, ttl, tryLockInterval time.Duration) *Locker {
return &Locker{
client: client,
script: redis.NewScript(unlockScript),
ttl: ttl,
tryLockInterval: tryLockInterval,
}
}
func (l *Locker) GetLock(resource string) *Lock {
return &Lock{
client: l.client,
script: l.script,
resource: resource,
randomValue: gofakeit.UUID(),
watchDog: make(chan struct{}),
ttl: l.ttl,
tryLockInterval: l.tryLockInterval,
}
}
|
TryLock
1
2
3
4
5
6
7
8
9
10
11
12
|
func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
if err != nil {
return err
}
// 加锁失败
if !success {
return ErrLockFailed
}
// 加锁成功
return nil
}
|
Lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
func (l *Lock) Lock(ctx context.Context) error {
// 尝试加锁
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
// 加锁失败,不断尝试
ticker := time.NewTicker(l.tryLockInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// 超时
return ErrTimeout
case <-ticker.C:
// 重新尝试加锁
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
}
}
}
|
UnLock
1
2
3
4
|
func (l *Lock) Unlock(ctx context.Context) error {
err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
return err
}
|
解锁的底层原理
上面的l.script.Run对应redis.Script.Run方法,内容为一个lua脚本:
1
2
3
4
5
|
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
|
那么问题来了,这样写有什么用,为何不直接调用delete方法?
举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。
因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。
lua脚本命令在Redis中执行时,会被当成一条命令来执行,能够保证原子性,故要不都成功,要不都失败。在Java中,Redis的Redssion组件许多方法实现中很多都用到了lua脚本,这样能够极大的保证命令执行的原子性,下面看门狗的内容会提到。
Redis看门狗
OK,经过上面的步骤,我们初步实现了一个分布式锁,健壮性如何?上面提到要给锁设置一个时间,避免某个锁一直持有,造成死锁。
假设线程1操作超过了设置的时间,锁自动被释放,线程2持有锁正在处理业务,线程1会把线程2的锁给删了。这时需要一个监控者,监控线程1的服务是否操作完毕,Redis提供了一个看门狗机制,当线程1持有锁但是操作未完成的情况下,给锁续期。
代码实现
Lock
1
2
3
4
5
6
7
8
9
|
type Lock struct {
client *redis.Client // Redis客户端
script *redis.Script // 解锁脚本
resource string // 锁定的资源
randomValue string // 随机值
watchDog chan struct{} // 看门狗
ttl time.Duration // 过期时间
tryLockInterval time.Duration // 重新获取锁间隔
}
|
TryLock
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
if err != nil {
return err
}
// 加锁失败
if !success {
return ErrLockFailed
}
// 加锁成功,启动看门狗
go l.startWatchDog()
return nil
}
|
WatchDog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (l *Lock) startWatchDog() {
ticker := time.NewTicker(l.ttl / 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 延长锁的过期时间
ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2)
ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result()
cancel()
// 异常或锁已经不存在则不再续期
if err != nil || !ok {
return
}
case <-l.watchDog:
// 已经解锁
return
}
}
}
|
红锁
再进一步进行优化,上面的思路都是基于单机Redis来实现的,如果采用Redis主从模式,获取锁成功,数据存入Master节点,还没有同步的时候,Master节点挂了,Slave节点无相关数据。
这个时候就需要红锁的设计思路,总的来说就是Master节点设置锁后,要等所有Slave节点存储成功之后才响应。
代码实现
RedLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
type RedLock struct {
clients []*redis.Client // Redis客户端
successClients []*redis.Client // 加锁成功的客户端
script *redis.Script // 解锁脚本
resource string // 锁定的资源
randomValue string // 随机值
watchDog chan struct{} // 看门狗
}
func NewRedLock(clients []*redis.Client, resource string) *RedLock {
return &RedLock{
clients: clients,
script: redis.NewScript(unlockScript),
resource: resource,
}
}
|
TryLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
func (l *RedLock) TryLock(ctx context.Context) error {
randomValue := gofakeit.UUID()
var wg sync.WaitGroup
wg.Add(len(l.clients))
// 成功获得锁的Redis实例的客户端
successClients := make(chan *redis.Client, len(l.clients))
for _, client := range l.clients {
go func(client *redis.Client) {
defer wg.Done()
success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result()
if err != nil {
return
}
// 加锁失败
if !success {
return
}
// 加锁成功,启动看门狗
go l.startWatchDog()
successClients <- client
}(client)
}
// 等待所有获取锁操作完成
wg.Wait()
close(successClients)
// 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败
if len(successClients) < len(l.clients)/2+1 {
// 就算加锁失败,也要把已经获得的锁给释放掉
for client := range successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl)
l.script.Run(ctx, client, []string{l.resource}, randomValue)
cancel()
}(client)
}
return ErrLockFailed
}
// 加锁成功,启动看门狗
l.randomValue = randomValue
l.successClients = nil
for successClient := range successClients {
l.successClients = append(l.successClients, successClient)
}
return nil
}
|
WatchDog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func (l *RedLock) startWatchDog() {
l.watchDog = make(chan struct{})
ticker := time.NewTicker(resetTTLInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 延长锁的过期时间
for _, client := range l.successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval)
client.Expire(ctx, l.resource, ttl)
cancel()
}(client)
}
case <-l.watchDog:
// 已经解锁
return
}
}
}
|
参考文章