博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列处理库存解锁及关闭订单问题
阅读量:4148 次
发布时间:2019-05-25

本文共 32847 字,大约阅读时间需要 109 分钟。

文章目录

一、RabbitMQ延时队列

RabbitMQ延时队列实现定时任务。

场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。
常用解决方案:
spring的schedule定时任务轮训数据库
缺点:
消耗系统内存,增加了数据库的压力、存在较大的时间误差
解决:
Rabbit的消息 TTL 和死信Exchange结合。

消息的TTL

消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。

RabbitMQ 可以对队列消息分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果。

注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map<String, Object> args = new HashMap<String, Object>();

args.put(“x-message-ttl”, 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。

另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.expiration(“6000”);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, “msg body”.getBytes());

这样这条消息的过期时间也被设置成了6s。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列没有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信

死信:Dead Letter Exchange(DLX)

一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

  • 一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。

Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。

我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
在这里插入图片描述

二、实战

延时关单

场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。

在这里插入图片描述

规范设计

设计建议规范(基于事件模型的交换机设计):

1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
整体业务设计:
在这里插入图片描述
按照上边的规范设计,对关单业务进行升级设计:
在这里插入图片描述
上图说明:交换机 order-event-exchange 绑定了一个延时队列order.delay.queue,路由key是 order.create.order, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order,然后exchange根据路由key order.release.order转发消息到 order.release.order.queue队列,客户端监听该队列获取消息。
根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;import java.util.HashMap;/** * @author: kaiyi * @create: 2020-09-16 13:53 */@Configurationpublic class MyMQConfig {  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */  /**   * 客户端监听队列(测试)   * @param orderEntity   * @param channel   * @param message   * @throws IOException   */  @RabbitListener(queues = "order.release.order.queue")  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  }  /**   * 死信队列   *   * @return   */  @Bean  public Queue orderDelayQueue(){     /*            Queue(String name,  队列名字            boolean durable,  是否持久化            boolean exclusive,  是否排他            boolean autoDelete, 是否自动删除            Map
arguments) 属性 */ HashMap
arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map
arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map
arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); }}

然后在控制器创建测试消息:

gulimall-order/xxx/order/web/HelloController.java

** * @author: kaiyi * @create: 2020-09-12 18:09 */@Controllerpublic class HelloController {  @Autowired  private RabbitTemplate rabbitTemplate;  @ResponseBody  @GetMapping(value = "/test/createOrder")  public String createOrderTest() {    //订单下单成功    OrderEntity orderEntity = new OrderEntity();    orderEntity.setOrderSn(UUID.randomUUID().toString());    orderEntity.setModifyTime(new Date());    //给MQ发送消息    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);    return "ok";  }}

然后访问该路径 http://order.gulimall.com/test/createOrder, 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。

交换机:
在这里插入图片描述
交换机绑定的队列(路由key):
在这里插入图片描述
队列:
在这里插入图片描述
可以看到第一个队列是死信队列,第二个事普通队列
收到的消息为实体对象json:
在这里插入图片描述
控制器输出的监控信息:

收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a

收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b

三、消息队列处理库存解锁及关单

1、流程分析

在这里插入图片描述

在这里插入图片描述

2、库存微服务

2.1 解锁库存配置

1、库存微服务gulimall-ware 引入高级消息队列amqp依赖:

gulimall-ware/pom.xml

org.springframework.boot
spring-boot-starter-amqp

2、添加RMQ配置

gulimall-ware/src/main/resources/application.properties

# ===== RabbitMQ配置 ======spring.rabbitmq.host=192.168.10.10spring.rabbitmq.port=5672# 虚拟主机配置spring.rabbitmq.virtual-host=/# 开启发送端消息抵达Broker确认spring.rabbitmq.publisher-confirms=true# 开启发送端消息抵达Queue确认spring.rabbitmq.publisher-returns=true# 只要消息抵达Queue,就会异步发送优先回调returnfirmspring.rabbitmq.template.mandatory=true# 手动ack消息,不使用默认的消费端确认spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、创建RMQ配置文件

gulimall-ware/xxx/ware/config/MyRabbitMQConfig.java

package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * RMQ配置 * * @author: kaiyi * @createTime: 2020-09-15 16:40 **/@Configurationpublic class MyRabbitMQConfig {    /**     * 使用JSON序列化机制,进行消息转换     * @return     */    @Bean    public MessageConverter messageConverter() {        return new Jackson2JsonMessageConverter();    }    // @RabbitListener(queues = "stock.release.stock.queue")    // public void handle(Message message) {    //    // }    /**     * 库存服务默认的交换机     * @return     */    @Bean    public Exchange stockEventExchange() {        //String name, boolean durable, boolean autoDelete, Map
arguments TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } /** * 普通队列 * @return */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments Queue queue = new Queue("stock.release.stock.queue", true, false, false); return queue; } /** * 延迟队列 * @return */ @Bean public Queue stockDelay() { HashMap
arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; } /** * 交换机与普通队列绑定 * @return */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map
arguments Binding binding = new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); return binding; } /** * 交换机与延迟队列绑定 * @return */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); }}

