延时任务架构

类关系图

说明
  1. AbstractDelayQueueMachineFactory 延时任务工厂抽象类,内部使用redis的Sorted Set实现了延时任务的执行,通过使用redis的Sorted Set结构存储延时任务的执行时间、任务描述。把任务描述序列化成字符串,放在Sorted Set的value中,然后把任务的执行时间戳作为score,利用Sorted Set天然的排序特性,执行时刻越早的会排在越前面。开一个定时线程,每隔一段时间去查一下这个Sorted Set中score小于或等于当前时间戳的元素,然后再执行元素对应的任务。执行完任务后,将元素从Sorted Set中删除,避免任务重复执行。
  2. PromotionDelayQueue 延时队列类,继承延时任务工厂抽象类,定义延时任务传递的对象和使用的延时队列名称
  3. RocketmqTimerTrigger 延时任务类,实现了添加延时任务(addDelay),修改延时任务,删除延时任务的功能。添加延时任务时,通过在redis中添加一个唯一的key来标示当前延时任务的唯一性,延时任务执行类时到时间后执行延时任务时,通过首先检查redis中的key是否存在,存在则执行,不存在则认为该任务已被取消。具体实现代码如下
    // 添加延时任务
    public void addDelay(TimeTriggerMsg timeTriggerMsg, int delayTime) {
        // 找到redis缓存key
        String uniqueKey = timeTriggerMsg.getUniqueKey();
        if (StringUtils.isEmpty(uniqueKey)) {
            uniqueKey = StringUtils.getRandStr(10);
        }
        String generateKey = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), uniqueKey);
        this.cache.put(generateKey, 1);
        if (Boolean.TRUE.equals(promotionDelayQueue.addJobId(JSONUtil.toJsonStr(timeTriggerMsg), delayTime))) {
            log.info("add Redis key {} --------------------------", generateKey);
            log.info("定时执行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消费【" + timeTriggerMsg.getParam().toString() + "】");
        } else {
            log.info("延时任务添加失败!");
        }
    }
  // 删除延时任务
   public void delete(String executorName, Long triggerTime, String uniqueKey, String topic) {
        // 找到redis缓存key
        String generateKey = TimeTriggerUtil.generateKey(executorName, triggerTime, uniqueKey);
        log.info("delete redis key {} -----------------------", generateKey);
        // 删除redis缓存key
        this.cache.remove(generateKey);
    }
    // 修改延时任务
    public void edit(String executorName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey, int delayTime, String topic) {
        // 删除旧的延时任务标示
        this.delete(executorName, oldTriggerTime, uniqueKey, topic);
        // 添加新的延时任务
        this.addDelay(new TimeTriggerMsg(executorName, triggerTime, param, uniqueKey, topic), delayTime);
    }
    // 延时任务执行
    public void onMessage(TimeTriggerMsg timeTriggerMsg) {
        try {
            String key = TimeTriggerUtil.generateKey(timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());

            if (cache.get(key) == null) {
                log.info("执行器执行被取消:{} | 任务标识:{}", timeTriggerMsg.getTriggerExecutor(), timeTriggerMsg.getUniqueKey());
                return;
            }

            log.info("执行器执行:" + timeTriggerMsg.getTriggerExecutor());
            log.info("执行器参数:" + JSONUtil.toJsonStr(timeTriggerMsg.getParam()));

            cache.remove(key);

            TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());
            executor.execute(timeTriggerMsg.getParam());
        } catch (Exception e) {
            log.error("mq延时任务异常", e);
        }

    }

results matching ""

    No results matching ""