目录

基于Redis的分布式锁

为什么需要分布式锁

提到分布式锁,那么就需要先讲一下单机锁。以目前互联网上比较常见的商品秒杀为例子,将该场景分为以下几个演化阶段:

  1. 服务单机部署,没啥并发量,商品表的库存字段设置为非负数即可。
  2. 用户量增多或者出现爆款商品,在并发的情况下商品出现超卖的情况,此时需要加锁(接口单机部署)。
  3. 随着用户量再次增大,单机加锁导致其它线程/协程需要等待锁的持有者处理完业务之后,才能获取资源,单机性能出现瓶颈,针对该类业务需要做分布式部署。此时就会遇到一个问题,不同服务器上的接口共享的不是同一个资源,单机锁失去作用,那么就需要中间者来提供锁的服务,这个就是分布式锁的由来。

分布式锁的常见使用场景

  • 互联网秒杀
  • 抢优惠卷
  • 接口幂等性校验

分布式锁的常见解决方案

主动轮询型

该模型类似于单机锁中的主动轮询 + cas 乐观锁模型,取锁方会持续对分布式锁发出尝试获取动作,如果锁已被占用则会不断发起重试,直到取锁成功为止。这种解决方案以Redis和Mysql为典型。

watch 回调型

在取锁方发现锁已被他人占用时,会创建 watcher 监视器订阅锁的释放事件,随后不再发起主动取锁的尝试;当锁被释放后,取锁方能通过之前创建的 watcher 感知到这一变化,然后再重新发起取锁的尝试动作。这种解决方案以Zookeeper和ETCD为典型。

基于Redis的分布式锁

Redis的setnx命令针对key赋值的时候会判断redis中是否存在这个key,如果有返回-1, 如果没有的话,他会直接set键值。
那他跟直接set键值有啥区别?setnx是原子操作,而set不能保证原子性。

实现思路

  • 使用SETNX操作尝试创建一个全局唯一的锁键,如果能够正常创建,则标识该线程/协程获取到锁,进行业务操作,然后释放锁。
  • 如果锁键已存在,则表示有其他客户端已经获取到锁,自身定时进行轮询。
  • 使用EXPIRE设置锁键的过期时间,避免锁一直被某个客户端持有导致死锁。
  • 客户端在释放锁时,通过DEL操作删除锁键。

参考逻辑图

/images/20230926/img_2.png

代码实现

Locker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Locker 可以通过它得到一把锁
// 它主要是用于配置锁
type Locker struct {
	client          *redis.Client // Redis客户端
	script          *redis.Script // 解锁脚本
	ttl             time.Duration // 过期时间
	tryLockInterval time.Duration // 重新获取锁间隔
}

func NewLocker(client *redis.Client, ttl, tryLockInterval time.Duration) *Locker {
	return &Locker{
		client:          client,
		script:          redis.NewScript(unlockScript),
		ttl:             ttl,
		tryLockInterval: tryLockInterval,
	}
}

func (l *Locker) GetLock(resource string) *Lock {
	return &Lock{
		client:          l.client,
		script:          l.script,
		resource:        resource,
		randomValue:     gofakeit.UUID(),
		watchDog:        make(chan struct{}),
		ttl:             l.ttl,
		tryLockInterval: l.tryLockInterval,
	}
}

TryLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (l *Lock) TryLock(ctx context.Context) error {
	success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
	if err != nil {
		return err
	}
	// 加锁失败
	if !success {
		return ErrLockFailed
	}
	// 加锁成功
	return nil
}

Lock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (l *Lock) Lock(ctx context.Context) error {
	// 尝试加锁
	err := l.TryLock(ctx)
	if err == nil {
		return nil
	}
	if !errors.Is(err, ErrLockFailed) {
		return err
	}
	// 加锁失败,不断尝试
	ticker := time.NewTicker(l.tryLockInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// 超时
			return ErrTimeout
		case <-ticker.C:
			// 重新尝试加锁
			err := l.TryLock(ctx)
			if err == nil {
				return nil
			}
			if !errors.Is(err, ErrLockFailed) {
				return err
			}
		}
	}
}

UnLock

1
2
3
4
func (l *Lock) Unlock(ctx context.Context) error {
	err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
	return err
}

解锁的底层原理

上面的l.script.Run对应redis.Script.Run方法,内容为一个lua脚本:

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

那么问题来了,这样写有什么用,为何不直接调用delete方法?

举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。

因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。

