延迟性任务实现解析

背景

很多人在面试的时候可能都碰到过这样的一个面试题:设计一个秒杀系统,30分钟没付款就自动关闭交易,这里我们主要来看下在实际的项目中如何结合业务需求来实现类似”xxx分钟后自动完成xxx”这种属于延迟任务的功能。

业务场景

下面来看看具体的业务场景,我们在系统中有很多的需求如活动报名,活动签到,活动取消,活动审核等等都需要发送短信或者消息,其中有些比较特殊的如活动开始或者结束前xxx小时发送消息、活动到期后自动发送通知等等都属于延迟性触发的任务,那么针对于类似这样的任务,一般我们都是怎么处理的呢?

设计思路

由于系统是目前仍属于单机应用,所以在实现上暂时不考虑分布式,为了简单快速采用了JDK自带的本地延迟队列DelayQueue结合redis作为数据灾备的方案.DelayQueue是JUC框架中提供的一个具备延迟机制的队列.
DelayQueue具有有如下特点:

  • 队列中存储的元素必须实现Delayed接口,且元素具有时效性.
  • Delayed接口提供了getDelay方法来返回对象的延迟时间.
  • Delayed接口提供了compareTo方法用于队列内部元素的比较排序.
  • 内部使用了优先级队列PriorityQueue来实现每次从队首中取出来的都是最先要过期的元素.
  • 实现了BlockingQueue接口,是一个无界阻塞队列,且元素不允许为null.
  • 提供了如阻塞方法take()返回队首元素,put()方法添加元素,remove()方法元素出队等等…

那么大致实现的思路如下:
在创建活动时会将活动id与计算好的时间差值存储到redis缓存中,服务器后台开启守护线程实时监控本地队列中到期的任务,并触发相应的推送操作,同时为了防止服务器意外重启等情况,在系统初始化时会将缓存数据load到本地队列中,这样可以避免由于数据丢失导致消息与短信数据没有推送到,下面来看看实现步骤:
1.先创建任务对象dto,内部定义了任务标识ID与延迟时间戳并实现Delayed接口和Runnable接口,我们来看看其中一个dto的实现,其他的类似,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class NotifyDto implements Delayed, Runnable, Serializable {
private static final long serialVersionUID = 1L;
private final static Logger logger = LoggerFactor.getLogger(NotifyDto.class);

private String redisMsg;//缓存数据即任务标识ID
private long expireTime;//延迟时间

//带参构造函数
public NotifyDto(String redisMsg, long delayTime) {
this.redisMsg = redisMsg;
this.expireTime = TimeUnit.NANOSECONDS.convert(delayTime,
TimeUnit.MILLISECONDS) + System.nanoTime();
}

public String getRedisMsg() {
return redisMsg;
}

public void setRedisMsg(String redisMsg) {
this.redisMsg = redisMsg;
}

public long getExpireTime() {
return expireTime;
}

public void setExpireTime(long expireTime) {
this.expireTime = expireTime;
}

/**
* 用于延迟队列内部比较排序 当前对象的延迟时间 - 比较对象的延迟时间
* @see java.lang.Comparable#compareTo(java.lang.Object)
**/
@Override
public int compareTo(Delayed o) {
NotifyDto task = (NotifyDto) o;
long result = this.getDelay(TimeUnit.NANOSECONDS)
- task.getDelay(TimeUnit.NANOSECONDS);
if (result < 0) {
return -1;
} else if (result > 0) {
return 1;
} else {
return 0;
}
}

/**
* 返回对象延迟时间
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
**/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expireTime - System.nanoTime(),
TimeUnit.NANOSECONDS);
}

/**
* 任务回调处理
* @see java.lang.Runnable#run()
**/
@Override
public void run() {
logger.debug("当前任务队列:msgQueue,延迟时间:{},活动ID:{}", this.expireTime
+ "=========================================", this.redisMsg);
this.msgPus h(this.redisMsg);
}

private void msgPush(String redisMsg) {
Map<String, Object> msgMap = new HashMap<>();
Integer activeId = NumberHelpUtils.toInt(redisMsg);
//省略发送消息动作
//清空缓存记录
JedisUtils.zRemove("active:review:notify", activeId + "");
}

