mykit-delay精准定时任务和延时队列框架

空空姬
阅读 372 标签:消息中间件  

项目简述
Mykit体系中提供的简单、稳定、可扩展的延迟消息队列框架,提供精准的定时任务和延迟队列处理功能。
项目模块说明
• mykit-delay-common: mykit-delay 延迟消息队列框架通用工具模块,提供全局通用的工具类
• mykit-delay-config: mykit-delay 延迟消息队列框架通用配置模块,提供全局配置
• mykit-delay-queue: mykit-delay 延迟消息队列框架核心实现模块,目前所有主要的功能都在此模块实现
• mykit-delay-controller: mykit-delay 延迟消息队列框架Restful接口实现模块,对外提供Restful接口访问,兼容各种语言调用
• mykit-delay-core: mykit-delay 延迟消息队列框架的入口,整个框架的启动程序在此模块实现
• mykit-delay-test: mykit-delay 延迟消息队列框架通用测试模块,主要提供Junit单元测试用例
需求背景
• 用户下订单后未支付,30分钟后支付超时
• 在某个时间点通知用户参加系统活动
• 业务执行失败之后隔10分钟重试一次
类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。
队列设计

整体架构设计如下图所示。
开发前需要考虑的问题
• 及时性 消费端能按时收到
• 同一时间消息的消费权重
• 可靠性 消息不能出现没有被消费掉的情况
• 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
• 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
• 高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
• 消费端如何消费
当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。
简单定义一个消息数据结构

