quartz基础
# 理论基础
# 小根堆: 插入与删除
插入时,在数组末尾插入,然后往上进行比较上浮
删除时,将根节点取出,用最后一个元素代替根节点,然后下沉
参考文章:小根堆(Heap)的详细实现 - WindSun - 博客园 (cnblogs.com) (opens new window)
# 时间轮算法
# ScheduledThreadPoolExecutor
# 一、quartz体系结构
# 1.特点
(1)强大的调度功能:作为spring默认的调度框架,很容易与spring集成,实现灵活可配置的调度功能;
还提供了调度运行环境的持久化机制,可以保存并恢复调度现场,
即使系统因故障关闭,任务调度现场数据并不会丢失。
(2)灵活的应用方式:允许开发者灵活的定义触发器的调度时间表并可以为触发器和任务进行关联映射。
(3)分布式和集群能力。
2
3
4
5
6
7
# 2.主要用到的设计模式
Builder模式
Factory模式
组件模式
链式写法
2
3
4
# 3.三个核心概念
调度器:负责定期定时定频率的去执行任务
任务:包括了业务逻辑
触发器:让东西生效的时间
2
3
# 4.Quartz的体系结构

# 5.重要组成
# (1) Job:
区别于JobDetail,是一个接口,只有一个方法void execute(JobExecutionContext context),开发者可以实现该接口定义运行任务,相当于TimerTask下面的run()方法,区别在于,Job有一个参数JobExecutionContext,JobExecutionContext这个类提供了调度上下文的各种信息,Job运行时的信息就保存在
JobExecutionContext里的JobDataMap
实例中。
# (2) JobDetail:
Quartz在每次执行实例的时候都重新创建一个job实例,所以它不直接接受一个job实例,而是通过接受一个job实现类,以便运行时通过new Instance()的反射机制实例化job,因此需要通过一个类来描述job的实现类及其他相关静态信息,如job的名字,描述,关联监听器等信息。(即用来绑定job,并且在job执行的时候携带一些执行的信息) 通过该类的构造函数可以更具体地了解它的功用:JobDetail(java.lang.String name, java.lang.String group, java.lang.Class jobClass),该构造函数要求指定Job的实现类,以及任务在Scheduler中的组名和Job名称;
# (3) JobBuilder:
用来定义或者创建JobDetail的实例,JobDetail限定了只能是job的实例。
# (4) JobStore:
接口,用来保存job数据,实现类主要有RAMJobStore,JobStoreTX,JobStoreCMT; JobStoreTX和JobStoreCMT均将数据保存在数据库中,RAMJobStore将数据保存在内存中,保存一些执行的信息。
建表语句见源码: tables_mysql_innodb.sql
# (5)Trigger:
一个类,描述触发的job执行时的时间触发规则;主要有SimpleTrigger和CronTrigger两个子类。当仅触发一次或者以固定时间间隔周期执行时,使用SimpleTrigger;CronTrigger通过cron表达式,定义出各种复杂时间规则的调度方案,如每天早晨的固定时间执行,或周二周三的固定时间执行等需求。
# (6)TriggerBuilder:
使用builder模式,用来定义或者创建触发器的实例
# (7)ThreadPool:
Timer有且只有一个后台线程在执行,Quartz的schedule下有ThreadPool整个线程池来运行,schedule使用线程池作为任务运行的基础设施,任务通过共享线程池中的线程提高运行的效率,从而解决并发问题
# (8)Scheduler:
调度器,代表Quartz的一个独立运行容器,Trigger和JobDetail可以注册到Scheduler中,两者在Scheduler中拥有各自的组及名称,组及名称是Scheduler查找定位容器中某一对象的依据,Trigger的组及名称必须唯一,JobDetail的组和名称也必须唯一(但可以和Trigger的组和名称相同,因为它们是不同类型的)。Scheduler定义了多个接口方法,允许外部通过组及名称访问和控制容器中Trigger和JobDetail。
Scheduler可以将Trigger绑定到某一JobDetail中,这样当Trigger触发时,对应的Job就被执行。一个Job可以对应多个Trigger,但一个Trigger只能对应一个Job。可以通过SchedulerFactory创建一个Scheduler实例。Scheduler拥有一个SchedulerContext,它类似于ServletContext,保存着Scheduler上下文信息,Job和Trigger都可以访问SchedulerContext内的信息。 SchedulerContext内部通过一个Map,以键值对的方式维护这些上下文数据,SchedulerContext为保存和获取数据提供了多个put()和getXxx()的方法。 可以通过Scheduler# getContext()获取对应的SchedulerContext实例;
# (9)Calendar:
一个Trigger可以和多个Calendar关联,以排除或包含某些时间点。比如某个任务希望放假时间不执行。
# (10)监听器:
JobListener,TriggerListener,SchedulerListener;分别对Job,Trigger,Scheduler的事件进行监听,包括scheduler一运行起来的时候,或者在执行任务的时候,或终止的时候进行监听,监听的时候加入一些自定义的某些逻辑,比如打印日志信息。
# 二、quartz程序
参考: 定时任务调度工具之Quartz(一) - 癫狂编程 - 博客园 (cnblogs.com) (opens new window)
# 1、定义Job任务
public class MyJob1 implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
Date date = new Date();
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 拿到配置的任务数据
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
System.out.println( " " + sf.format(date) + " 任务1执行了," + dataMap.getString("gupao"));
}
}
2
3
4
5
6
7
8
9
10
# 2、封装jobDetail
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
.withIdentity("job1", "group1")
.usingJobData("gupao","只为更好的你")
.usingJobData("moon",5.21F)
.build();
2
3
4
5
6
必须要指定 JobName 和 groupName,两个合起来是唯一标识符。 可以携带 KV 的数据(JobDataMap),用于扩展属性,在运行的时候可以从 context 获取到
# 3、定义Trigger触发器
// Trigger
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
2
3
4
5
6
7
8
# 4、Scheduler触发执行
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler 单例
Scheduler scheduler = factory.getScheduler();
// 绑定关系是1:N
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
2
3
4
5
6
7
8
9
Scheduler 中的方法主要分为三大类:
1)操作调度器本身,例如调度器的启动 start()、调度器的关闭 shutdown()。
2)操作 Trigger,例如 pauseTriggers()、resumeTrigger()。
3)操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob() 这些方法非常重要,可以实现任务的动态调度。
# 5、配置文件
默认的配置文件,quartz.properties
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
2
3
4
5
6
7
8
9
10
11
12
13
# 6、排除规则Calendar
# 7、监听器Listener
Quartz 中提供了三种 Listener,监听 Scheduler 的,监听 Trigger 的,监听 Job 的。 只需要创建类实现相应的接口,并在 Scheduler 上注册 Listener,便可实现对核心对象的监听。
工具类:ListenerManager,用于添加、获取、移除监听器 工具类:Matcher,主要是基于 groupName 和 keyName 进行匹配
# 三、与Spring集成
quartz里面的主要类,由spring的factoryBean
去生成, JobDetailFactoryBean生成JobDetail、SimpleTriggerFactoryBean、CronTriggerFactoryBean、SchedulerFactoryBean,详情请见其源码。
# 四、与Springboot集成
# 1、定义job
/**
* @DisallowConcurrentExecution 不允许并发执行, 任务执行完才会进行下一个任务
* @PersistJobDataAfterExecution 执行后持久化jobDetail里面的dataMap
*/
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TestTask2 extends QuartzJobBean {
// 可以注入spring对象
@Override
public void executeInternal(JobExecutionContext context) throws JobExecutionException {
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(Thread.currentThread().getName() + " " +sdf.format(date) + " Task2:精神导师-");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2、写配置
/**
*
* 将Spring的对象注入到Quartz Job 2
*/
@Component
public class MyJobFactory extends AdaptableJobFactory {
//AutowireCapableBeanFactory接口是BeanFactory的子类,可以连接和填充那些生命周期不被Spring管理的已存在的bean实例
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
//重写创建Job任务的实例方法
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
//通过以下方式,解决Job任务无法使用Spring中的Bean问题
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@Slf4j
@EnableScheduling
public class ScheduleConfig {
@Autowired
private JobFactory myJobFactory;
//创建调度器工厂
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
//1.创建SchedulerFactoryBean
//2.加载自定义的quartz.properties配置文件
//3.设置MyJobFactory
SchedulerFactoryBean factoryBean=new SchedulerFactoryBean();
try {
factoryBean.setQuartzProperties(quartzProperties());
factoryBean.setJobFactory(myJobFactory);
return factoryBean;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean=new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
@Bean(name="scheduler")
public Scheduler scheduler(){
return schedulerFactoryBean().getScheduler();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 4、项目启动后加载任务
可以写监听器、也可以CommandLineRunner、ApplicationRunner
/**
* 这个类用于启动SpringBoot时,加载作业。run方法会自动执行。
* 另外可以使用 ApplicationRunner
*/
@Component
@Slf4j
public class InitStartSchedule implements CommandLineRunner {
@Autowired
private ISysJobService sysJobService;
@Autowired
private Scheduler scheduler;
@Override
public void run(String... args) throws Exception {
/**
* 用于程序启动时加载定时任务,并执行已启动的定时任务(只会执行一次,在程序启动完执行)
*/
//查询job状态为启用的
HashMap<String,String> map = new HashMap<String,String>();
map.put("jobStatus", "1");
List<SysJob> jobList= sysJobService.querySysJobList(map);
if( null == jobList || jobList.size() ==0){
log.info("系统启动,没有需要执行的任务... ...");
}
// 启动调度器
scheduler.start();
for (SysJob sysJob:jobList) {
String jobClassName=sysJob.getJobName();
String jobGroupName=sysJob.getJobGroup();
//第一步:构建job信息
JobDetail jobDetail = JobBuilder.newJob( getClass(sysJob.getJobClassPath()).getClass())
.withIdentity(jobClassName, jobGroupName).build();
// 第二步:设置JobDataMap
if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {
JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());
Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
//第三步:生成一到多个触发器 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());
//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder)
.startNow()
.build();
// 任务不存在的时候才添加
if( !scheduler.checkExists(jobDetail.getKey()) ){
try {
// 第四步:构建任务并启动
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
log.info("\n创建定时任务失败"+e);
throw new Exception("创建定时任务失败");
}
}
}
}
public static Job getClass(String classname) throws Exception
{
Class<?> c= Class.forName(classname);
return (Job)c.newInstance();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 5、quartz工具service
/**
* Quartz工具类
*/
@Slf4j
@Service
public class QuartzSchedulerService {
@Autowired
private Scheduler scheduler;
/**
* 新增定时任务
* @param jobClassName 类路径
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @param jobDataMap 需要传递的参数
* @throws Exception
*/
public void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {
// 启动调度器
scheduler.start();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
.withIdentity(jobName, jobGroupName).build();
// JobDataMap用于传递任务运行时的参数,比如定时发送邮件,可以用json形式存储收件人等等信息
if (StringUtils.isNotEmpty(jobDataMap)) {
JSONObject jb = JSONObject.parseObject(jobDataMap);
Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
log.info("创建定时任务失败" + e);
throw new Exception("创建定时任务失败");
}
}
/**
* 停用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public void jobPause(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 启用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public void jobresume(String jobName, String jobGroupName) throws Exception {
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 删除一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public void jobdelete(String jobName, String jobGroupName) throws Exception {
scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 更新定时任务表达式
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @throws Exception
*/
public void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
log.error("更新定时任务失败" + e);
throw new Exception("更新定时任务失败");
}
}
/**
* 检查Job是否存在
* @throws Exception
*/
public Boolean isResume(String jobName, String jobGroupName) throws Exception {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
Boolean state = scheduler.checkExists(triggerKey);
return state;
}
/**
* 暂停所有任务
* @throws Exception
*/
public void pauseAlljob() throws Exception {
scheduler.pauseAll();
}
/**
* 唤醒所有任务
* @throws Exception
*/
public void resumeAlljob() throws Exception {
scheduler.resumeAll();
}
/**
* 获取Job实例
* @param classname
* @return
* @throws Exception
*/
public static Job getClass(String classname) throws Exception {
try {
Class<?> c = Class.forName(classname);
return (Job) c.newInstance();
} catch (Exception e) {
throw new Exception("类["+classname+"]不存在!");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145