@Override
public String toString() {
return "NotifyDto [redisMsg=" + redisMsg + ", expireTime=" + expireTime
+ "]";
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((redisMsg == null) ? 0 : redisMsg.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
NotifyDto other = (NotifyDto) obj;
if (redisMsg == null) {
if (other.redisMsg != null)
return false;
} else if (!redisMsg.equals(other.redisMsg))
return false;
return true;
}
}

2.创建一个任务调度服务service监控所有队列中的过期任务对象,其内部包含各种任务队列初始化,出队与入队等方法,大致代码如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
public class MsgQueueService {

private final static Logger logger = LoggerFactory
.getLogger(MsgQueueService.class);

private MsgQueueService() {

}

private static class MsgHolder {
private static MsgQueueService msgQueueService = new MsgQueueService();
}

public static MsgQueueService getInstance() {
return MsgHolder.msgQueueService;
}

/**
* 执行任务线程池
*/
private final static Executor es = Executors.newFixedThreadPool(5);

/**
* 创建3个守护线程
*/
private Thread expireActiveThread;

private Thread startByActiveThread;

private Thread beforeEndActiveThread;

/**
* 创建延迟任务队列
*/
private DelayQueue<NotifyDto> msgQueue = new DelayQueue<>();

private DelayQueue<StartByNotifyDto> msgQueue2 = new DelayQueue<>();

private DelayQueue<BeforEndNotifyDto> msgQueue3 = new DelayQueue<>();

/**
*
* 系统启动时初始化
*/
public void init() {
//初始化数据
initRedisMsg();
//监听活动结束后任务
expireActiveThread = new Thread(() -> execute());
expireActiveThread.setDaemon(true);
expireActiveThread.setName("ExpireActive Daemon Thread");
expireActiveThread.start();
//监听活动开始前2小时任务
startByActiveThread = new Thread(() -> execute2());
startByActiveThread.setDaemon(true);
startByActiveThread.setName("StartByActive Daemon Thread");
startByActiveThread.start();
//监听活动结束前2小时任务
beforeEndActiveThread = new Thread(() -> execute3());
beforeEndActiveThread.setDaemon(true);
beforeEndActiveThread.setName("BeforeEndActive Daemon Thread");
beforeEndActiveThread.start();
}

/**
*
* 从Redis中初始化任务到队列中
*/
public void initRedisMsg() {
Set<String> keySet = JedisUtils.zRange("active:review:notify", 0, -1);
if (CollectionHelpUtils.isNotEmpty(keySet)) {
keySet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:review:notify", o).longValue();
NotifyDto redisTask = new NotifyDto(o, expireTime
- System.currentTimeMillis());
this.push(redisTask);
});
}
Set<String> applyStartSet = JedisUtils.zRange(
"active:applyStart:notify", 0, -1);
if (CollectionHelpUtils.isNotEmpty(applyStartSet)) {
applyStartSet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:applyStart:notify", o).longValue();
StartByNotifyDto redisTask = new StartByNotifyDto(o,
expireTime - System.currentTimeMillis());
this.push(redisTask);
});
}
Set<String> applyEndSet = JedisUtils.zRange("active:applyEnd:notify",
0, -1);
if (CollectionHelpUtils.isNotEmpty(applyEndSet)) {
applyEndSet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:applyEnd:notify", o).longValue();
BeforEndNotifyDto redisTask = new BeforEndNotifyDto(o,
expireTime - System.currentTimeMillis());
this.push(redisTask);
});
}
}