2.2 解锁库存流程

解锁库存流程:

在这里插入图片描述

可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。
解锁库存具体步骤:
在这里插入图片描述

2.3 业务代码

锁库存

锁库存,并发送消息到延时队列,方法 orderLockStock(WareSkuLockVo vo)
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

package com.atguigu.gulimall.ware.service.impl;import com.alibaba.fastjson.TypeReference;import com.atguigu.common.exception.NoStockException;import com.atguigu.common.to.mq.StockDetailTo;import com.atguigu.common.to.mq.StockLockedTo;import com.atguigu.common.utils.R;import com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity;import com.atguigu.gulimall.ware.entity.WareOrderTaskEntity;import com.atguigu.gulimall.ware.feign.OrderFeignService;import com.atguigu.gulimall.ware.feign.ProductFeignService;import com.atguigu.gulimall.ware.service.WareOrderTaskDetailService;import com.atguigu.gulimall.ware.service.WareOrderTaskService;import org.springframework.transaction.annotation.Transactional;@Service("wareSkuService")public class WareSkuServiceImpl extends ServiceImpl
implements WareSkuService { @Autowired WareSkuDao wareSkuDao; @Autowired ProductFeignService productFeignService; @Autowired private WareOrderTaskService wareOrderTaskService; @Autowired private WareOrderTaskDetailService wareOrderTaskDetailService; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderFeignService orderFeignService; /** * 为某个订单锁定库存 * @param vo * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean orderLockStock(WareSkuLockVo vo) { /** * 保存库存工作单详情信息 * 追溯 */ WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity(); wareOrderTaskEntity.setOrderSn(vo.getOrderSn()); wareOrderTaskEntity.setCreateTime(new Date()); wareOrderTaskService.save(wareOrderTaskEntity); //1、按照下单的收货地址,找到一个就近仓库,锁定库存 //2、找到每个商品在哪个仓库都有库存 List
locks = vo.getLocks(); List
collect = locks.stream().map((item) -> { SkuWareHasStock stock = new SkuWareHasStock(); Long skuId = item.getSkuId(); stock.setSkuId(skuId); stock.setNum(item.getCount()); //查询这个商品在哪个仓库有库存 List
wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId); stock.setWareId(wareIdList); return stock; }).collect(Collectors.toList()); //2、锁定库存 for (SkuWareHasStock hasStock : collect) { boolean skuStocked = false; Long skuId = hasStock.getSkuId(); List
wareIds = hasStock.getWareId(); if (org.springframework.util.StringUtils.isEmpty(wareIds)) { //没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚 throw new NoStockException(skuId); } //1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ //2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁 for (Long wareId : wareIds) { //锁定成功就返回1,失败就返回0 Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum()); if (count == 1) { skuStocked = true; WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder() .skuId(skuId) .skuName("") .skuNum(hasStock.getNum()) .taskId(wareOrderTaskEntity.getId()) .wareId(wareId) .lockStatus(1) .build(); wareOrderTaskDetailService.save(taskDetailEntity); //TODO 告诉MQ库存锁定成功 StockLockedTo lockedTo = new StockLockedTo(); lockedTo.setId(wareOrderTaskEntity.getId()); StockDetailTo detailTo = new StockDetailTo(); BeanUtils.copyProperties(taskDetailEntity,detailTo); lockedTo.setDetailTo(detailTo); rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo); break; } else { //当前仓库锁失败,重试下一个仓库 } } if (skuStocked == false) { //当前商品所有仓库都没有锁住 throw new NoStockException(skuId); } } //3、肯定全部都是锁定成功的 return true; } @Override public void unlockStock(StockLockedTo to) { //库存工作单的id StockDetailTo detail = to.getDetailTo(); Long detailId = detail.getId(); /** * 解锁 * 1、查询数据库关于这个订单锁定库存信息 * 有:证明库存锁定成功了 * 解锁:订单状况 * 1、没有这个订单,必须解锁库存 * 2、有这个订单,不一定解锁库存 * 订单状态:已取消:解锁库存 * 已支付:不能解锁库存 */ WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId); if (taskDetailInfo != null) { //查出wms_ware_order_task工作单的信息 Long id = to.getId(); WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id); //获取订单号查询订单状态 String orderSn = orderTaskInfo.getOrderSn(); //远程查询订单信息 R orderData = orderFeignService.getOrderStatus(orderSn); if (orderData.getCode() == 0) { //订单数据返回成功 OrderVo orderInfo = orderData.getData("data", new TypeReference
() {}); //判断订单状态是否已取消或者支付或者订单不存在 if (orderInfo == null || orderInfo.getStatus() == 4) { //订单已被取消,才能解锁库存 if (taskDetailInfo.getLockStatus() == 1) { //当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁 unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId); } } } else { //消息拒绝以后重新放在队列里面,让别人继续消费解锁 //远程调用服务失败 throw new RuntimeException("远程调用服务失败"); } } else { //无需解锁 } } /** * 解锁库存的方法 * @param skuId * @param wareId * @param num * @param taskDetailId */ public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) { //库存解锁 wareSkuDao.unLockStock(skuId,wareId,num); //更新工作单的状态 WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity(); taskDetailEntity.setId(taskDetailId); taskDetailEntity.setLockStatus(2); //变为已解锁 wareOrderTaskDetailService.updateById(taskDetailEntity); } @Data class SkuWareHasStock { private Long skuId; private Integer num; private List
wareId; }}

