RabbitMQ整合Spring Boot项目

添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application-dev.yml

1
2
3
4
5
6
7
spring: 
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /

交换机\路由Key\队列的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.tedu.csmall.stock.webapi.quartz;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//这个类是配置RabbitMQ中交换机、队列、路由键
//交换机和队列是对象,需要保存到Spring容器中,路由键是绑定关系和匹配使用的关键字
@Configuration
public class RabbitMQConfig {
//使用常量定义交换机路由键和队列
public static final String STOCK_EX="stock_ex";
public static final String STOCK_ROUT="stock_rout";
public static final String STOCK_QUEUE="stock_queue";

//声明交换机队列,保存到spring容器中
//根据实际需求生成交换机的数量,目前只需要一个
@Bean
public DirectExchange stockDirectExchange(){
return new DirectExchange(STOCK_EX);
}
//声明队列对象,保存到spring容器
@Bean
public Queue stockQueue(){
return new Queue(STOCK_QUEUE);
}
//通过路由键绑定交换机和队列
@Bean
public Binding stockBinding(){
return BindingBuilder.bind(stockQueue()).to(stockDirectExchange()).with(STOCK_ROUT);
}
}

计划任务–先前的计划任务代码先注释

1
2
3
4
5
6
7
8
9
10
    @Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//先简单发送一个字符串
//convertAndSend([交换机名称],[路由key的名称],[要发送的消息])
rabbitTemplate.convertAndSend(
RabbitMQConfig.STOCK_EX,RabbitMQConfig.STOCK_ROUT,"消息:执行减少库存的操作");
}
}

修改cron表达式

1
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");

RabbitMQ的消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.tedu.csmall.stock.webapi.quartz;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//当前类也是将对象交给Spring容器管理
@Component
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {
//在类上监听,实际上运行的也是一个方法
//@RabbitHandler注解标记的方法,是当监听的队列有消息时,就会被触发
//一个类只能有一个方法标记这个注解
//这个方法直接声明参数类型,从队列中接收到的消息会自动转成指定的参数类型
@RabbitHandler
public void process(String str){
System.out.println("消息的接受者收到消息:"+str);
}
}

启动Nacos\RabbitMQ\Seata

启动stock-webapi

根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行

测试成功表示一切正常

开发酷鲨秒杀执行流程

准备流控和降级的处理类

秒杀业务肯定是一个高并发的处理,并发数超过程序设计的限制时,就需要对请求的数量进行限流

Sentinel是阿里提供的SpringCloud组件,主要用于外界访问当前服务器的控制器方法的限流操作

先编写限流异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.tedu.mall.seckill.exception;

import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import lombok.extern.slf4j.Slf4j;

//秒杀业务限流异常处理类
@Slf4j
public class SeckillBlockHandler {
//声明限流的方法,返回值必须和控制器一致
//参数要包含控制器的参数,最后再添加一个BlockException异常类型的参数
public static JsonResult seckillBlock(String randCode, SeckillOrderAddDTO seckillOrderAddDTO,
BlockException e){
log.error("一个请求被限流了");
return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,"服务器繁忙!");
}
}

再创建降级类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.tedu.mall.seckill.exception;

import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import lombok.extern.slf4j.Slf4j;

//降级类
@Slf4j
public class SeckillFallback {
//返回值必须和控制器方法一致
//参数也是包含控制层方法参数,可以不写其他参数,也可以添加Throwable类型的参数
//Throwable类型的参数就是触发这次降级的原因
public static JsonResult seckillFall(String randCode,
SeckillOrderAddDTO seckillOrderAddDTO,
Throwable throwable){
log.error("一个请求被降级了!");
return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,throwable.getMessage());
}
}

提交秒杀订单

开发业务层

我们之前完成了秒杀的预热,预热中完成了秒杀商品sku库存数,spu随机码保存在redis中的操作

也完成了查询秒杀商品列表,和显示秒杀商品详情的方法

下面要开始进行秒杀商品生成订单的操作

如果用户选择商品规格(sku)提交订单,那么就要按照提交秒杀订单的业务流程处理

