本文共 32847 字,大约阅读时间需要 109 分钟。
RabbitMQ延时队列实现定时任务。
场景: 比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。 常用解决方案: spring的schedule定时任务轮训数据库 缺点: 消耗系统内存,增加了数据库的压力、存在较大的时间误差 解决: Rabbit的消息 TTL 和死信Exchange结合。消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。
RabbitMQ 可以对队列
和消息
分别设置TTL。 也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
。会取小
的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间
,两者是一样的效果。注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。
那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:Map<String, Object> args = new HashMap<String, Object>();
args.put(“x-message-ttl”, 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration(“6000”); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, “msg body”.getBytes());
这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列没有严重的消息积压情况,则已过期的消息也许还能存活较长时间。另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
死信:Dead Letter Exchange(DLX)
一个消息在满足如下条件,会进死信路由
,记住这里是路由而不是队列,一个路由可以对应很多队列。 Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。
设计建议规范(基于事件模型的交换机设计):
1、交换机命名:业务+exchange;交换机为Topic 2、路由键:事件.需要感知的业务(可以不写) 3、队列命名:事件+想要监听服务名+queue 4、绑定关系:事件.感知的业务(#) 整体业务设计: 按照上边的规范设计,对关单业务进行升级设计: 上图说明:交换机order-event-exchange
绑定了一个延时队列order.delay.queue
,路由key是 order.create.order
, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order
,然后exchange根据路由key order.release.order
转发消息到 order.release.order.queue
队列,客户端监听该队列获取消息。 根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。 gulimall-order/xxx/order/config/MyMQConfig.java
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;import java.util.HashMap;/** * @author: kaiyi * @create: 2020-09-16 13:53 */@Configurationpublic class MyMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /** * 客户端监听队列(测试) * @param orderEntity * @param channel * @param message * @throws IOException */ @RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } /** * 死信队列 * * @return */ @Bean public Queue orderDelayQueue(){ /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Maparguments) 属性 */ HashMap arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); }}
然后在控制器创建测试消息:
gulimall-order/xxx/order/web/HelloController.java
** * @author: kaiyi * @create: 2020-09-12 18:09 */@Controllerpublic class HelloController { @Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping(value = "/test/createOrder") public String createOrderTest() { //订单下单成功 OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); orderEntity.setModifyTime(new Date()); //给MQ发送消息 rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity); return "ok"; }}
然后访问该路径 http://order.gulimall.com/test/createOrder, 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。
交换机: 交换机绑定的队列(路由key): 队列: 可以看到第一个队列是死信队列,第二个事普通队列 收到的消息为实体对象json: 控制器输出的监控信息:收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a
收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b
1、库存微服务gulimall-ware
引入高级消息队列amqp依赖:
gulimall-ware/pom.xml
org.springframework.boot spring-boot-starter-amqp
2、添加RMQ配置
gulimall-ware/src/main/resources/application.properties
# ===== RabbitMQ配置 ======spring.rabbitmq.host=192.168.10.10spring.rabbitmq.port=5672# 虚拟主机配置spring.rabbitmq.virtual-host=/# 开启发送端消息抵达Broker确认spring.rabbitmq.publisher-confirms=true# 开启发送端消息抵达Queue确认spring.rabbitmq.publisher-returns=true# 只要消息抵达Queue,就会异步发送优先回调returnfirmspring.rabbitmq.template.mandatory=true# 手动ack消息,不使用默认的消费端确认spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、创建RMQ配置文件
gulimall-ware/xxx/ware/config/MyRabbitMQConfig.java
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * RMQ配置 * * @author: kaiyi * @createTime: 2020-09-15 16:40 **/@Configurationpublic class MyRabbitMQConfig { /** * 使用JSON序列化机制,进行消息转换 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // @RabbitListener(queues = "stock.release.stock.queue") // public void handle(Message message) { // // } /** * 库存服务默认的交换机 * @return */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Maparguments TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } /** * 普通队列 * @return */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments Queue queue = new Queue("stock.release.stock.queue", true, false, false); return queue; } /** * 延迟队列 * @return */ @Bean public Queue stockDelay() { HashMap arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; } /** * 交换机与普通队列绑定 * @return */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map arguments Binding binding = new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); return binding; } /** * 交换机与延迟队列绑定 * @return */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); }}
解锁库存流程:
可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。 解锁库存具体步骤:锁库存
锁库存,并发送消息到延时队列,方法orderLockStock(WareSkuLockVo vo)
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java
package com.atguigu.gulimall.ware.service.impl;import com.alibaba.fastjson.TypeReference;import com.atguigu.common.exception.NoStockException;import com.atguigu.common.to.mq.StockDetailTo;import com.atguigu.common.to.mq.StockLockedTo;import com.atguigu.common.utils.R;import com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity;import com.atguigu.gulimall.ware.entity.WareOrderTaskEntity;import com.atguigu.gulimall.ware.feign.OrderFeignService;import com.atguigu.gulimall.ware.feign.ProductFeignService;import com.atguigu.gulimall.ware.service.WareOrderTaskDetailService;import com.atguigu.gulimall.ware.service.WareOrderTaskService;import org.springframework.transaction.annotation.Transactional;@Service("wareSkuService")public class WareSkuServiceImpl extends ServiceImplimplements WareSkuService { @Autowired WareSkuDao wareSkuDao; @Autowired ProductFeignService productFeignService; @Autowired private WareOrderTaskService wareOrderTaskService; @Autowired private WareOrderTaskDetailService wareOrderTaskDetailService; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderFeignService orderFeignService; /** * 为某个订单锁定库存 * @param vo * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean orderLockStock(WareSkuLockVo vo) { /** * 保存库存工作单详情信息 * 追溯 */ WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity(); wareOrderTaskEntity.setOrderSn(vo.getOrderSn()); wareOrderTaskEntity.setCreateTime(new Date()); wareOrderTaskService.save(wareOrderTaskEntity); //1、按照下单的收货地址,找到一个就近仓库,锁定库存 //2、找到每个商品在哪个仓库都有库存 List locks = vo.getLocks(); List collect = locks.stream().map((item) -> { SkuWareHasStock stock = new SkuWareHasStock(); Long skuId = item.getSkuId(); stock.setSkuId(skuId); stock.setNum(item.getCount()); //查询这个商品在哪个仓库有库存 List wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId); stock.setWareId(wareIdList); return stock; }).collect(Collectors.toList()); //2、锁定库存 for (SkuWareHasStock hasStock : collect) { boolean skuStocked = false; Long skuId = hasStock.getSkuId(); List wareIds = hasStock.getWareId(); if (org.springframework.util.StringUtils.isEmpty(wareIds)) { //没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚 throw new NoStockException(skuId); } //1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ //2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁 for (Long wareId : wareIds) { //锁定成功就返回1,失败就返回0 Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum()); if (count == 1) { skuStocked = true; WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder() .skuId(skuId) .skuName("") .skuNum(hasStock.getNum()) .taskId(wareOrderTaskEntity.getId()) .wareId(wareId) .lockStatus(1) .build(); wareOrderTaskDetailService.save(taskDetailEntity); //TODO 告诉MQ库存锁定成功 StockLockedTo lockedTo = new StockLockedTo(); lockedTo.setId(wareOrderTaskEntity.getId()); StockDetailTo detailTo = new StockDetailTo(); BeanUtils.copyProperties(taskDetailEntity,detailTo); lockedTo.setDetailTo(detailTo); rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo); break; } else { //当前仓库锁失败,重试下一个仓库 } } if (skuStocked == false) { //当前商品所有仓库都没有锁住 throw new NoStockException(skuId); } } //3、肯定全部都是锁定成功的 return true; } @Override public void unlockStock(StockLockedTo to) { //库存工作单的id StockDetailTo detail = to.getDetailTo(); Long detailId = detail.getId(); /** * 解锁 * 1、查询数据库关于这个订单锁定库存信息 * 有:证明库存锁定成功了 * 解锁:订单状况 * 1、没有这个订单,必须解锁库存 * 2、有这个订单,不一定解锁库存 * 订单状态:已取消:解锁库存 * 已支付:不能解锁库存 */ WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId); if (taskDetailInfo != null) { //查出wms_ware_order_task工作单的信息 Long id = to.getId(); WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id); //获取订单号查询订单状态 String orderSn = orderTaskInfo.getOrderSn(); //远程查询订单信息 R orderData = orderFeignService.getOrderStatus(orderSn); if (orderData.getCode() == 0) { //订单数据返回成功 OrderVo orderInfo = orderData.getData("data", new TypeReference () {}); //判断订单状态是否已取消或者支付或者订单不存在 if (orderInfo == null || orderInfo.getStatus() == 4) { //订单已被取消,才能解锁库存 if (taskDetailInfo.getLockStatus() == 1) { //当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁 unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId); } } } else { //消息拒绝以后重新放在队列里面,让别人继续消费解锁 //远程调用服务失败 throw new RuntimeException("远程调用服务失败"); } } else { //无需解锁 } } /** * 解锁库存的方法 * @param skuId * @param wareId * @param num * @param taskDetailId */ public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) { //库存解锁 wareSkuDao.unLockStock(skuId,wareId,num); //更新工作单的状态 WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity(); taskDetailEntity.setId(taskDetailId); taskDetailEntity.setLockStatus(2); //变为已解锁 wareOrderTaskDetailService.updateById(taskDetailEntity); } @Data class SkuWareHasStock { private Long skuId; private Integer num; private List wareId; }}
监听队列:
gulimall-ware/xxx/ware/listener/StockReleaseListener.java
package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.mq.StockLockedTo;import com.atguigu.gulimall.ware.service.WareSkuService;import com.rabbitmq.client.Channel;/** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener { @Autowired private WareSkuService wareSkuService; /** * 1、库存自动解锁 * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁 * * 2、订单失败 * 库存锁定失败 * * 只要解锁库存的消息失败,一定要告诉服务解锁失败 */ @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException { log.info("******收到解锁库存的信息******"); try { //当前消息是否被第二次及以后(重新)派发过来了 // Boolean redelivered = message.getMessageProperties().getRedelivered(); //解锁库存 wareSkuService.unlockStock(to); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } }}
上边就是创建订单后锁库存,发消息到延时队列,监听队列,创建的订单出现异常是否来解锁库存,手动确认消息的核心代码逻辑。
订单结算页,购买了一件商品。
在提交订单时,远程锁库存成功后模拟代码异常: 提交订单,由于异常会回滚订单并且回退到结算页,连续提交三次,我们可以看到延时队列里边有三条信息。 库存表wms_ware_sku
,原来锁定了 3 件库存,现在库存锁定为6,因为库存是远程锁定的,所以,主程序事务回滚对远程的不起作用,不过在锁定库存成功时发库存锁定成功的消息,后边通过消息会检查是否释放库存。 库存工作单主表: 库存工作单明细表,我们可以看到新增的3条记录,明细状态lock_status(1-已锁定 2-已解锁 3-扣减) 订单服务订单表: 然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order
是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order
表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。 过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。 消息队列: 库存表: 可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。 这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。
1、订单释放直接和库存释放进行绑定
gulimall-order/xxx/order/config/MyMQConfig.java
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.AMQP;/** * @author: kaiyi * @create: 2020-09-16 13:53 */@Configurationpublic class MyMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /** * 客户端监听队列(测试) * @param orderEntity * @param channel * @param message * @throws IOException */ /* @RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } */ /** * 死信队列 * * @return */ @Bean public Queue orderDelayQueue(){ /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Maparguments) 属性 */ HashMap arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 信死了交给哪个交换机 arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由key arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } /** * 订单释放直接和库存释放进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); }}
2、提交订单增加订单创建成功,发送消息给MQ
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java
/** * 提交订单 * @param vo * @return */ // @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别 // @Transactional(propagation = Propagation.REQUIRED) 设置事务的传播级别 @Transactional(rollbackFor = Exception.class) // @GlobalTransactional(rollbackFor = Exception.class) @Override public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) { confirmVoThreadLocal.set(vo); SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo(); //去创建、下订单、验令牌、验价格、锁定库存... //获取当前用户登录的信息 MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get(); responseVo.setCode(0); //1、验证令牌是否合法【令牌的对比和删除必须保证原子性】 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; String orderToken = vo.getOrderToken(); //通过lua脚本原子验证令牌和删除令牌 Long result = redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), orderToken); if (result == 0L) { //令牌验证失败 responseVo.setCode(1); return responseVo; } else { //令牌验证成功 //1、创建订单、订单项等信息 OrderCreateTo order = createOrder(); //2、验证价格 BigDecimal payAmount = order.getOrder().getPayAmount(); BigDecimal payPrice = vo.getPayPrice(); if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) { //金额对比 //TODO 3、保存订单 saveOrder(order); //4、库存锁定,只要有异常,回滚订单数据 //订单号、所有订单项信息(skuId,skuNum,skuName) WareSkuLockVo lockVo = new WareSkuLockVo(); lockVo.setOrderSn(order.getOrder().getOrderSn()); //获取出要锁定的商品数据信息 List orderItemVos = order.getOrderItems().stream().map((item) -> { OrderItemVo orderItemVo = new OrderItemVo(); orderItemVo.setSkuId(item.getSkuId()); orderItemVo.setCount(item.getSkuQuantity()); orderItemVo.setTitle(item.getSkuName()); return orderItemVo; }).collect(Collectors.toList()); lockVo.setLocks(orderItemVos); //TODO 调用远程锁定库存的方法 //出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata) //为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务 R r = wmsFeignService.orderLockStock(lockVo); if (r.getCode() == 0) { //锁定成功 responseVo.setOrder(order.getOrder()); // int i = 10/0; // 抛出异常,测试远程回滚 //TODO 订单创建成功,发送消息给MQ rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder()); //删除购物车里的数据 redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId()); return responseVo; } else { //锁定失败 String msg = (String) r.get("msg"); throw new NoStockException(msg); // responseVo.setCode(3); // return responseVo; } } else { responseVo.setCode(2); return responseVo; } } }
3、关单监听
gulimall-order/xxx/order/listener/OrderCloseListener.java
package com.atguigu.gulimall.order.listener;import com.atguigu.gulimall.order.entity.OrderEntity;import com.atguigu.gulimall.order.service.OrderService;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;/** * 关单监听 * * @author: kaiyi * @create: 2020-09-17 11:01 */@RabbitListener(queues = "order.release.order.queue")@Servicepublic class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } }}
4、关单成功给库存服务发送MQ消息
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java
/** * 关闭订单 * @param orderEntity */ @Override public void closeOrder(OrderEntity orderEntity) { //关闭订单之前先查询一下数据库,判断此订单状态是否已支付 OrderEntity orderInfo = this.getOne(new QueryWrapper(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 } } }
1、库存服务-库存释放监听器增加订单关单库存释放处理方法
gulimall-ware/xxx/ware/listener/StockReleaseListener.java
package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.OrderTo;import com.atguigu.common.to.mq.StockLockedTo;/** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener { @Autowired private WareSkuService wareSkuService; /** * 1、库存自动解锁 * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁 * * 2、订单失败 * 库存锁定失败 * * 只要解锁库存的消息失败,一定要告诉服务解锁失败 */ @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException { log.info("******收到解锁库存的信息******"); try { //当前消息是否被第二次及以后(重新)派发过来了 // Boolean redelivered = message.getMessageProperties().getRedelivered(); //解锁库存 wareSkuService.unlockStock(to); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } /** * 订单关单库存释放 * * @param orderTo * @param message * @param channel * @throws IOException */ @RabbitHandler public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException { log.info("******收到订单关闭,准备解锁库存的信息******"); try { wareSkuService.unlockStock(orderTo); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } }}
这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。
2、具体解锁库存实现
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java
/** * 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理 * 导致卡顿的订单,永远都不能解锁库存 * @param orderTo */ @Transactional(rollbackFor = Exception.class) @Override public void unlockStock(OrderTo orderTo) { String orderSn = orderTo.getOrderSn(); //查一下最新的库存解锁状态,防止重复解锁库存 WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn); //按照工作单的id找到所有 没有解锁的库存,进行解锁 Long id = orderTaskEntity.getId(); Listlist = wareOrderTaskDetailService.list(new QueryWrapper () .eq("task_id", id).eq("lock_status", 1)); for (WareOrderTaskDetailEntity taskDetailEntity : list) { unLockStock(taskDetailEntity.getSkuId(), taskDetailEntity.getWareId(), taskDetailEntity.getSkuNum(), taskDetailEntity.getId()); } }
高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?
1、消息发送出去,由于网络问题没有抵达服务器
代码示例:
/** * 关闭订单 * @param orderEntity */ @Override public void closeOrder(OrderEntity orderEntity) { //关闭订单之前先查询一下数据库,判断此订单状态是否已支付 OrderEntity orderInfo = this.getOne(new QueryWrapper(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 // while() 重试次数 } } }
创建消息日志记录表:
2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态:
gulimall-order/xxx/order/config/MyRabbitConfig.java
/** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback ** 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { /** * 1、做好消息确认机制(publisher,consumer【手动ack】】) * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍 */ // 服务器收到生产者发送的消息了 // 修改消息的状态 System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); /** * 2、只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); }
3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。
防止消息丢失记住这两条:
1、做好消息确认机制(publisher,consumer【手动ack】】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍出现重复的几种情况
解决方案
防重表
(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。是否被重新投递过来的
,而不是第一次被投递过来的。转载地址:http://lonti.baihongyu.com/