监听队列:

gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.mq.StockLockedTo;import com.atguigu.gulimall.ware.service.WareSkuService;import com.rabbitmq.client.Channel;/** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener {  @Autowired  private WareSkuService wareSkuService;  /**   * 1、库存自动解锁   *  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁   *   *  2、订单失败   *      库存锁定失败   *   *   只要解锁库存的消息失败,一定要告诉服务解锁失败   */  @RabbitHandler  public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {    log.info("******收到解锁库存的信息******");    try {      //当前消息是否被第二次及以后(重新)派发过来了      // Boolean redelivered = message.getMessageProperties().getRedelivered();      //解锁库存      wareSkuService.unlockStock(to);      // 手动删除消息      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    } catch (Exception e) {      // 解锁失败 将消息重新放回队列,让别人消费      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);    }  }}

上边就是创建订单后锁库存,发消息到延时队列,监听队列,创建的订单出现异常是否来解锁库存,手动确认消息的核心代码逻辑。

2.4 调试

订单结算页,购买了一件商品。

在这里插入图片描述
在提交订单时,远程锁库存成功后模拟代码异常:
在这里插入图片描述
提交订单,由于异常会回滚订单并且回退到结算页,连续提交三次,我们可以看到延时队列里边有三条信息。
在这里插入图片描述
库存表wms_ware_sku,原来锁定了 3 件库存,现在库存锁定为6,因为库存是远程锁定的,所以,主程序事务回滚对远程的不起作用,不过在锁定库存成功时发库存锁定成功的消息,后边通过消息会检查是否释放库存。
在这里插入图片描述
库存工作单主表:
在这里插入图片描述
库存工作单明细表,我们可以看到新增的3条记录,明细状态lock_status(1-已锁定 2-已解锁 3-扣减)
在这里插入图片描述
订单服务订单表:
在这里插入图片描述
然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order 表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。
过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。
消息队列:
在这里插入图片描述
库存表:
在这里插入图片描述
可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。

四、RMQ 延时队列处理关单及库存解锁整合

1、流程分析

在这里插入图片描述

步骤:

  • 1、订单创建成功,发送消息给MQ
  • 2、订单服务订单关单监听器(死信之后判断是否关单)
  • 3、订单关单成功后发消息给MQ(订单释放直接和库存释放进行绑定)
  • 4、库存服务是释放库存监听器监听是否解锁库存队列(stock.release.stock.queue)
  • 5、库存解锁处理逻辑

这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。

2、订单关单

1、订单释放直接和库存释放进行绑定

gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.AMQP;/** * @author: kaiyi * @create: 2020-09-16 13:53 */@Configurationpublic class MyMQConfig {  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */  /**   * 客户端监听队列(测试)   * @param orderEntity   * @param channel   * @param message   * @throws IOException   */  /*  @RabbitListener(queues = "order.release.order.queue")  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  }   */  /**   * 死信队列   *   * @return   */  @Bean  public Queue orderDelayQueue(){     /*            Queue(String name,  队列名字            boolean durable,  是否持久化            boolean exclusive,  是否排他            boolean autoDelete, 是否自动删除            Map
arguments) 属性 */ HashMap
arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 信死了交给哪个交换机 arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由key arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map
arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map
arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } /** * 订单释放直接和库存释放进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); }}

2、提交订单增加订单创建成功,发送消息给MQ

gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/**     * 提交订单     * @param vo     * @return     */    // @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别    // @Transactional(propagation = Propagation.REQUIRED)   设置事务的传播级别    @Transactional(rollbackFor = Exception.class)    // @GlobalTransactional(rollbackFor = Exception.class)    @Override    public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {        confirmVoThreadLocal.set(vo);        SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();        //去创建、下订单、验令牌、验价格、锁定库存...        //获取当前用户登录的信息        MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();        responseVo.setCode(0);        //1、验证令牌是否合法【令牌的对比和删除必须保证原子性】        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";        String orderToken = vo.getOrderToken();        //通过lua脚本原子验证令牌和删除令牌        Long result = redisTemplate.execute(new DefaultRedisScript
(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), orderToken); if (result == 0L) { //令牌验证失败 responseVo.setCode(1); return responseVo; } else { //令牌验证成功 //1、创建订单、订单项等信息 OrderCreateTo order = createOrder(); //2、验证价格 BigDecimal payAmount = order.getOrder().getPayAmount(); BigDecimal payPrice = vo.getPayPrice(); if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) { //金额对比 //TODO 3、保存订单 saveOrder(order); //4、库存锁定,只要有异常,回滚订单数据 //订单号、所有订单项信息(skuId,skuNum,skuName) WareSkuLockVo lockVo = new WareSkuLockVo(); lockVo.setOrderSn(order.getOrder().getOrderSn()); //获取出要锁定的商品数据信息 List
orderItemVos = order.getOrderItems().stream().map((item) -> { OrderItemVo orderItemVo = new OrderItemVo(); orderItemVo.setSkuId(item.getSkuId()); orderItemVo.setCount(item.getSkuQuantity()); orderItemVo.setTitle(item.getSkuName()); return orderItemVo; }).collect(Collectors.toList()); lockVo.setLocks(orderItemVos); //TODO 调用远程锁定库存的方法 //出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata) //为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务 R r = wmsFeignService.orderLockStock(lockVo); if (r.getCode() == 0) { //锁定成功 responseVo.setOrder(order.getOrder()); // int i = 10/0; // 抛出异常,测试远程回滚 //TODO 订单创建成功,发送消息给MQ rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder()); //删除购物车里的数据 redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId()); return responseVo; } else { //锁定失败 String msg = (String) r.get("msg"); throw new NoStockException(msg); // responseVo.setCode(3); // return responseVo; } } else { responseVo.setCode(2); return responseVo; } } }

3、关单监听

gulimall-order/xxx/order/listener/OrderCloseListener.java

package com.atguigu.gulimall.order.listener;import com.atguigu.gulimall.order.entity.OrderEntity;import com.atguigu.gulimall.order.service.OrderService;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.io.IOException;/** * 关单监听 * * @author: kaiyi * @create: 2020-09-17 11:01 */@RabbitListener(queues = "order.release.order.queue")@Servicepublic class OrderCloseListener {  @Autowired  private OrderService orderService;  @RabbitHandler  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {    System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());    try {      orderService.closeOrder(orderEntity);      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    } catch (Exception e) {      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);    }  }}

4、关单成功给库存服务发送MQ消息

gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/**     * 关闭订单     * @param orderEntity     */    @Override    public void closeOrder(OrderEntity orderEntity) {        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付        OrderEntity orderInfo = this.getOne(new QueryWrapper
(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 } } }

3、订单释放和库存释放进行绑定

1、库存服务-库存释放监听器增加订单关单库存释放处理方法

gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.OrderTo;import com.atguigu.common.to.mq.StockLockedTo;/** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */@Slf4j@RabbitListener(queues = "stock.release.stock.queue")@Servicepublic class StockReleaseListener {  @Autowired  private WareSkuService wareSkuService;  /**   * 1、库存自动解锁   *  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁   *   *  2、订单失败   *      库存锁定失败   *   *   只要解锁库存的消息失败,一定要告诉服务解锁失败   */  @RabbitHandler  public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {    log.info("******收到解锁库存的信息******");    try {      //当前消息是否被第二次及以后(重新)派发过来了      // Boolean redelivered = message.getMessageProperties().getRedelivered();      //解锁库存      wareSkuService.unlockStock(to);      // 手动删除消息      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    } catch (Exception e) {      // 解锁失败 将消息重新放回队列,让别人消费      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);    }  }  /**   * 订单关单库存释放   *   * @param orderTo   * @param message   * @param channel   * @throws IOException   */  @RabbitHandler  public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {    log.info("******收到订单关闭,准备解锁库存的信息******");    try {      wareSkuService.unlockStock(orderTo);      // 手动删除消息      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    } catch (Exception e) {      // 解锁失败 将消息重新放回队列,让别人消费      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);    }  }}

这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。

2、具体解锁库存实现

gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

/**     * 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理     * 导致卡顿的订单,永远都不能解锁库存     * @param orderTo     */    @Transactional(rollbackFor = Exception.class)    @Override    public void unlockStock(OrderTo orderTo) {        String orderSn = orderTo.getOrderSn();        //查一下最新的库存解锁状态,防止重复解锁库存        WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);        //按照工作单的id找到所有 没有解锁的库存,进行解锁        Long id = orderTaskEntity.getId();        List
list = wareOrderTaskDetailService.list(new QueryWrapper
() .eq("task_id", id).eq("lock_status", 1)); for (WareOrderTaskDetailEntity taskDetailEntity : list) { unLockStock(taskDetailEntity.getSkuId(), taskDetailEntity.getWareId(), taskDetailEntity.getSkuNum(), taskDetailEntity.getId()); } }

五、消息丢失、重复、积压等解决方案

高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?

1、消息丢失

1、消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

代码示例:

/**     * 关闭订单     * @param orderEntity     */    @Override    public void closeOrder(OrderEntity orderEntity) {        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付        OrderEntity orderInfo = this.getOne(new QueryWrapper
(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 // while() 重试次数 } } }

创建消息日志记录表:

在这里插入图片描述
2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

  • publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态:

gulimall-order/xxx/order/config/MyRabbitConfig.java

/**   * 定制RabbitTemplate   * 1、服务收到消息就会回调   * 1、spring.rabbitmq.publisher-confirms: true   * 2、设置确认回调   * 2、消息正确抵达队列就会进行回调   * 1、spring.rabbitmq.publisher-returns: true   * spring.rabbitmq.template.mandatory: true   * 2、设置确认回调ReturnCallback   * 

* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { /** * 1、做好消息确认机制(publisher,consumer【手动ack】】) * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍 */ // 服务器收到生产者发送的消息了 // 修改消息的状态 System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); /** * 2、只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); }

3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。

  • 一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。

防止消息丢失记住这两条:

1、做好消息确认机制(publisher,consumer【手动ack】】)

2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

2、消息重复

出现重复的几种情况

  • 1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。
    • Broker的消息重新由 unack 变为ready,并发送给其他消费者
  • 2、消息消费失败,由于重试机制,自动又将消息发送出去。
  • 3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送

解决方案

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。
  • rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。

3、消息积压

  • 消费者宕机
  • 消费者消费能力不足
  • 发送者发送流量太大
    • 上线更多的消费者,进行正常的消费
    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

转载地址:http://lonti.baihongyu.com/

你可能感兴趣的文章
nginx反代 499 502 bad gateway 和timeout
查看>>
linux虚拟机安装tar.gz版jdk步骤详解
查看>>
python实现100以内自然数之和,偶数之和
查看>>
python数字逆序输出及多个print输出在同一行
查看>>
苏宁产品经理面经
查看>>
百度产品经理群面
查看>>
去哪儿一面+平安科技二面+hr面+贝贝一面+二面产品面经
查看>>
pytorch
查看>>
pytorch(三)
查看>>
ubuntu相关
查看>>
C++ 调用json
查看>>
nano中设置脚本开机自启动
查看>>
动态库调动态库
查看>>
Kubernetes集群搭建之CNI-Flanneld部署篇
查看>>
k8s web终端连接工具
查看>>
手绘VS码绘(一):静态图绘制(码绘使用P5.js)
查看>>
手绘VS码绘(二):动态图绘制(码绘使用Processing)
查看>>
基于P5.js的“绘画系统”
查看>>
《达芬奇的人生密码》观后感
查看>>
论文翻译:《一个包容性设计的具体例子:聋人导向可访问性》
查看>>