使用redis的分布式锁实现一个简单的秒杀系统

前言

实现秒杀系统时,为了控制并发造成商品数量的不一致,我们可能会在程序中使用锁,比如一般会用synchronized关键字或者ReentrantLock的Lock锁来加锁,但是这种加锁的最细粒子程度只能到代码段,按照我们的想法,当发生对同一个商品(比如id)执行操作时,才需要加锁;但是如果此时进来两个不同的商品(比如id为100和200),我们可以不用加锁,因为两个不同的商品互不干扰,就算不加锁也不会出现并发问题。而我们如果使用synchronized或者Lock实现加锁时,当两个并发访问两个不同的商品时,还是会对我们的代码段加锁。
这时,我们可以使用redis的setnx(set if not exists)来实现加锁的功能,setnx(key, value)就是如果key不存在redis,就运行设置(key,value)值。如果key值已经存在,就不允许操作。我们可以利用这个特性,在程序对商品进行操作时,可以向redis进行setnx,key值可以为商品的id比如gId:10092,value值随意。如果redis成功设置了gId:10092则证明目前除了当前操作还没有人对商品进行修改,当我们完成了对商品的操作完,记得一定要 把key值删除掉(del key) 。 如果redis设置gId:10092失败了,则说明当前有人正在修改商品的属性,我们可以先放弃操作,休息一小段时间后再请求。这样就差不多是占“茅坑”,茅坑有很多个,当你想要蹲的茅坑已经被别人占时,你只好先放弃然后稍后再尝试。
可能有人会问,为什么要绕这么大的弯子去请求redis服务器再操作了呢?主要有两个原因:第一、上面已经说了,如果用Synchronized或者Lock锁,就算是不同的商品,都会对核心代码段产生竞争。第二、Redis足够快,它是纯内存操作的,速度很快,就算是发送请求访问redis,延迟可以忽略不计。

实战

前期准备

Maven,Spring环境,SpringAOP环境,Jedis包,junti测试包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>

代码解析

秒杀的逻辑

假设我们就只有一个服务——秒杀商品,我们先定义一个接口,然后写一个实现类,接口里面只有一个秒杀方法
秒杀接口

1
2
3
4
package workspace;
public interface SecKill {
public void seckill(String userId, Long gId);
}

秒杀实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package workspace;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class SecKillImpl implements SecKill {
/**
* 模拟数据库
*/
static Map<Long, Long> inventory ;
static {
inventory = new HashMap<>();
// 插入两条数据
inventory.put(10001L, 1000L);
inventory.put(10002L, 1000L);
}
@Override
public void seckill(String userId, Long gId) {
if (null!=userId){
reduceInventory(gId);
}
}
/**
* 库存减一
* @param gId
* @return
*/
public Long reduceInventory(Long gId){
inventory.put(gId, inventory.get(gId)-1);
return inventory.get(gId);
}
}

这里我们用HashMap模拟数据库,数据里面有两条数据,库存都是1000,秒杀的逻辑就是让库存减一

AOP切面实现加锁解锁

接下来,我用AOP对seckill方法进行增强,在不改变seckill方法的前提下,利用AOP的环绕通知对seckill进行加锁和解锁,所以需要配置AOP环境
配置切面切点和方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package config;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import workspace.JedisLock;

@Component
@Aspect
public class AOPConfig implements BeanFactoryAware {
/**
* 客户端获取锁失败后轮询获取锁的时间30秒
*/
final static Long TIME_OUT = 90L;
/**
* Redis中键的过期时间10秒
*/
final static int EXPIRE = 10;

/**
* 记录失败操作的次数
*/
public static Long ERROR_COUNT = 0L;

/**
* 执行秒杀方法时(载点)
*/
@Pointcut("execution(* workspace.SecKillImpl.seckill(..))")
public void pointCut(){

}
@Around("pointCut()")
public boolean around(ProceedingJoinPoint pjp) throws Throwable {
Object[] objects = pjp.getArgs();
Long gId = (Long) objects[1];
JedisLock lock = null;
try {
System.out.println(Thread.currentThread().getId()+" before executing--");
lock = createJedisLock();
if (lock.lock(gId,TIME_OUT, EXPIRE)) {
pjp.proceed();
System.out.println("--after executing--");
return true;
}
else {
System.out.println(Thread.currentThread().getId()+" 操作失败--");
ERROR_COUNT++;
return false;
}
}catch (Exception e){
e.printStackTrace();
return false;
}finally {
lock.unlock(gId);
}
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
BeanFactory beanFactory;
public JedisLock createJedisLock(){
return this.beanFactory.getBean(JedisLock.class);
}
}

说明:

  • 1、使用around环绕通知,是为了在执行seckill秒杀方法前对其进行加锁,在方法执行完成后进行解锁。如果获取不到锁,就不能执行seckill秒杀方法
  • 2、实现BeanFactoryAware接口的原因是,为了获取原型的JedisLock,JedisLock是操作Jedis的封装,每个JedisLock代表一条Jedis连接,操作完需要关闭Jedis。而且JedisLock需要注明是prototype原型对象
  • 3、必须设置KEY的过期时间,也就是加锁在redis的有效时间,这必须设置,防止因为突发原因比如服务器忽然宕机忘记解锁导致其他客户端获取不到锁

JedisLock实现加解锁

接下来说下JedisLock的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package workspace;

import config.JedisFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

@Component
@Scope("prototype")
public class JedisLock implements InitializingBean {
@Autowired
JedisFactory jedisFactory;

Jedis jedis;

/**
* 该标志用于判断是否需要删除锁
*/
boolean flag = false;

public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
@Override
public void afterPropertiesSet() throws Exception {
// 每个Jedis锁对应一个Jedis连接
// 只有最后调用unlock,才释放Jedis连接
this.jedis = jedisFactory.getJedis();
}

public boolean lock(Long gId, long timeout, int expire) {
try {
Long startTime = System.currentTimeMillis() / 1000;
// 在限定时间内有限循环轮询
while (System.currentTimeMillis() / 1000 - startTime < timeout) {
// 如果获取锁成功
Long result = 0L;
try {
// 调用redis的setnx
result = jedis.setnx(gId.toString(), "1");
}catch (Exception e){
continue;
}
if (result == 1) {
System.out.println(Thread.currentThread().getId() + "获取"+ gId +"锁成功");
// 设置过期时间
jedis.expire(gId + "", expire);
this.flag = true;
return true;
}
// 如果获取不到锁
System.out.println(Thread.currentThread().getId() + "出现锁等待");
// 短暂休眠,尽可能的避免活锁
Thread.sleep(200);
}
return false;
} catch (Exception e) {
System.out.println(Thread.currentThread().getId()+ "获取锁"+gId+"错误");
e.printStackTrace();
return false;
}
}

public void unlock(Long gId) {
try {
if (this.flag){
// 直接删除
jedis.del(gId + "");
System.out.println(Thread.currentThread().getId()+ "删除" + gId +"锁成功");
}
} catch (Exception e) {
e.printStackTrace();
}
finally {
if (jedis.isConnected()) {
jedis.close();
}
}
}
}

主要是通过JedisFactory获取Jedis连接,JedisFactory也是自己实现,等下会有说明。这里需要特别注意两点

  • 1、JedisLock必须是原型模式,因为并发操作时,每个操作代表一个用户,每个用户需要对应一条Redis连接,避免多个用户公用一条Jedis连接,防止其中一个用户关闭了连接导致另一个不能用
  • 2、实现InitializingBean方法,每生成一个Bean就初始化一条Jedis连接

JedisFactory创建Jedis连接

跟数据库连接池一样,这里需要使用一JedisPool创建Jedis连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package config;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class JedisFactory {
JedisPool jedisPool;

public JedisFactory() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(100);
poolConfig.setMinIdle(1);
poolConfig.setMaxTotal(1000);
poolConfig.setMaxWaitMillis(5000);
this.jedisPool = new JedisPool(poolConfig, "127.0.0.1");
}

public Jedis getJedis() {
Jedis jedis = jedisPool().getResource();
return jedis;
}

public JedisPool jedisPool() {
if (this.jedisPool != null) {
return this.jedisPool;
}
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(100);
poolConfig.setMinIdle(1);
poolConfig.setMaxTotal(1000);
JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1");
this.jedisPool = jedisPool;
return this.jedisPool;
}
}

