java按时间梯度异步通知第三方实现方案

业务背景

在对外提供的API 接口中,处理完自身的业务逻辑后需要调用第三方系统接口,将相应的处理结果通知给对方,就像微信、支付宝支付后 异步通知支付结果一样,按照
1s,2s,5s,1m,5m.....这种自定义的的时间梯度来通知第三方接口。

实现思路

在业务完成后把要推送的消息存入数据库,并且发送至mq的延时消息队列,在mq 消费时判断本次推送等级并且计算下一等级推送时间,如果本次回调第三方未得到正确
响应则继续发送下一等级的mq 延时队列。

使用技术

springboot 2.x + rocketmq-spring-boot-starter-2.0.4 + mysql

数据库表设计

回调通知数据库表设计:

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for tbl_callback_notify
-- ----------------------------
DROP TABLE IF EXISTS `tbl_callback_notify`;
CREATE TABLE `tbl_callback_notify` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `mcht_no` varchar(50) DEFAULT NULL COMMENT '商户号',
  `order_no` varchar(50) DEFAULT NULL COMMENT '订单号',
  `notify_url` varchar(255) DEFAULT NULL COMMENT '通知url',
  `notify_data` text COMMENT '通知内容',
  `notify_times` int(11) DEFAULT '0' COMMENT '通知次数(等级)',
  `last_notify_time` datetime DEFAULT NULL COMMENT '最后一次通知时间',
  `next_notify_time` datetime DEFAULT NULL COMMENT '下次通知时间',
  `status` int(1) DEFAULT '0' COMMENT '状态 0 未完成 1已完成',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='支付回调';

自定义时间梯度:

在rocketmq-spring-boot-starter中,实现延时队列有固定的18个等级,每个等级对应的延时时长分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m
1h 2h,在自己的业务中 我们可以选取部分等级来作为回调通知的时间梯度,我这里选取了1s 5s 10s 30s 1m 5m 10m 30m 1h 2h 作为我方通知的时间梯度策略

代码实现

public class MQConstants {
    /**
     * 最大回调通知次数
     */
    public static final Integer MAX_NOTIFY_TIMES = 10;

    /**
     * rocekt mq 延时等级对应的秒数
     */
    public static final Integer[] ROCKET_MQ_DELAY_LEVEL_SECOND = {0,1,5,10,30,60,120,180,240,300,360,420,480,540,600,1200,1800,3600,7200};

    /**
     * MQ group
     */
    public static final String QUEUE_GROUP_PAY_CALLBACK="queue_group_pay_callback";
    public static final String QUEUE_GROUP_ORDER_SYNC="queue_group_order_sync";

    /**
     * 系统通知为最大10次,每次通知对应mq 等级<br/>
     * 回调通知频率,对应rocketmq 延时等级策略<br/>
     * rocketmq 延时等级: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>
     * 取mq的 1s 5s 10s 30s 1m 5m 10m 30m 1h 2h 作为我方通知策略
     */
    public static final Integer[] CALLBACK_PUSH_FREQUENCY_TO_MQ_LEVEL = {0,1,2,3,4,5,9,14,16,17,18};
    public static final String SYNC_ORDER_TOPIC = "sync_order_topic";
    public static final String PAY_callback_TOPIC = "pay_callback_topic";
    }