秒杀提交订单和普通订单的区别

1.要判断当前用户是否为重复购买

2.从Redis中判断是否有库存

3.秒杀订单转换成普通订单,需要使用dubbo在order模块完成

4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中

创建一个SeckillServiceImpl业务逻辑层实现类,完成上面的业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package cn.tedu.mall.seckill.service.impl;

import cn.tedu.mall.common.exception.CoolSharkServiceException;
import cn.tedu.mall.common.pojo.domain.CsmallAuthenticationInfo;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.order.service.IOmsOrderService;
import cn.tedu.mall.pojo.order.dto.OrderAddDTO;
import cn.tedu.mall.pojo.order.dto.OrderItemAddDTO;
import cn.tedu.mall.pojo.order.vo.OrderAddVO;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import cn.tedu.mall.pojo.seckill.model.Success;
import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO;
import cn.tedu.mall.seckill.config.RabbitMqComponentConfiguration;
import cn.tedu.mall.seckill.service.ISeckillService;
import cn.tedu.mall.seckill.utils.SeckillCacheUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
@Slf4j
public class SeckillServiceImpl implements ISeckillService {
//减少sku库存数的Redis对象,是操作字符串
@Autowired
private StringRedisTemplate stringRedisTemplate;
//需要普通订单生成的方法,Dubbo调用
@DubboReference
private IOmsOrderService dubboOrderService;
//秒杀成功后的成功信息,先放到RabbitMQ中
@Autowired
private RabbitTemplate rabbitTemplate;

/*
1.要判断当前用户是否为重复购买
2.从Redis中判断是否有库存
3.秒杀订单转换成普通订单,需要使用dubbo在order中完成
4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中
*/
@Override
public SeckillCommitVO commitSeckill(SeckillOrderAddDTO seckillOrderAddDTO) {
//第一步:利用redids检查库存数和检查是否为重复购买
//先获取用户Id和要购买的商品skuId
Long skuId = seckillOrderAddDTO.getSeckillOrderItemAddDTO().getSkuId();
Long userId = getUserId();
//我们根据userId和skuId的组合来确定谁买了什么商品
//秒杀业务规定,一个用户id只能购买一个skuId一次
//我们利用userId和skuId生成一个key,将key保存到redis中,表示当前用户已经购买过
//key的组成可能是:mall:seckill:reseckill:1:2
String reseckillCheckKey = SeckillCacheUtils.getReseckillCheckKey(skuId, userId);
//向Redis中保存这个key,利用increment()方式
//increment()效果如下
//1.如果当前key不存在,redis就会创建这个key,并保存他的值为1
//2.如果当前key存在,redis就会给当前值+1
//3.将当前key的值返回给调用者
Long seckillCounts = stringRedisTemplate.boundValueOps(reseckillCheckKey).increment();
//如果seckillCounts大于1,表示之前已经购买过
if(seckillCounts>1)
//购买次数超过1,证明不是第一次购买,终止业务,抛出异常
throw new CoolSharkServiceException(ResponseCode.FORBIDDEN,"您已经购买过该商品了");
//程序运行到这里,表示用户是第一次购买
//先检查该商品是否有库存
String stockKey = SeckillCacheUtils.getStockKey(skuId);
//从redis中获取库存数,使用decrement()方法,将当前库存减去1后返回值
Long seckillStocks = stringRedisTemplate.boundValueOps(stockKey).decrement();
//如果seckillStocks是0表示最后一件,可以正常购买
//如果小于0则表示,库存不足
if(seckillStocks<0){
//删除用户购买记录
stringRedisTemplate.boundValueOps(reseckillCheckKey).decrement();
//抛出异常,提示库存不足
throw new CoolSharkServiceException(ResponseCode.BAD_REQUEST,"对不起,您购买的商品已经无货了");
}
//代码到这里可以证明用户是第一次购买且库存充足
//第二阶段:将秒杀订单SeckillOrderAddDTO转成普通订单OrderAddDTO
OrderAddDTO orderAddDTO = converSeckillOrderTOOrder(seckillOrderAddDTO);
//为userId进行赋值
orderAddDTO.setUserId(userId);
//订单完整了,直接利用Dubbo生成普通订单
OrderAddVO orderAddVO = dubboOrderService.addOrder(orderAddDTO);
//第三阶段:使用消息队列记录秒杀成功的信息
//需要使用success对象
Success success = new Success();
BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),success);
//将userId赋值给success对象
success.setUserId(userId);
//将订单编号
success.setOrderSn(orderAddVO.getSn());
//将success对象发送给RabbitMQ
rabbitTemplate.convertAndSend(RabbitMqComponentConfiguration.SECKILL_EX
,RabbitMqComponentConfiguration.SECKILL_RK,success);
//我们需要返回SeckillCommitVO类型对象
SeckillCommitVO seckillCommitVO = new SeckillCommitVO();
BeanUtils.copyProperties(orderAddVO,seckillCommitVO);
return seckillCommitVO;
}
//在这个方法中将秒杀订单转成普通订单
private OrderAddDTO converSeckillOrderTOOrder(SeckillOrderAddDTO seckillOrderAddDTO) {
OrderAddDTO orderAddDTO = new OrderAddDTO();
//进行同名属性赋值
BeanUtils.copyProperties(seckillOrderAddDTO,orderAddDTO);
//seckillOrderAddDTO对象中的订单项SeckillOrderItemAddDATO赋值给orderAddDTO订单项集合OrderItemAddDTO
OrderItemAddDTO orderItemAddDTO = new OrderItemAddDTO();
BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),orderItemAddDTO);
//实例化一个List集合,将赋值好的orderItemAddDTO新增到集合中
List<OrderItemAddDTO> orderItemAddDTOS = new ArrayList<>();
orderItemAddDTOS.add(orderItemAddDTO);
//最后将集合赋值到orderAddDTO对象的orderItems属性中
orderAddDTO.setOrderItems(orderItemAddDTOS);
//转换完成,返回结果
return orderAddDTO;
}