private String topic;/***topic**/
private String id;/***自动生成 全局唯一 snowflake**/
private String bizKey;
private long delay;/***延时毫秒数**/
private int priority;//优先级
private long ttl;/**消费端消费的ttl**/
private String body;/***消息体**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();

运行原理:
• 用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
• 将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
• 使用timer实时监控zset有序列表中top 10的数据 。如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
• 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。
客户端
因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。
• 添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
• 删除延时消息 需要传递消息ID GET /delete?id=
• 恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
• 恢复单个延时消息 需要传递消息ID GET /reStore/id
• 获取消息 需要长连接 GET /get/topic
用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。
目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。
消息可恢复
实现恢复的原理 正常情况下一般都是记录日志,比如mysql的binlog等。
这里我们直接采用mysql数据库作为记录日志。
目前创建以下2张表:
• 消息表 字段包括整个消息体
• 消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip
定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。
支持消息恢复
假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。
当然恢复单个任务也可以这么干。
数据表设计
这里,我就直接给出创建数据表的SQL语句。

DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (
  `id` varchar(128) NOT NULL,
  `bizkey` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `subtopic` varchar(250) DEFAULT NULL,
  `delay` bigint(20) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  `body` text,
  `status` int(11) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),
  KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (
  `id` varchar(128) NOT NULL,
  `status` int(11) DEFAULT NULL,
  `thread` varchar(60) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

关于高可用
分布式协调还是选用zookeeper。
如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。
最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。
运行模式
• 支持 master,slave (HA)需要配置mykit.delay.registry.serverList zk集群地址列表
• 支持 cluster 会涉及到分布式锁竞争 效果不是很明显 分布式锁采用redis的 setNx实现
• StandAlone
目前,经过测试,推荐使用master slave的模式,后期会优化Cluster模式
如何接入
为了提供一个统一的精准定时任务和延时队列框架,mykit-delay提供了HTTP Rest接口供其他业务系统调用,接口使用简单方便,只需要简单的调用接口,传递相应的参数即可。
消息体
以JSON数据格式参数 目前只提供了http协议
• body 业务消息体
• delay 延时毫秒 距createTime的间隔毫秒数
• id 任务ID 系统自动生成 任务创建成功返回
• status 状态 默认不填写
• topic 标题
• subtopic 保留字段
• ttl 保留字段
• createTime 创建任务时间 非必填 系统默认
添加任务

/push  
    POST application/json
{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}

删除任务
删除任务 需要记录一个JobId

/delete?jobId=xxx
   GET

恢复单个任务
用于任务错乱 脑裂情况 根据日志恢复任务

/reStoreJob?JobId=xxx
   GET

恢复所有未完成的任务
根据日志恢复任务

/reStore?expire=true
   GET

参数expire 表示是否需要恢复已过期还未执行的数据
清空队列数据
根据日志中未完成的数据清空队列中全部数据。清空之后 会删除缓存中的所有任务

/clearAll
 GET

客户端获取队列方式
目前默认实现了RocketMQ与ActiveMQ的推送方式。依赖MQ的方式来实现延时框架与具体业务系统的耦合。
消息体中消息与RocketMQ和 ActiveMQ 消息字段对应关系

mykit-delayRocketMQActiveMQ备注
topictopictopic点对点发送队列名称或者主题名称
subtopicsubtopicsubtopic点对点发送队列子名称或者主题子名称
body消息内容消息内容消息内容

关于系统配置
延迟框架与具体执行业务系统的交互方式通过延迟框架配置实现,具体配置文件位置为mykit-delay-config项目下的resources/properties/starter.properties文件中。
测试
需要配置好数据库地址和Redis的地址 如果不是单机模式 也需要配置好Zookeeper
运行mykit-delay-test模块下的测试类io.mykit.delay.test.PushTest添加任务到队列中
启动mykit-delay-test模块下的io.mykit.delay.TestDelayQueue消费前面添加数据 为了方便查询效果 默认的消费方式是consoleCQ 控制台输出
扩展
支持zset队列个数可配置 避免大数据带来高延迟的问题。
近期规划
• 分区(buck)支持动态设置
• redis与数据库数据一致性的问题 (重要)
• 实现自己的推拉机制
• 支持可切换实现方式 目前只是依赖Redis实现,后续待优化
• 支持Web控制台管理队列
• 实现消息消费TTL机制

文章来源:网络 版权归原作者所有,如涉及知识产权问题,请权利人联系我们,我们将立即处理.
标签: 消息中间件
空空姬
文章 96 获得 0个赞 共 0个粉丝

推荐阅读 更多精彩内容

  •   狗狗难产对于没有接产经验的宠主来说,只有及时将狗狗送去宠物医院生产才是*保险的。如果发现狗狗难产超过2-4个小时,没有生出小狗,建议马上带狗狗去宠物医院观察。如果是间隔一晚上没有生出来小狗,*好带
    田井中律 田井中律 阅读 1696
  •   狗狗配了2次怀孕按哪次算这个是说不准的,一般来说,母狗的预产期是63天,但只是一个平均数,产期大概是在第59到第65日发生,早过第58日出生的小狗会较难存活。主人可以在记录下狗狗每次配种的时间,然
    空空姬 空空姬 阅读 1778
  •   中华田园犬产前征兆有哪些?大家是不是也想了解中华田园犬什么时候快生产了呢?那么中华田园犬产前准备什么呢?下面为大家介绍。  中华田园犬产前征兆有哪些?  生产前1-2天中华田园犬阴部会有粘液流出,
    空空姬 空空姬 阅读 1381
  •   圣伯纳犬绝育好不好?很多的宠物狗家长,在狗狗成年之后都会给它们做绝育手术,当然也有些家长会反对。那么下面来告诉大家圣伯纳犬绝育的优缺点。  圣伯纳犬绝育好不好?  关于圣伯纳犬是否应该绝育的问题,
    上杉夏香 上杉夏香 阅读 9285
  •   银狐犬什么时候绝育好?  传统的*适合绝育的年龄段是6个月到9个月大的时候。但是*佳的年龄对于不同个体是不一样的,但无论如何都不要等到年龄过大的时候再绝育。  银狐犬绝育优缺点:  1、如果在手术
  •   史毕诺犬产后护理方法:  1.将史毕诺犬母犬的外阴部、尾部及乳房等部位用温水洗净、擦干。及时更换被污染的褥垫及注意保温。  2.史毕诺犬产后会因保护仔犬而变得很凶猛,刚分娩过的母犬,要保持8—24
  •   美国水猎犬产后护理方法:  1.美国水猎犬妈妈分娩结束以后,应该给它一些葡萄糖水、牛奶和淡盐水。产后1-2天,应该供应充足的饮水和少量的肉食,3-4天应逐渐增加肉食的量。5-6天除了增加肉食外,每
  •   狗狗流产的症状  1、由布鲁氏菌引起的流产,多发生于妊娠第30~57天;黄体形成不全性流产,常见于妊娠第2-5周;而黄体早期退化性流产,多见于妊娠第6~7周。所有流产的情况,80%母犬多于妊娠第4
    血叶洛莉兰 血叶洛莉兰 阅读 1542
  •   博美怎样配狗?配狗时*好选择两只狗狗都熟悉的地方,因为陌生的环境会让狗狗紧张,而不愿配种;配种当天,先让博美放松心情,再放配偶出来,在交配时,两只狗狗会尾部相连,期间要多安慰鼓励母犬,全程大概会持
    上杉夏香 上杉夏香 阅读 1402
  •   柯基狗几个月发情?柯基狗母犬*次发情大约在出生后6-10个月;柯基狗成年后每年会发情两次,一般是在春季3-5月一次,秋季9-11月再次发情;发情的主要表现是柯基狗变得兴奋躁动,排尿频繁,阴门流出红