百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

「实战Elastic-Job」——详解当当网分布式作业框架

ccwgpt 2024-10-12 02:51 26 浏览 0 评论

在做电商项目的时候,有很多地方需要作业来完成,通过对比在scheduler的地方用了当当网的分布式作业框架Elastic-Job而没有选择spring自带的scheduler, elastic-job可以不依赖于Spring直接运行,但是也提供了自定义的命名空间方便与Spring集成。下面小编带领你详细的了解一下Elastic-Job是什么,怎么用.一步步的学习ElastIc-Job的原理及应用.

什么是作业?

作业即定时任务.无需做复杂的控制,在指定的时间执行指定的任务.

为什么需要作业?

时间驱动/事件驱动: 内部系统一般可以通过事件来驱动,但涉及到外部系统,则只能使用时间驱动.如:抓取外部系统价格.每小时抓取,由于是外部系统,不能像内部系统一样发送事件触发事件.

批量处理/逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势,而且有的业务逻辑只能批量处理.

系统内部/系统解耦:作业一般封装在系统内部,而消息中间件可用于系统间解耦.

Elastic-Job是什么?

elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架。去掉了和dd-job中的监控和ddframe接入规范部分。

怎么用?

Elastic-Job常用的三种作业类型: SimpleJob, SequenceDataFlowJob和ThroughputDataFlowJob

SimpleJob类型作业:

SimpleJob是常用的实现方式,意为简单实现.需要继承AbstractSimpleElasticJob ,用于执行普通的任务.

SimpleJob的任务测试类:

@Component
public class SimpleJobDemo extends AbstractSimpleElasticJob {
 
	@Override
	public void process(final JobExecutionMultipleShardingContext context) {
 System.out.println("#######################");
	}
}

ThroughputDataFlow类型作业

ThroughputDataFlow类型作业意为高吞吐的数据流作业。需要继承AbstractIndividualThroughputDataFlowElasticJob并可以指定返回值泛型,该类提供3个方法可覆盖,分别用于抓取数据,处理数据和指定是否流式处理数据。可以获取数据处理成功失败次数等辅助监控信息。如果流式处理数据,fetchData方法的返回值只有为null或长度为空时,作业才会停止执行,否则作业会一直运行下去;非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,即完成本次作业。流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

ThroughputDataFlow的任务测试类:

@Component
public class ThroughputDataFlowJobDemo extends AbstractIndividualThroughputDataFlowElasticJob<String> {
@Override
public List<String> fetchData(final JobExecutionMultipleShardingContext context) {
System.out.println("#####进入任务计划#######");
List<String> result = new ArrayList<String>();
for (int i = 0; i < 10; i++) {
result.add(String.valueOf(i));
}
return result;
}
@Override
public boolean processData(JobExecutionMultipleShardingContext shardingContext, String data) {
System.out.println(data);
return true;
}
}

SequenceDataFlowJob类型作业:

SequenceDataFlowJob和ThroughputDataFlowJob的区别是:获取数据和处理数据使用相关的数据分配规则不同.由于ThroughputDataFlow作业可以使用多于分片项的任意线程数处理,所以性能调优的可能会优于SequenceDataFlow作业。

SequenceDataFlowJob的测试类:

@Component
public class SequenceDataFlowJobDemo extends AbstractBatchSequenceDataFlowElasticJob<String> {
@Override
public List<String> fetchData(final JobExecutionSingleShardingContext context) {
return null;
}
@Override
public int processData(final JobExecutionSingleShardingContext context, final List<String> data) {
return 1;
}
}

如果使用spring管理的话,首先在applicationContext-scheduler的配置文件中添加基本信息:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd"
default-lazy-init="true">
<!--配置作业注册中心-->
<reg:zookeeper id="regCenter" server-lists="${JOB_URL}"
namespace="${namespace}" base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000" max-retries="3" />
</beans>

里面用到了zookeeper的东西,这篇博客不做具体介绍,上面的部分都是基本的配置,添加需要执行的job节点