//获取当前登录用户的用户信息
public CsmallAuthenticationInfo getUserInfo(){
//获得Spring Security上下文对象
UsernamePasswordAuthenticationToken authenticationToken =
(UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication();
//判断authenticationToken是否为空
if(authenticationToken==null)
throw new CoolSharkServiceException(ResponseCode.UNAUTHORIZED,"没有登录信息");
//不为空,获取其中用户信息
CsmallAuthenticationInfo csmallAuthenticationInfo =
(CsmallAuthenticationInfo) authenticationToken.getCredentials();
return csmallAuthenticationInfo;
}
//获取当前登录用户的用户id
public Long getUserId(){
return getUserInfo().getId();
}
}

开发控制层

image-20230304104628257

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package cn.tedu.mall.seckill.controller;

import cn.tedu.mall.common.exception.CoolSharkServiceException;
import cn.tedu.mall.common.restful.JsonResult;
import cn.tedu.mall.common.restful.ResponseCode;
import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO;
import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO;
import cn.tedu.mall.seckill.exception.SeckillBlockHandler;
import cn.tedu.mall.seckill.exception.SeckillFallback;
import cn.tedu.mall.seckill.service.ISeckillService;
import cn.tedu.mall.seckill.utils.SeckillCacheUtils;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/seckill")
@Api(tags = "提交秒杀订单")
public class SeckillController {
@Autowired
private ISeckillService seckillService;

@Autowired
private RedisTemplate redisTemplate;

@PostMapping("/{randCode}")
@ApiOperation("随机码验证并提交订单")
@ApiImplicitParam(value = "随机码",name = "randCode",required = true,dataType = "string")
@PreAuthorize("hasRole('user')")
//Sentinel限流和降级配置
@SentinelResource(value = "seckill",
blockHandlerClass = SeckillBlockHandler.class,blockHandler = "seckillBlock",
fallbackClass = SeckillFallback.class,fallback = "seckillFall")
public JsonResult<SeckillCommitVO> commitSeckillOrder(
@PathVariable String randCode, SeckillOrderAddDTO seckillOrderAddDTO
){
//获取spuId
Long spuId = seckillOrderAddDTO.getSpuId();
//获取当前spuId对应的随机码
//先获取key,再获取value
String randCodeKey = SeckillCacheUtils.getRandCodeKey(spuId);
//判断redis中是否包含这个key
if(redisTemplate.hasKey(randCodeKey)){
//根据key获取value
String redisRandCode = redisTemplate.boundValueOps(randCodeKey).get()+"";
//为了防止Redis信息丢失,我们可以判断一下redisRandCode的存在
if(redisRandCode==null)
//redis信息丢失
throw new CoolSharkServiceException(ResponseCode.INTERNAL_SERVER_ERROR,"服务器内部错误,请联系客服");
//判断Redis中的随机码和控制器的随机码是否一致,防止投机购买
if(!redisRandCode.equals(randCode))
//如果不一致,判断为投机购买,抛出异常
throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品");
//执行购买操作
SeckillCommitVO seckillCommitVO = seckillService.commitSeckill(seckillOrderAddDTO);
return JsonResult.ok(seckillCommitVO);
} else {
//如果redis中没有这个随机码的key值,直接发送异常,提示没有该商品
throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品");
}
}
}