/**
* 监听队列,如果没有过期对象则阻塞
* @param es
*/
private void execute() {
while (true) {
try {
NotifyDto task = msgQueue.take();
if (task != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}

private void execute2() {
while (true) {
try {
StartByNotifyDto task2 = msgQueue2.take();
if (task2 != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue2",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task2);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}

private void execute3() {
while (true) {
try {
BeforEndNotifyDto task3 = msgQueue3.take();
if (task3 != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue3",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task3);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}

/**
*
* 任务入队
* @param time
* @param task
*/
public void push(NotifyDto task) {
msgQueue.put(task);
}

public void push(StartByNotifyDto task) {
msgQueue2.put(task);
}

public void push(BeforEndNotifyDto task) {
msgQueue3.put(task);
}

/**
*
* 任务出队
* @param task
*/
public void remove(NotifyDto task) {
msgQueue.remove(task);
}

public void remove(StartByNotifyDto task) {
msgQueue2.remove(task);
}

public void remove(BeforEndNotifyDto task) {
msgQueue3.remove(task);
}
}

注意: MsgQueueService的实现使用了单例模式,并且其中的init方法在系统初始化时被调度执行.
3.创建一个系统初始化服务类,默认实现Spring框架提供的接口InitializingBean,重写其接口方法afterPropertiesSet以便在系统启动后自动执行初始化逻辑.

1
2
3
4
5
@Override
public void afterPropertiesSet() throws Exception {
logger.debug("系统初始化...");
MsgQueueService.getInstance().init();
}

4.最后一步就是具体的业务逻辑处理了,我们在业务service中的调用方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
*
* 把活动任务推送到延迟队列中
* @param activeId
*/
private void Msg2Queue(Active active, boolean isAdd) {
long delayTime = 0;
if (isAdd) {
//计算出当前任务延迟时间
delayTime = active.getActiveEndTime().getTime()
- active.getCreateTime().getTime();
//推送到redis中
JedisUtils.zAdd("active:review:notify", active.getActiveEndTime()
.getTime(), active.getId() + "");
//活动到期未点评的任务入队
NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task);
//判断活动报名截止状态
if (active.getApplyAudit() == 1) {
if (active.getApplyAbort() == 1) {
//活动开始截止报名
delayTime = active.getActiveStartTime().getTime() - 2 * 60
* 60 * 1000 - System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:applyStart:notify", active
.getActiveStartTime().getTime()
- 2
* 60
* 60
* 1000, active.getId() + "");
StartByNotifyDto task2 = new StartByNotifyDto(
active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task2);
} else {
//活动结束前可报名
delayTime = active.getActiveEndTime().getTime() - 2 * 60
* 60 * 1000 - System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:applyEnd:notify", active
.getActiveEndTime().getTime() - 2 * 60 * 60 * 1000,
active.getId() + "");
BeforEndNotifyDto task3 = new BeforEndNotifyDto(
active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task3);
}
}
} else {
//动态取消任务
Double score = JedisUtils.zScore("active:review:notify",
active.getId() + "");
if (score != null) {
long expireTime = score.longValue();
MsgQueueService.getInstance().remove(
new NotifyDto(active.getId() + "", expireTime));
//清空redis中记录
JedisUtils.zRemove("active:review:notify", active.getId() + "");
//重新计算出当前任务延迟时间
delayTime = active.getActiveEndTime().getTime()
- System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:review:notify", active
.getActiveEndTime().getTime(), active.getId() + "");
//延迟任务入队
NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task);
}
}
}

注意:由于实际业务中任务可以被修改或取消,所以定义任务dto时需重写其hashcode方法与equals方法,来防止队列中的对象出现冲突,由于dto中的redisMsg字段对应了mysql中的表自增主键,所以我们使用了这个字段来重写这两个方法.

总结

此方案的优点:

  • 代码实现相对比较简单,利用JDK自带的容器类来解决延迟处理问题,无需自己造轮子.
  • 效率高,任务触发时间延迟低.
  • 由于线程安全还可以实现多生产者与消费者.
  • 结合redis做数据灾备,避免由于服务重启或其他异常退出导致的数据丢失问题.

此方案的缺点:

  • DelayQueue属于单机队列,若在分布式集群环境下,要自己做横向扩展以实现高可用,难度较高.
  • 服务器一旦宕机,数据将丢失,需结合其他底层存储做持久化,增加了编码的复杂性.
  • 数据存储在单机内存中,受物理条件限制,数据量大时容易OOM.

其他的解决方案:

  • JDK自带的线程池ScheduledExecutorService.
  • 简单的定时任务轮询,扫描数据表,数据量大时会有性能瓶颈.
  • 可以定时任务结合redis,任务和到期时间都保存在redis中,启动定时任务扫描redis,到期的key删除,并且异步更新数据库.
  • google guava的缓存也可实现类似的功能.
  • 消息队列如ActiveMQ或者RobbitMQ都提供了死信队列可实现延迟功能,设定任务的到期时间,到期之后自动进入死信队列,后台开启守护线程监控死信队列.
  • 时间轮算法,Netty提供了一个HashedWheelTimer来实现.
  • 利用redis的zset可实现延迟队列或者redis的Keyspace Notifications(键空间机制)实现key失效后提供回调函数.
    ……