<!--配置作业A-->
<job:dataflow id="sequenceDataFlowJob"
 class="com.dangdang.example.elasticjob.spring.job.SequenceDataFlowJobDemo"
 registry-center-ref="regCenter" sharding-total-count="${sequenceDataFlowJob.shardingTotalCount}"
 cron="${sequenceDataFlowJob.cron}" sharding-item-parameters="${sequenceDataFlowJob.shardingItemParameters}"
 monitor-execution="${sequenceDataFlowJob.monitorExecution}" failover="${sequenceDataFlowJob.failover}"
 process-count-interval-seconds="${sequenceDataFlowJob.processCountIntervalSeconds}"
 max-time-diff-seconds="${sequenceDataFlowJob.maxTimeDiffSeconds}"
 description="${sequenceDataFlowJob.description}" disabled="${sequenceDataFlowJob.disabled}"
 overwrite="${sequenceDataFlowJob.overwrite}" />

节点属性的含义:

1.id:作业名称

2.class:作业实现类,需要实现ElasticJob接口,脚本型作业不需要配置

3.registry-center-ref:注册中心Bean的引用,需引用reg:zookeeper的声明

job节点的各个属性可以放在scheduler.properties的配置文件中,如果有多个作业,可以按照这个节奏继续添加相应的任务.

sequenceDataFlowJob.cron=0/5 * * * * ?
sequenceDataFlowJob.shardingTotalCount=10
sequenceDataFlowJob.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
sequenceDataFlowJob.maxTimeDiffSeconds=-1
sequenceDataFlowJob.monitorExecution=true
sequenceDataFlowJob.failover=true
sequenceDataFlowJob.processCountIntervalSeconds=10
sequenceDataFlowJob.description=\u6309\u987A\u5E8F\u4E0D\u505C\u6B62\u8FD0\u884C\u7684\u4F5C\u4E1A\u793A\u4F8B
sequenceDataFlowJob.disabled=false
sequenceDataFlowJob.overwrite=true

节点属性的含义:

1.cron:cron表达式,用于配置作业触发时间

2.shardingTotalCount:作业分片总数

3.shardingItemParameters:分片序列号和参数用等号分隔,多个键值对用逗号分隔分片序列号从0开始,不可大于或等于作业分片总数如:0=a,1=b,2=c

4.maxTimeDiffSeconds:最大允许的本机与注册中心的时间误差秒数如果时间误差超过配置秒数则作业启动时将抛异常配置为-1表示不校验时间误差

5.failover:是否开启失效转移仅monitorExecution开启,失效转移才有效

6.processCountIntervalSeconds:统计作业处理数据数量的间隔时间,单位:秒

7.description:作业描述信息

8.disabled:作业是否禁止启动,可用于部署作业时,先禁用启动,部署结束后统一启动

9.overwrite:本地配置是否可覆盖注册中心配置,如果可覆盖,每次启动作业都以本地配置为准.

为什么不用spring自带的定时器任务功能而选择Elastic-Job:

这就需要了解一下Elastic-Job的功能了.

1. 主要功能

a) 分布式:重写Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心。

b) 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。

c) 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。

d) 集中管理:采用基于Zookeeper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。

e) 定制化流程型任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。

2. 其他功能

a) 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。

b) Spring命名空间支持:elastic-job可以不依赖于spring直接运行,但是也提供了自定义的命名空间方便与spring集成。

c) 运维平台:提供web控制台用于管理作业。

3. 非功能需求

a) 稳定性:在服务器无波动的情况下,并不会重新分片;即使服务器有波动,下次分片的结果也会根据服务器IP和作业名称哈希值算出稳定的分片顺序,尽量不做大的变动。

b) 高性能:同一服务器的批量数据处理采用自动切割并多线程并行处理。

c) 灵活性:所有在功能和性能之间的权衡,都可通过配置开启/关闭。如:elastic-job会将作业运行状态的必要信息更新到注册中心。如果作业执行频度很高,会造成大量Zookeeper写操作,而分布式Zookeeper同步数据可能引起网络风暴。因此为了考虑性能问题,可以牺牲一些功能,而换取性能的提升。

d) 幂等性:elastic-job可牺牲部分性能用以保证同一分片项不会同时在两个服务器上运行。

e) 容错性:作业服务器和Zookeeper断开连接则立即停止作业运行,用于防止分片已经重新分配,而脑裂的服务器仍在继续执行,导致重复执行。

消息中间件和作业的区别:

消息中间件也可以做到实时处理数据, 两者确有相似之处。可互相替换的场景,如队列表。将待处理的数据放入队列表,然后使用频率极短的定时任务拉取队列表的数据并处理。这种情况使用消息中间件的推送模式可更好的处理实时性数据。而且基于数据库的消息存储吞吐量远远小于基于文件的顺序追加消息存储。

相关推荐

盲盒小程序背后的技术揭秘:如何打造个性化购物体验