启动Nacos\Seata\RabbitMQ\Redis\Sentinel

项目Leaf\product\passport\order\seckill

注意yml配置文件中的RabbitMQ的用户名和密码

为了方便测试:如果说已经购买过,就修改允许购买的数量 >1为 >100

为了方便测试:如果说没有库存,可以把判断库存的if注释掉

测试成功即可

还可以测试sentinel的限流

success成功信息的处理

开发持久层

我们要连接数据库,对这个表进行新增

还有对秒杀数据库sku库存的修改

1
2
3
//根据skuId修改库存数
void updateReduceStockBySkuId(@Param("skuId") Long skuId,
@Param("quantity") Integer quantity);
1
2
3
4
5
6
7
8
9
<!--根据skuId修改库存数-->
<update id="updateReduceStockBySkuId">
update
seckill_sku
set
seckill_stock=seckill_stock - #{quantity}
where
sku_id=#{skuId}
</update>

下面再编写新增Success的方法

1
2
3
4
5
@Repository
public interface SuccessMapper {
//新增Success对象到数据库的方法
void saveSuccess(Success success);
}

SuccessMapper.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<!--  新增Success对象到数据库的方法  -->
<insert id="saveSuccess">
insert into success(
user_id,
user_phone,
sku_id,
title,
main_picture,
seckill_price,
quantity,
bar_code,
data,
order_sn
)values(
#{userId},
#{userPhone},
#{skuId},
#{title},
#{mainPicture},
#{seckillPrice},
#{quantity},
#{barCode},
#{data},
#{orderSn}
)
</insert>

开发消息的接收功能

我们当前触发新增Success的方法并不是常规的业务逻辑层

而是由RabbitMQ消息收发机制中接收消息的对象来调用

所有我们编写一个接收消息的监听器类来完成这个操作

创建consumer包,包中创建类SekillQueueConsumer代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 必须保存到Spring容纳中
@Component
// RabbitMQ监听器声明
@RabbitListener(queues = {RabbitMqComponentConfiguration.SECKILL_QUEUE})
public class SeckillQueueConsumer {
//业务需要的Mapper对象装配
@Autowired
private SuccessMapper successMapper;
@Autowired
private SeckillSkuMapper skuMapper;
// 当前类上标记的队列收到消息时
// 下面方法会接收这个消息,自动运行
@RabbitHandler
public void process(Success success){
// 先减库存
// 减少seckill_sku表中的库存数并不迫切,运行可能延迟,真正运行时,秒杀可能已经结束了
// 这个库存的减少操作也不会影响秒杀过程中redis的库存数
skuMapper.updateReduceStockBySkuId(success.getSkuId(),success.getQuantity());
// 新增success对象到数据库
successMapper.saveSuccess(success);
}
}

环境方面

Nacos\Sentinel\Seata\redis\RabbitMQ

服务方面

Leaf\product\order\seckill