业务处理完后异步调用此service的callback方法进行通知

    @Slf4j
    @Service
    public class TblCallbackNotifyServiceImpl extends ServiceImpl<TblCallbackNotifyMapper, TblCallbackNotify> implements ITblCallbackNotifyService {
    @Resource
    private TblCallbackNotifyMapper mapper;
    @Resource
    private ISginService sginService;
    @Resource
    private ITblAgentInfoService tblAgentInfoService;
    @Resource
    private ITblInnerMchInfoService tblInnerMchInfoService;
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    @Override
    public void callback(String orderNo,String mchtNo,String notifyUrl) {
        executorService.execute(()->{
            // 校验是否正在进行回调
            TblCallbackNotify tblCallbackNotify = mapper.selectOne(new LambdaQueryWrapper<TblCallbackNotify>().eq(TblCallbackNotify::getOrderNo, orderInfo.getOrderNo()));
            if(ObjectUtil.isEmpty(tblCallbackNotify)) {
                tblCallbackNotify = new TblCallbackNotify();
                tblCallbackNotify.setOrderNo(orderNo);
                tblCallbackNotify.setMchtNo(mchtNo);
                tblCallbackNotify.setNotifyUrl(notifyUrl);
                tblCallbackNotify.setNotifyData(JSON.toJSONString(这里写推送内容));
                tblCallbackNotify.setNotifyTimes(0);
                mapper.insert(tblCallbackNotify);
            }
            rocketMQTemplate.syncSend(MQConstants.PAY_callback_TOPIC, MessageBuilder.withPayload(JSONObject.toJSONString(tblCallbackNotify)).build());
        });
      }
    }

rocketMQ监听器

    /**
     * 回调消息推送监听<br/>
     * 如果此次推送失败或者收到的响应是false的时候 重新计算下次推送时间,并且再次发送下一级延时等级的队列
     * @author
     *
     */
    @Component
    @RocketMQMessageListener(consumerGroup = MQConstants.QUEUE_GROUP_PAY_CALLBACK, topic = MQConstants.PAY_callback_TOPIC, consumeMode = ConsumeMode.ORDERLY)
    @Slf4j
    public class PayCallbackListener implements RocketMQListener<MessageExt> {
    @Autowired
    private ITblCallbackNotifyService callbackNotifyService;
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("接收到消息:<{}>", msg);
        Date nowDate = new Date();
        TblCallbackNotify callbackNotify = JSONObject.parseObject(msg, TblCallbackNotify.class);
        callbackNotify.setLastNotifyTime(nowDate);
        Integer times = callbackNotify.getNotifyTimes();
        times++;
        // 大于最大通知次数 直接结束
        if(times > MQConstants.MAX_NOTIFY_TIMES) {
            return;
        }
        // 下一次推送延时等级
        Integer nextLevel = MQConstants.CALLBACK_PUSH_FREQUENCY_TO_MQ_LEVEL[times];
        // 计算下一次推送的秒
        Integer nextNotifySecond = MQConstants.ROCKET_MQ_DELAY_LEVEL_SECOND[nextLevel];
        callbackNotify.setNextNotifyTime(DateUtil.offsetSecond(nowDate, nextNotifySecond));
        callbackNotify.setNotifyTimes(times);
        try {
            String resultStr =HttpUtil.post(callbackNotify.getNotifyUrl(),callbackNotify.getNotifyData());
            // 推送成功
            if(StringUtils.endsWithIgnoreCase(CommonConstants.pay_notify_succ,resultStr)){
                callbackNotify.setStatus(2);
            }else {
                rocketMQTemplate.syncSend(MQConstants.PAY_callback_TOPIC, MessageBuilder.withPayload(JSONObject.toJSONString(callbackNotify)).build(), messageTimeOut, nextLevel);
            }
        } catch (Exception e) {
            log.error("推送回调消息异常", e);
            rocketMQTemplate.syncSend(MQConstants.PAY_callback_TOPIC, MessageBuilder.withPayload(JSONObject.toJSONString(callbackNotify)).build(), messageTimeOut, nextLevel);
        }
        callbackNotify.setLastNotifyTime(nowDate);
        callbackNotifyService.updateById(callbackNotify);
      }
}
1. 本站所有资源来源于用户上传和网络,如有侵权请及时联系删除,本站不承担任何法律责任!
2. 分享目的仅供大家学习和研究,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的教程、源码等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"www.94zyw.com",如遇到无法解压的请联系管理员!
94资源网 » java按时间梯度异步通知第三方实现方案