在2025年的今天,盲盒小程序作为一种新兴的购物方式,正以其独特的魅力和个性化体验吸引着越来越多的消费者。这种将线上购物与盲盒概念相结合的应用,不仅为消费者带来了未知的惊喜,还通过一系列技术手段实现了...

小程序·云开发已支持单日亿级调用量,接口可用率高达99.99%

2019-10-1914:1210月19日,由腾讯云与微信小程序团队联合举办的“小程序·云开发”技术峰会在北京召开。会上,微信小程序团队相关负责人表示“小程序·云开发”系统架构已经支持每天亿级别的...

程序员副业开启模式:8个GitHub上可以赚钱的小程序

前言开源项目作者:JackonYang今天推荐的这个项目是「list-of-wechat-mini-program-list」,开源微信小程序列表的列表、有赚钱能力的小程序开源代码。这个项目分为两部分...

深度科普:盲盒小程序开发的底层逻辑

在当下的数字化浪潮中,盲盒小程序以其独特的趣味性和互动性,吸引着众多消费者的目光。无论是热衷于收集玩偶的年轻人,还是享受拆盒惊喜的上班族,都对盲盒小程序情有独钟。那么,这种备受欢迎的盲盒小程序,其开发...

微信小程序的制作步骤

SaaS小程序制作平台,作为数字化转型时代下的创新产物,不仅将易用性置于设计的核心位置,让非技术背景的用户也能轻松上手,快速制作出功能丰富、界面精美的小程序,更在性能和稳定性方面投入了大量精力,以确保...

携程开源--小程序构建工具,三分钟搞定

前言今天推荐的这个项目是「wean」,一个小程序构建打包工具。在wean之前,大量小程序工具使用webpack进行打包,各种loader、plugin导致整个开发链路变长。wean旨在解...

校园小程序的搭建以及营收模式校园外卖程序校园跑腿校园圈子系统

校园小程序的架构设计主要包括云端架构和本地架构两部分。云端架构方面,采用Serverless架构可以降低技术门槛,通过阿里云、腾讯云等平台提供的云服务,可以实现弹性扩容和快速部署。例如,使用云数据库、...

盲盒小程序开发揭秘:技术架构与实现原理全解析

在2025年的今天,盲盒小程序作为一种结合了线上购物与趣味性的创新应用,正受到越来越多用户的喜爱。其背后的技术架构与实现原理,对于想要了解或涉足这一领域的人来说,无疑充满了神秘与吸引力。本文将为大家科...

月活百万的小程序架构设计:流量暴增秘籍

从小程序到"大"程序的蜕变之路当你的小程序用户量从几千跃升至百万级别时,原有的架构就像一件不合身的衣服,处处紧绷。这个阶段最常遇到的噩梦就是服务器崩溃、接口超时、数据丢失。想象一下,在...

认知智能如何与产业结合?专家学者共探理论框架与落地实践

当前,以大模型为代表的生成式人工智能等前沿技术加速迭代,如何将认知智能与产业结合,成为摆在各行各业面前的一个问题。论坛现场。主办方供图7月4日,2024世界人工智能大会暨人工智能全球治理高级别会议在...

现代中医理论框架

...

认知行为(CBT)中的ABC情绪理论

情绪ABC理论是由美国心理学家阿尔伯特·艾利斯(AlbertEllis1913-2007)创建的理论,A表示诱发性事件(Activatingevent),B表示个体针对此诱发性事件产生的一些信...

说说卡伦霍妮的理论框架,对你调整性格和人际关系,价值很大

01自在今天我主要想说下霍妮的理论框架。主要说三本书,第一本是《我们时代的神经症人格》,第二本是《我们内心的冲突》,第三本是《神经症与人的成长》。根据我的经验,三本书价值巨大,但并不是每个人都能读进去...

供应链管理-理论框架

一个最佳价值的供应链,应该是一个具有敏捷性、适应性和联盟功能(3A)的供应链,其基本要素包括战略资源、物流管理、关系管理以及信息系统,目标是实现速度、质量、成本、柔性的竞争优势。篇幅有...

微信WeUI设计规范文件下载及使用方法

来人人都是产品经理【起点学院】,BAT实战派产品总监手把手系统带你学产品、学运营。WeUI是一套同微信原生视觉体验一致的基础样式库,由微信官方设计团队为微信Web开发量身设计,可以令用户的使用感知...

取消回复欢迎 发表评论: