spring boot 1 如何使用Quartz cluster
maven依赖
1 | <dependency> |
quartz.properties配置
1 | # 是否使用properties作为数据存储 |
相关quartz bean配置
装配SchedulerFactoryBean ,替换数据源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class QuartzConfig {
private DataSource dataSource;
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setDataSource(dataSource); // 使用application.properties 中的数据源
schedulerFactoryBean.setOverwriteExistingJobs(true);
// schedulerFactoryBean.setJobFactory(jobFactory);
schedulerFactoryBean.setQuartzProperties(quartzProperties());
schedulerFactoryBean.setSchedulerName("quartz-cluster-scheduler");
schedulerFactoryBean.setStartupDelay(2);// 延迟两秒启动
schedulerFactoryBean.setAutoStartup(true);
return schedulerFactoryBean;
}
private Properties quartzProperties() throws IOException {//解析quartz配置
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
Properties properties = propertiesFactoryBean.getObject();
return properties;
}
}自定义Job类
1 | 4j |
- 封装addJob/deleteJob方便使用
1 |
|
- 外部操作定时器的地方只需要
@Autowired private QuartzService quartzService;
注入service 按需调用即可。
Quartz 调度原理
Scheduler 任务调度控制器 (StdScheduler)
管理 Trigger 和 Job
Trigger 任务调度单元
CronTrigger 可以通过 Cron 表达式规定任务触发规则
SimpleTrigger 规定任务执行几次,每次的时间间隔,类似 SchedulerExecutor
Job 调度任务,用于定义你的业务任务具体执行过程
一个 Job 可以对应多个 Trigger,一个 Trigger 只能对应一个 Job
Quartz cluster 实现原理
集群的概念:是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。
分布式的概念:是指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。
quartz的实现核心:
集群调度相关类
JobStoreSuppoœrt 数据库存储任务信息实现
StdRowLockSemaphore 数据库行锁实现表QRTZ_SCHEDULER_STATE维护失效实例(LAST_CHECKIN_TIME 心跳时间)信息
表QRTZ_LOCKS实现数据库行锁
通过 select for update 语句会阻塞其他同样针对这一行的 select 语句,直到该 session commit 或 rollback
- 更多原理查看此连接 QuartzCluster实现原理
源码分析
内存模式启动信息(不使用db持久化任务)
说明了使用的JobStore,线程池及大小DB存储方式并启用集群
当我们指定了DataSource后,Quartz启动后会把
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
覆盖为LocalDataSourceJobStore
,核心逻辑如下1
2
3
4CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
if (this.dataSource != null) {
mergedProps.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, LocalDataSourceJobStore.class.getName());
}
Quartz 初始化过程
- 配置SchedulerFactoryBean
org.springframework.scheduling.quartz.SchedulerFactoryBean.afterPropertiesSet
- 执行
org.springframework.scheduling.quartz.SchedulerFactoryBean.prepareSchedulerFactory
- 然后在 次方法中会初始化
org.springframework.scheduling.quartz.SchedulerFactoryBean.initSchedulerFactory
- 会先准备配置然后做
org.springframework.scheduling.quartz.SchedulerFactoryBean.prepareScheduler
创建真正的Scheduler。 - 完成创建后会调用
org.quartz.impl.StdSchedulerFactory.instantiate()
进行其他组件初始化。
misfire 说明
详细分析在这个地址
https://segmentfault.com/a/1190000015492260,
解决我测试中发现的misfire问题(kill 进程,等待一定时间后再次拉起服务,依然会再次触发几次任务),主要是这个计算公式
调度线程会一次性拉取距离现在,一定时间窗口内的,一定数量内的,即将触发的trigger信息。那么,时间窗口和数量信息如何确定呢,我们先来看一下,以下几个参数:
idleWaitTime: 默认30s,可通过配置属性org.quartz.scheduler.idleWaitTime设置。
availThreadCount:获取可用(空闲)的工作线程数量,总会大于1,因为该方法会一直阻塞,直到有工作线程空闲下来。
maxBatchSize:一次拉取trigger的最大数量,默认是1,可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写
batchTimeWindow:时间窗口调节参数,默认是0,可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写
misfireThreshold: 超过这个时间还未触发的trigger,被认为发生了misfire,默认60s,可通过org.quartz.jobStore.misfireThreshold设置。
调度线程一次会拉取NEXT_FIRE_TIME小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个triggers,默认情况下,会拉取未来30s,过去60s之间还未fire的1个trigger。随后将这些triggers的状态由WAITING改为ACQUIRED,并插入fired_triggers表。
这样子通过调整配置中的
#单位毫秒, 集群中的节点退出后,再次检查进入的时间间隔 org.quartz.jobStore.misfireThreshold=1000
到一个较小值,这样MISFIRE_INSTRUCTION_DO_NOTHING
就能够达到我想要的,错过就不执行,等待下一次执行时机再触发的目的。
在 quartz.properties
中
1 | org.quartz.impl.jdbcjobstore.JobStoreTX = |
指定了事务类型。其initialize方法调用的是父类org.quartz.impl.jdbcjobstore.JobStoreSupport.initialize()
在方法中指定了使用的是什么锁实现方式,如下图
LocalDataSourceJobStore的初始化方式同理,也是其中的initialize
方法。
此次架构升级注意点
之前使用的是内存保持调度信息的方式,所已在代码中有一个进程拉起后手动添加
任务的动作com.enmotech.mozhe.sys.service.BatchTaskRunner
。当升级为集群模式的时候,由于信息是已经存储好了,
quartz会自动拉起所有定时器,不用手动操作,则需要屏蔽addJob相关动作。定时器相关的功能,如果有停用、终止动作,最好同步做deleteJob操作,避免相同任务存在多个。
使用Spring 自身提供的
@EnableScheduling
定时器机制,需要替换为Quartz的方式来做
SpringBoot 集成QuartzCluster方法
配置方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52server:
port: 8080
spring:
profiles:
active: mysqlDB #切换指定的数据源.
# quartz 调度器配置
quartz:
job-store-type: jdbc #数据持久化方式,可选值:MEMORY、JDBC
auto-startup: true #初始化后是否自动启动计划程序,默认为 true
overwrite-existing-jobs: false #配置的作业是否应覆盖现有的作业定义
scheduler-name: quartzSchedulerV2 # 计划程序的名称
startup-delay: 2s #初始化完成后启动计划程序的延迟时间,默认为 0 秒
wait-for-jobs-to-complete-on-shutdown: false # 关闭时是否等待正在运行的作业完成
#对于 Quartz 自带的配置,即可以使用 quartz 自己的 quartz.properties 配置文件进行配置,也可以直接配置在 properties 属性下,它是一个 map
#quartz 完整配置:https://wangmaoxiong.blog.csdn.net/article/details/105057405#quartz.properties%20%E4%B8%8E%20QuartzProperties%20%E9%85%8D%E7%BD%AE%E9%80%89%E9%A1%B9
properties:
org:
quartz:
scheduler:
instanceName: quartzSchedulerV2
instanceId: AUTO
jobStore:
#如果不需要将调度命令(例如添加和删除triggers)绑定到其他事务,那么可以通过使用 JobStoreTX 管理事务
class: org.quartz.impl.jdbcjobstore.JobStoreTX
#设置数据库驱动代理,StdJDBCDelegate 是一个使用 JDBC 代码来执行其工作的代理. 其他代理可以在"org.quartz.impl.jdbcjobstore“包或其子包中找到
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#通知 JobStore 使用的表前缀
tablePrefix: QRTZ_
isClustered: true
clusterCheckinInterval: 5000
misfireThreshold: 2000
threadPool:
class: org.quartz.simpl.SimpleThreadPool #quartz 使用的线程池类型,org.quartz.spi.ThreadPool
threadCount: 15 #线程池中的线程总个数,表示最多可以同时执行的个任务/作业个数
threadPriority: 5 #线程优先级
threadsInheritContextClassLoaderOfInitializingThread: true #线程继承初始化线程的上下文类加载器
spring:
profiles: mysqlDB
datasource:
username: root
password: 123456
# spring boot 2.1.5 搭配 mysql 驱动 8.0.16,高版本 mysql 驱动的 driver-class-name 值要带 cj;url 值要带时区 serverTimezone
url: jdbc:mysql://127.0.0.1:3306/training?characterEncoding=UTF-8&serverTimezone=UTC
driver-class-name: com.mysql.cj.jdbc.Driver
关键bean声明方式
1
2
3
4
5
6
7
8
public class BaseConfig {
public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) {
return schedulerFactoryBean.getScheduler();
}
}
- 其他结构封装参考SpringBoot1的集成方式
工具类封装示例:
1 |
|