1 Redis配置

1
2
3
4
5
6
7
8
9
redis:
host: localhost
port: 6379
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s

2 Redis进行缓存

2.1 缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
1.缓存空对象,优点:实现简单,维护方便。缺点:额外的内存消耗可能造成短期的不一致
2.布隆过滤,优点:内存占用较少,没有多余 key。缺点:实现复杂存在误判可能
推荐使用缓存空对象,这里封装到工具类的形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

//缓存穿透
public <R,ID> R queryWithPassThrough(
String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallBack,Long time, TimeUnit timeUnit){
String key = keyPrefix+id;
//查询缓存
String JSON = (String) redisTemplate.opsForValue().get(key);
//缓存是否存在
if (StrUtil.isNotBlank(JSON)){
//存在,直接返回
R bean = JSONUtil.toBean(JSON, type);
return bean;
}

//命中的是否为空值
if (JSON != null){
return null;
}


//不存在,查询数据库并缓存(使用方法引用传入查询数据库的方法)
R r = dbFallBack.apply(id);
//数据库中不存在
if (r == null){
//缓存,防止缓存穿透(超时时间保底)
redisTemplate.opsForValue().set(key,"",time, timeUnit);
return null;
}
//缓存(超时时间保底)
redisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(r),time, timeUnit);
this.set(key,JSONUtil.toJsonStr(r),time,timeUnit);
//返回
return r;
}

2.2 缓存击穿

缓存击穿问题也叫热点 Key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

解决:1.互斥锁:查询缓存未命中时获取互斥锁,在写入缓存后释放。其他线程在查询未命中缓存后会获取不到锁而休眠一段时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
1.获取锁
private Boolean tryLock(String key) {
Boolean lock = redisTemplate.opsForValue().setIfAbsent(key, "1", 2, TimeUnit.MINUTES);
return BooleanUtil.isTrue(lock);
}
2.释放锁
private void unLock(String k){
redisTemplate.delete(k);
}


3.解决缓存击穿
//缓存击穿--互斥锁
private Shop queryWithPassMutex(Long id){
//查询缓存
String shopJSON = (String) redisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//缓存是否存在
if (StrUtil.isNotBlank(shopJSON)){ //shopJson != null && shopJson.length() != 0
//存在,直接返回
Shop s = JSONUtil.toBean(shopJSON, Shop.class);
return s;
}
if (shopJSON != null){
return null;
}

//处理缓存击穿
//获取锁
Boolean lock = tryLock(LOCK_SHOP_KEY+id);
//获取失败,休眠并重试
if (BooleanUtil.isFalse(lock)){
try {
//休眠
Thread.sleep(200);
queryWithPassThrough(id);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

//获取锁
//不存在,查询并缓存
Shop shop = getById(id);
//数据库中不存在
if (shop == null){
//缓存,防止缓存穿透(超时时间保底)
redisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(shop),5, TimeUnit.MINUTES);
return null;
}
//缓存(超时时间保底)
redisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,JSONUtil.toJsonStr(shop),30, TimeUnit.MINUTES);
//释放锁
unLock(LOCK_SHOP_KEY+id);
//返回
return shop;
}

2.逻辑过期时间:在查询缓存时判断是否已经逻辑过期,若过期则返回旧的缓存,开启一个新的线程去更新缓存。若没过期则直接返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1.锁的获取和释放
...省略,与互斥锁相同
2.线程池
private static final ExecutorService CACH_REBUID_EXCUTOR = Executors.newFixedThreadPool(10);

3.RedisData封装
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

4.逻辑过期
//缓存击穿
public <R,ID> R queryWithLogicExpire(
String cacheKey,String lockKey,ID id,Class<R> type,Long ttl,Function<ID,R> dbFallBack){
//缓存key
String CacheKey = cacheKey + id;
//锁key
String LockKey = lockKey + id;
//查询缓存
String RedisDataJSON = (String) redisTemplate.opsForValue().get(CacheKey);
//缓存不存在
if (StrUtil.isBlank(RedisDataJSON)){
return null;
}
//获取商铺信息和过期时间
RedisData redisData = JSONUtil.toBean(RedisDataJSON, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(),type); //将Object转为JSONObject
LocalDateTime expireTime = redisData.getExpireTime();
//是否已过期
if (expireTime.isAfter(LocalDateTime.now())){
//未过期,直接返回
return r;
}

//已过期
//缓存重建
Boolean lock = tryLock(LockKey); //获取锁
if (lock){
//新线程,更新缓存
CACH_REBUID_EXCUTOR.submit(()->{
try {
//saveToRedis是将查询数据库并缓存封装的方法,daFallBack是查询数据库方法的引用,从ServiceImpl层引入
this.saveToRedis(id,ttl,dbFallBack,CacheKey);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unLock(LockKey);
}
});
}
return r;
}

-d:参数表示容器将在后台运行,

—name:参数用于指定容器的名称。

-p:参数用于设置端口映射,其中5673:5672表示将容器的5672端口映射到宿主机的5673端口,15673:15672表示将容器的15672端口映射到宿主机的15673端口,这两个端口分别是RabbitMQ的消息接收端口和管理界面端口。

2.3 缓存雪崩

缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
◆ 给不同的 key 的 TTL 添加随机值
◆ 利用 Redis 集群提高服务的可用性
◆ 给缓存业务添加降级限流策略
◆ 给业务添加多级缓存

端口映射是Docker容器与外部通信的重要配置。以下是如何设置端口映射的详细说明:

-p 5673:5672:此参数将容器内部的5672端口映射到宿主机的5673端口,这是RabbitMQ用于接收客户端连接的标准AMQP协议端口。
-p 15673:15672:此参数将容器内部的15672端口映射到宿主机的15673端口,这是RabbitMQ的管理界面端口,允许用户通过Web界面管理RabbitMQ实例。
确保宿主机的这些端口没有被其他应用占用,否则会导致端口冲突。

3 秒杀优惠卷

3.1乐观锁防止超卖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional
public Result seckkillVoucher(Long voucherId) {
//查询秒杀卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (!LocalDateTime.now().isAfter(voucher.getBeginTime())){
return Result.fail("秒杀还未开始!");
}
if (LocalDateTime.now().isAfter(voucher.getEndTime())){
return Result.fail("秒杀已经结束!");
}
//判断库存
if (voucher.getStock()<=0){
return Result.fail("库存不足!");
}
//扣减库存
Boolean success = seckillVoucherService
.update().setSql("stock = stock -1")
.eq("voucher_id",voucherId)
.gt("stock",0) //乐观锁,在扣减时判断是否有库存
.update();
if (!success){
return Result.fail("库存不足!");
}
//生成订单
VoucherOrder order = new VoucherOrder();
//订单id
order.setId(oneIdUtil.nexInd("order"));
//用户id
order.setUserId(UserHolder.getUser().getId());
//秒杀卷id
order.setVoucherId(voucherId);
//添加订单
save(order);
return Result.ok(order.getId());
}

3.2 一人一单,简单分布式锁

1.使用互斥锁,在查询订单前加锁,防止多个线程同时并发导致一个人买多次。
2.使用互斥锁,并通过用户id做key确保一个用户只能有一把锁,线程id加uuid做value方便后续释放锁判断。
3.释放锁时要判断是否是同一个线程,防止线程并发问题:前一个线程阻塞超过了设置的释放时间导致在超过时间自动释放后,其他线程获取到锁但阻塞的业务在这时完成并释放锁,但释放的锁为线程2刚获取的锁导致的并发问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
1.工具类,获取,释放锁
public class SimpleRedisLock implements ILock{

private String name;

RedisTemplate redisTemplate;

public static final String KEY_PREFIX = "lock:";
//uuid方便后续判断是否自己的锁
public String uuid = UUID.randomUUID().toString()+"-";
public SimpleRedisLock(String name, RedisTemplate redisTemplate) {
this.name = name;
this.redisTemplate = redisTemplate;
}

@Override
public boolean tryLock(Long timeOut) {
//获取线程id
long id = Thread.currentThread().getId();
//获取锁(key=前缀+用户id)
Boolean lock = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name,uuid+id, timeOut, TimeUnit.SECONDS);
return Boolean.TRUE.equals(lock);
}

@Override
//释放锁
public void unLock() {
//获取线程id判断是否能释放锁
//获取线程
long id = Thread.currentThread().getId();
String value = uuid+id;
//判断是否相同
String o = (String) redisTemplate.opsForValue().get(KEY_PREFIX + name);
//相同
if (o.equals(value)){
//是当前线程的锁
redisTemplate.delete(KEY_PREFIX + name);
}

}
}

2.实际业务使用
public Result seckkillVoucher(Long voucherId) {
//查询秒杀卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//判断秒杀是否开始
if (!LocalDateTime.now().isAfter(voucher.getBeginTime())){
return Result.fail("秒杀还未开始!");
}
if (LocalDateTime.now().isAfter(voucher.getEndTime())){
return Result.fail("秒杀已经结束!");
}

//判断库存
if (voucher.getStock()<=0){
return Result.fail("库存不足!");
}


Long userId = UserHolder.getUser().getId();
//分布式锁,互斥锁
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, redisTemplate);

//获取锁
boolean isLock = lock.tryLock(500L);
if (!isLock){
//获取锁失败,错误信息
return Result.fail("一个人只允许一单");
}
//获取锁成功
log.info("获取订单锁成功!{}","order:" + userId);
//获取当前接口的代理对象,防止事务失效
try {
//获取代理,防止事务失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createOrder(voucherId);
} finally {
//释放锁,防止线程崩溃不释放
lock.unlock();
}
}



public Result createOrder(Long voucherId){

//用户id
Long userId = UserHolder.getUser().getId();

//一人一单
//查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0){
//已经买过
return Result.fail("用户已购买过!");
}
//扣减库存
Boolean success = seckillVoucherService
.update().setSql("stock = stock -1")
.eq("voucher_id",voucherId)
.gt("stock",0) //乐观锁,在扣减时判断是否有库存
.update();
if (!success){
return Result.fail("库存不足!");
}
//生成订单
VoucherOrder order = new VoucherOrder();
//订单id
order.setId(oneIdUtil.nexInd("order"));
//用户id
order.setUserId(UserHolder.getUser().getId());
//秒杀卷id
order.setVoucherId(voucherId);
//添加订单
save(order);
return Result.ok(order.getId());
}

4 Redisson,现成的互斥锁等工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1.导入依赖
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
2.配置cofig
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
//单一redis,可使用集群
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
}
3.使用与分布式锁使用大概一致
自动注入
@Resource
RedissonClient redissonClient;

4.1 Redisson可重入锁

在上锁后如果是

5. 杂项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1.线程池
private static final ExecutorService CACH_REBUID_EXCUTOR = Executors.newFixedThreadPool(10);

2.Json转Bean
RedisData redisData = JSONUtil.toBean(RedisDataJSON, RedisData.class);

3.Bean转Json
JSONUtil.toJsonStr(redisData);

4.时间转为秒
timeUnit.toSeconds(time);

5.通过Redis生成唯一id(订单号)
@Slf4j
@Component
public class OneIdUtil {

public static final long BEGIN_TIMESTAP = 1640995200; //2022年1月1日0时0分0秒

public static final int COUNT_BITS = 32;
@Resource
RedisTemplate redisTemplate;

public long nexInd(String prefix){ //key前缀
//时间戳
LocalDateTime now = LocalDateTime.now();
long ThisStap = now.toEpochSecond(ZoneOffset.UTC); //转为时间戳
long timeStap = ThisStap - BEGIN_TIMESTAP;

//序列号:前缀+当前时间戳
String day = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long id = redisTemplate.opsForValue().increment(prefix + day); //通过当前前缀+日期作为key防止溢出
log.info("生成的唯一id:{}",timeStap << COUNT_BITS | id);
//唯一id:时间戳(31)+序列号(32)
return timeStap << COUNT_BITS | id; //将timeStap左移COUNT_BIST位并且与id或运算
}

}