项目中经常会用到定时器,最简单莫过于springboot中的scheduled了,只需要在方法上加个@Scheduled注解,即可实现定时任务的功能。但这样也有明显的缺点,无法实现在线灵活配置,job任务的管理,还有非常必要的监控。我因此基于quartz写了一个简单易用的定时器组件,可以实现在线启动、停止、修改定时参数、job运行监控,也支持多节点部署,手动指定每个节点执行的任务。
demo运行地址:https://rooster.fetosoft.cn/

运行效果如下图:

项目源码地址:

https://gitee.com/gbinb/rooster
rooster-autoconfigure #自动配置模块
rooster-core #核心实现代码
rooster-demo #多机部署DEMO
rooster-demo-stand #单机部署DEMO,与多机部署的区别是省去了zookeeper组件的支持

简单的实现一个单机上运行的定时任务:

1、先导入保存job任务信息的sql:

CREATE TABLE `tasks` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `code` varchar(50) NOT NULL,
  `name` varchar(50) DEFAULT NULL,
  `description` varchar(100) DEFAULT NULL,
  `expression` varchar(30) NOT NULL,
  `jobClass` varchar(100) NOT NULL,
  `clusterIP` varchar(30) NOT NULL,
  `params` varchar(300) DEFAULT NULL,
  `status` int(11) DEFAULT NULL,
  `create_time` datetime NOT NULL,
  `start_time` datetime DEFAULT NULL,
  `stop_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8_general_ci;

2、引入rooster-core.jar即可(需要下载源码编译):

   <dependency>
      <groupId>cn.fetosoft</groupId>
      <artifactId>rooster-core</artifactId>
      <version>0.0.3</version>
    </dependency>

3、开发一个job任务:

/**
 * 此处可省略Spring的@Component注解
 */
@DisallowConcurrentExecution
public class PrintJob implements Job {

    /**
     * 这里可以注入spring容器中的bean
     */
    @Autowired
    private RoosterConfig config;

    /**
     *
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap map = jobDetail.getJobDataMap();
        try {
            TimeUnit.MILLISECONDS.sleep(new Random().nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //打印map中的所有参数
        Iterator<Map.Entry<String, Object>> iter = map.entrySet().iterator();
        while (iter.hasNext()){
            Map.Entry<String, Object> entry = iter.next();
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
    }
}

4、自定义一个类,继承AbstractScheduled抽象类,这里直接返回true即可,表示在本机运行;

@Component
public class DefaultScheduled extends AbstractScheduled {

    @Override
    protected boolean isClusterExec(TaskInfo taskInfo) {
        return true;
    }
}

5、自定义一个Controller,实现对任务的功能管理;

部分代码示例:

    ......

    @Autowired
    private ScheduledService scheduledService;

    ......


    /**
     * 启动任务
     * @return
     */
    @RequestMapping("/start")
    @ResponseBody
    public String start(@RequestParam String code){
        Result result = null;
        try {
            TaskInfo taskInfo = this.getTaskByCode(code);
            taskInfo.setAction(TaskAction.START.getCode());
            result = scheduledService.start(taskInfo);
        } catch (Exception e) {
            result = Result.FAIL.setMsg(e.getMessage());
        }
        return result.toString();
    }

    /**
     * 停止任务
     * @return
     */
    @RequestMapping("/stop")
    @ResponseBody
    public String stop(@RequestParam String code){
        Result result = null;
        try {
            TaskInfo taskInfo = this.getTaskByCode(code);
            taskInfo.setAction(TaskAction.STOP.getCode());
            result = scheduledService.stop(taskInfo);
        } catch (Exception e) {
            result = Result.FAIL.setMsg(e.getMessage());
        }
        return result.toString();
    }

6、增加一个定时服务启停的监控,实现TaskListener接口;

@Component
public class TaskListenerDemo implements TaskListener {

    @Autowired
    private TaskDAO taskDAO;

    /**
     * 任务启动
     *
     * @param taskInfo
     * @param e
     */
    @Override
    public void start(TaskInfo taskInfo, SchedulerException e) {
        System.out.println("Start------------------" + taskInfo.getCode());
        //修改数据库中定时任务的状态
        taskDAO.updateTaskStatus(taskInfo.getCode(), TaskAction.START);
    }

    /**
     * 任务停止
     *
     * @param taskInfo
     * @param e
     */
    @Override
    public void stop(TaskInfo taskInfo, SchedulerException e) {
        System.out.println("Stop------------------" + taskInfo.getCode());
        taskDAO.updateTaskStatus(taskInfo.getCode(), TaskAction.STOP);
    }
}

7、新增一个job执行的监控,只需实现JobExecListener接口,可以把job的执行日志记录到数据库,便于查询或异常时报警;

@Component
public class JobMonitorDemo implements JobExecListener, ApplicationContextAware {

    /**
     * log
     */
    private static final Logger logger = LoggerFactory.getLogger(JobMonitorDemo.class);
    private ApplicationContext applicationContext;

    @Override
    public void beforeExec(JobContext jobContext) {
        logger.info("beginRunJob --- {} --- {}", jobContext.getTaskInfo().getCode(),
                DateFormatUtils.format(jobContext.getFireTime(), "yyyy-MM-dd HH:mm:ss"));
    }

    @Override
    public void afterExec(JobContext jobContext) {
        logger.info("endRunJob --- {} --- {} --- {}", jobContext.getTaskInfo().getCode(),
                DateFormatUtils.format(jobContext.getFireTime(), "yyyy-MM-dd HH:mm:ss"), jobContext.getRunTime());
        if(jobContext.isException()){
            logger.error(jobContext.getErrorMsg());
        }

        MonitorEvent event = new MonitorEvent(jobContext.getTaskInfo().getCode());
        event.setFireTime(DateFormatUtils.format(jobContext.getFireTime(), "yyyy-MM-dd HH:mm:ss"));
        event.setRunTime(jobContext.getRunTime());
        applicationContext.publishEvent(event);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

debug信息:

参数打印信息

发表评论