lua脚本命令在Redis中执行时,会被当成一条命令来执行,能够保证原子性,故要不都成功,要不都失败。在Java中,Redis的Redssion组件许多方法实现中很多都用到了lua脚本,这样能够极大的保证命令执行的原子性,下面看门狗的内容会提到。

Redis看门狗

OK,经过上面的步骤,我们初步实现了一个分布式锁,健壮性如何?上面提到要给锁设置一个时间,避免某个锁一直持有,造成死锁。

假设线程1操作超过了设置的时间,锁自动被释放,线程2持有锁正在处理业务,线程1会把线程2的锁给删了。这时需要一个监控者,监控线程1的服务是否操作完毕,Redis提供了一个看门狗机制,当线程1持有锁但是操作未完成的情况下,给锁续期。 /images/20230926/img_1.png

代码实现

Lock

1
2
3
4
5
6
7
8
9
type Lock struct {
	client          *redis.Client // Redis客户端
	script          *redis.Script // 解锁脚本
	resource        string        // 锁定的资源
	randomValue     string        // 随机值
	watchDog        chan struct{} // 看门狗
	ttl             time.Duration // 过期时间
	tryLockInterval time.Duration // 重新获取锁间隔
}

TryLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (l *Lock) TryLock(ctx context.Context) error {
	success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
	if err != nil {
		return err
	}
	// 加锁失败
	if !success {
		return ErrLockFailed
	}
	// 加锁成功,启动看门狗
	go l.startWatchDog()
	return nil
}

WatchDog

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (l *Lock) startWatchDog() {
	ticker := time.NewTicker(l.ttl / 3)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// 延长锁的过期时间
			ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2)
			ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result()
			cancel()
			// 异常或锁已经不存在则不再续期
			if err != nil || !ok {
				return
			}
		case <-l.watchDog:
			// 已经解锁
			return
		}
	}
}

红锁

再进一步进行优化,上面的思路都是基于单机Redis来实现的,如果采用Redis主从模式,获取锁成功,数据存入Master节点,还没有同步的时候,Master节点挂了,Slave节点无相关数据。 这个时候就需要红锁的设计思路,总的来说就是Master节点设置锁后,要等所有Slave节点存储成功之后才响应。

代码实现

RedLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type RedLock struct {
	clients        []*redis.Client // Redis客户端
	successClients []*redis.Client // 加锁成功的客户端
	script         *redis.Script   // 解锁脚本
	resource       string          // 锁定的资源
	randomValue    string          // 随机值
	watchDog       chan struct{}   // 看门狗
}

func NewRedLock(clients []*redis.Client, resource string) *RedLock {
	return &RedLock{
		clients:  clients,
		script:   redis.NewScript(unlockScript),
		resource: resource,
	}
}

TryLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (l *RedLock) TryLock(ctx context.Context) error {
	randomValue := gofakeit.UUID()
	var wg sync.WaitGroup
	wg.Add(len(l.clients))
	// 成功获得锁的Redis实例的客户端
	successClients := make(chan *redis.Client, len(l.clients))
	for _, client := range l.clients {
		go func(client *redis.Client) {
			defer wg.Done()
			success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result()
			if err != nil {
				return
			}
			// 加锁失败
			if !success {
				return
			}
			// 加锁成功,启动看门狗
			go l.startWatchDog()
			successClients <- client
		}(client)
	}
	// 等待所有获取锁操作完成
	wg.Wait()
	close(successClients)
	// 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败
	if len(successClients) < len(l.clients)/2+1 {
		// 就算加锁失败,也要把已经获得的锁给释放掉
		for client := range successClients {
			go func(client *redis.Client) {
				ctx, cancel := context.WithTimeout(context.Background(), ttl)
				l.script.Run(ctx, client, []string{l.resource}, randomValue)
				cancel()
			}(client)
		}
		return ErrLockFailed
	}

	// 加锁成功,启动看门狗
	l.randomValue = randomValue
	l.successClients = nil
	for successClient := range successClients {
		l.successClients = append(l.successClients, successClient)
	}

	return nil
}

WatchDog

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (l *RedLock) startWatchDog() {
	l.watchDog = make(chan struct{})
	ticker := time.NewTicker(resetTTLInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// 延长锁的过期时间
			for _, client := range l.successClients {
				go func(client *redis.Client) {
					ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval)
					client.Expire(ctx, l.resource, ttl)
					cancel()
				}(client)
			}
		case <-l.watchDog:
			// 已经解锁
			return
		}
	}
}

参考文章