这里主要是创建一个JedisPool的连接池,设置连接池大小和每条连接的最长连接时间,getJedis方法就是从JedisPool里面获取一条Jedis连接
逻辑大概就是这样,接下来就来测试一下

测试

因为是使用Spring,所以需要配置Spring环境

1
2
3
4
5
6
7
8
9
10
package config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@Configuration
@ComponentScan({"config", "workspace"})
@EnableAspectJAutoProxy
public class AppConfig {

}

注解扫描指定的包,开启AOP功能

开始测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package workspace;
import config.AOPConfig;
import config.AppConfig;
import config.JedisFactory;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import redis.clients.jedis.Jedis;
import java.util.concurrent.CountDownLatch;
public class App
{
@Test
public void testSecKill(){

final ApplicationContext ac = new AnnotationConfigApplicationContext();
((AnnotationConfigApplicationContext) ac).register(AppConfig.class);
((AnnotationConfigApplicationContext) ac).refresh();
System.out.println("获取SecKill Bean");


int threadCount = 100;
int splitPoint = threadCount / 2;
final CountDownLatch endCount = new CountDownLatch(threadCount);
final CountDownLatch beginCount = new CountDownLatch(1);

Thread[] threads = new Thread[threadCount];
//起500个线程,秒杀第一个商品
for(int i= 0;i < splitPoint;i++){
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
SecKill secKill = ac.getBean(SecKill.class);
// 该方法会被AOP代理
secKill.seckill("1", 10001L);

}catch (Exception e){
e.printStackTrace();
}finally {
endCount.countDown();
}
}
});
threads[i].start();

}
//再起500个线程,秒杀第二件商品
for(int i= splitPoint;i < threadCount;i++){
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
SecKill secKill = ac.getBean(SecKill.class);
// 该方法会被AOP代理
secKill.seckill("1", 10002L);

}catch (Exception e){
e.printStackTrace();
}finally {
endCount.countDown();
}
}
});
threads[i].start();

}

long startTime = System.currentTimeMillis()/1000;
//主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
// 也就说开始让线程工作
beginCount.countDown();

try {
//主线程等待结束信号量,也就是等待其他线程的工作全部做完
endCount.await();
//观察秒杀结果是否正确
System.out.println(SecKillImpl.inventory.get(10001L));
System.out.println(SecKillImpl.inventory.get(10002L));
System.out.println("error count" + AOPConfig.ERROR_COUNT);
System.out.println("total cost " + (System.currentTimeMillis()/1000 - startTime));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

简单的说明一下

  • 1、先让Spring跑起来,把我们写的AppConfig注册进去
  • 2、声明CountDownLatch(阀),是为了最后数据的正确性校验,等所有线程跑完了再输出最后库存里的剩余数量是否正确
  • 3、我们开启100个线程测试,50个操作10001商品,50个操作10002商品,如果操作正确的话,最后两个商品的库存应该都是950

最后结果:

库存跟我们预想的一致

其他问题

当线程很多的时候,比如有1000个线程,为了顺利通过测试,需要延长每个操作循环等待锁的时间。我使用1000个线程并发时,需要延长时间至60秒(不知道是不是我电脑的问题…)
或者你可以使用另一个Redis的连接池——lettuce,我测试过了,lettuce比以往的JedisPool更快,1000个并发使用JedisPool需要60秒,而使用lettuce则需要不到40秒

项目地址

https://github.com/SouthLight-Lin/redis-in-seckill