A Hardcore Source Code Analysis of Apache DolphinScheduler 3.0, All in One Article




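  • 1 Design and Strategy of DolphinScheduler

  • 1.1 Distributed Design

  • 1.1.1 Centralized

  • 1.1.2 Decentralized

  • 1.2 DolphinScheduler Architecture

  • 1.3 Fault Tolerance

  • 1.3.1 Crash Fault Tolerance

  • 1.3.2 Failure Retry

  • 1.4 Remote Log Access

  • 2 DolphinScheduler Source Code Analysis

  • 2.1 Project Modules and Configuration Files

  • 2.1.1 Project Modules

  • 2.1.2 Configuration Files

  • 2.2 Main API Task Operation Interfaces

  • 2.3 Quartz Architecture and Execution Flow

  • 2.3.1 Concepts and Architecture

  • 2.3.2 Initialization and Execution Flow

  • 2.3.3 Cluster Operation

  • 2.4 Master Startup and Execution Flow

  • 2.4.1 Concepts and Execution Logic

  • 2.4.2 Cluster and Slots

  • 2.4.3 Code Execution Flow

  • 2.5 Worker Startup and Execution Flow

  • 2.5.1 Concepts and Execution Logic

  • 2.5.2 Code Execution Flow

  • 2.6 RPC Interaction

  • 2.6.1 Master and Worker Interaction

  • 2.6.2 Interaction Between Other Services and the Master

  • 2.7 Load Balancing Algorithms

  • 2.7.1 Weighted Random

  • 2.7.2 Linear Load

  • 2.7.3 Smooth Round Robin

  • 2.8 Log Service

  • 2.9 Alerting

  • 3 Afterword

  • 3.1 Make friends

  • 3.2 References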


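I came to study Apache DolphinScheduler largely by chance. I maintain a scheduling platform built as a customized fork of xxl-job. It hit a concurrency bottleneck and reached the point where optimization and refactoring could no longer be postponed, so I surveyed the widely used scheduling platforms for ideas to borrow.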


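1 Design and Strategy of DolphinScheduler


  • Task definition: tasks of various types, the key building blocks of a process definition, such as SQL, Shell, Spark, MR and Python;

  • Task instance: an instantiation of a task, which records the execution state of that specific task;

  • Process definition: a directed acyclic graph (DAG) formed by a group of task nodes linked through dependencies;

  • Process instance: an instance of a process created by a manual run or by a scheduled trigger;

  • Scheduling: the system uses the Quartz distributed scheduler and also supports visual generation of cron expressions;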
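To make the "process definition is a DAG" idea concrete, the following is a minimal sketch that orders task nodes by their dependencies using Kahn's algorithm. The class `DagSketch` and the task names are hypothetical and exist only for illustration; this is not how DolphinScheduler's own DAG classes are written.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: a process definition modelled as a DAG of task names.
class DagSketch {
    // node -> downstream nodes; insertion order is kept so the result is deterministic
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    void addNode(String node) {
        edges.computeIfAbsent(node, k -> new ArrayList<>());
    }

    // Declares that `to` depends on `from`, so `from` must run first.
    void addEdge(String from, String to) {
        addNode(from);
        addNode(to);
        edges.get(from).add(to);
    }

    // Kahn's algorithm: returns one valid execution order, or throws if a cycle exists.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String node : edges.keySet()) {
            inDegree.put(node, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Nodes without unmet dependencies are ready to run.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String target : edges.get(node)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("cycle detected: not a DAG");
        }
        return order;
    }
}
```

A graph with the edges shell -> spark, shell -> sql, spark -> python and sql -> python yields an order in which `shell` comes first and `python` last, while a graph containing a cycle is rejected.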

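1.1 Distributed Design


1.1.1 Centralized


Master: the Master is mainly responsible for distributing tasks and monitoring the health of the Slaves. It can balance tasks across the Slaves dynamically, so that no Slave node ends up either "worked to death" or "idle to death".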






1.1.2 Decentralized






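1.2 DolphinScheduler Architecture


API: the API layer, which handles requests from the front-end UI layer. The service exposes a unified set of RESTful APIs to external callers. The interfaces cover creating, defining, querying, modifying, publishing, taking offline, manually starting, stopping, pausing and resuming workflows, running from a given node, and so on.

MasterServer: MasterServer follows a distributed, decentralized design. It integrates Quartz and is mainly responsible for splitting DAG tasks, submitting and monitoring tasks, and watching the health of the other MasterServers and WorkerServers. On startup, a MasterServer registers an ephemeral node in ZooKeeper and handles fault tolerance by listening for changes to ZooKeeper ephemeral nodes.

WorkerServer: WorkerServer also follows a distributed, decentralized design. It is mainly responsible for executing tasks and providing the log service. On startup, a WorkerServer registers an ephemeral node in ZooKeeper and maintains a heartbeat.

ZooKeeper: the ZooKeeper service. The MasterServer and WorkerServer nodes in the system both rely on ZooKeeper for cluster management and fault tolerance. The system also uses ZooKeeper for event listening and distributed locks.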


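1.3 Fault Tolerance


1.3.1 Crash Fault Tolerance


After ZooKeeper Master fault tolerance completes, scheduling is taken over again by the Scheduler thread in DolphinScheduler. It traverses the DAG to find tasks that are "running" or "submitted successfully". For "running" tasks it monitors the state of their task instances. For "submitted successfully" tasks it checks whether the task already exists in the Task Queue: if it does, the thread likewise monitors the task instance state; if it does not, the thread resubmits the task instance.

Once the Master Scheduler thread finds a task instance in the "needs fault tolerance" state, it takes over the task and resubmits it. Note that "network jitter" may cause a node to lose its heartbeat with ZooKeeper for a short time, which triggers a remove event for that node.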


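1.3.2 Failure Retry


  1. Task failure retry works at the task level and is performed automatically by the scheduling system. For example, if a Shell task has its retry count set to 3, then after the Shell task fails it is rerun automatically up to 3 more times.

  2. Process failure recovery works at the process level and is performed manually. Recovery can only start from the failed node or from the current node. Process failure rerun also works at the process level and is performed manually. A rerun starts from the start node.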
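The task-level retry rule in item 1 can be sketched as follows. This is only an illustration under stated assumptions: `RetrySketch` and `runWithRetry` are hypothetical names, and the real scheduler drives retries through task instance state rather than a simple loop.

```java
import java.util.function.Supplier;

// Hypothetical helper: runs a task once, then retries up to maxRetries more times on failure.
class RetrySketch {
    static <T> T runWithRetry(Supplier<T> task, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        // attempt 0 is the first run; attempts 1..maxRetries are the retries
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        // every attempt failed: surface the last failure to the caller
        throw lastFailure;
    }
}
```

With `maxRetries` set to 3, a task that always fails is executed 4 times in total (one initial run plus 3 retries) before the failure is reported, which matches the Shell example above.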


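  1. One type is the business node. Each such node corresponds to an actual script or processing statement, for example Shell nodes, MR nodes, Spark nodes and dependent nodes.

  2. The other type is the logical node. Such nodes do no actual script or statement processing and only handle the logic of how the process flows, for example sub-process nodes.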



1.4 Remote Log Access



  1. Put the logs into the ES search engine;

  2. Fetch remote log information over Netty communication;


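2 DolphinScheduler Source Code Analysis


2.1 Project Modules and Configuration Files

2.1.1 Project Modules

  • dolphinscheduler-alert: the alert module, which provides the alert service;

  • dolphinscheduler-api: the web application module, which provides the REST API service called by the UI;

  • dolphinscheduler-common: common constants, enums, utility classes, data structures and base classes;

  • dolphinscheduler-dao: database access and related operations;

  • dolphinscheduler-remote: Netty-based client and server;

  • dolphinscheduler-server: log and heartbeat services;

  • dolphinscheduler-log-server: LoggerServer, used by the REST API to view logs over RPC;

  • dolphinscheduler-master: the MasterServer service, mainly responsible for splitting the DAG and monitoring task state;

  • dolphinscheduler-worker: the WorkerServer service, mainly responsible for submitting and executing tasks and updating task state;

  • dolphinscheduler-service: the service module, which contains the Quartz, ZooKeeper and log client access services for the server and api modules to call;

  • dolphinscheduler-ui: the front-end module;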

2.1.2 Configuration Files


dolphinscheduler-common common.properties

    # Local working directory for temporary files
    data.basedir.path=/tmp/dolphinscheduler
    # Resource storage type: HDFS, S3, NONE
    # Resource storage path
    resource.upload.path=/dolphinscheduler
    # Whether Kerberos is enabled for Hadoop
    # Kerberos configuration directory
    # Kerberos login user
    login.user.keytab.username=hdfs-mycluster@ESZ.COM

    # Keytab of the Kerberos login user
    login.user.keytab.path=/opt/hdfs.headless.keytab

    # Kerberos expiry time, an integer, in hours
    kerberos.expire.time=2
    # If the storage type is HDFS, configure a user with the required permissions
    hdfs.root.user=hdfs
    # Request address. With S3 storage the value looks like s3a://dolphinscheduler. If Hadoop is configured with HA, copy core-site.xml and hdfs-site.xml into the conf directory
    fs.defaultFS=hdfs://mycluster:8020
    aws.secret.access.key=minioadmin
    aws.region=us-east-1
    aws.endpoint=http://localhost:9000
    # resourcemanager port, the default value is 8088 if not specified
    resource.manager.httpaddress.port=8088
    # YARN ResourceManager address. If ResourceManager HA is enabled, list the HA IP addresses separated by commas; if ResourceManager is a single node, leave the value empty
    yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
    # If ResourceManager HA is enabled or ResourceManager is not used, keep the default. If ResourceManager is a single node, replace ds1 with its hostname
    yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
    # job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
    yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

    # datasource encryption enable
    datasource.encryption.enable=false

    # datasource encryption salt
    datasource.encryption.salt=!@#$%^&*

    # data quality option

    #data-quality.error.output.path=/tmp/data-quality-error-data

    # Network IP gets priority, default inner outer

    # Whether hive SQL is executed in the same session
    support.hive.oneSession=false

    # use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
    sudo.enable=true

    # network interface preferred like eth0, default: empty

    # network IP gets priority, default: inner outer

    # system env path

    # Whether development mode is enabled
    development.state=false

    # rpc port
    alert.rpc.port=50052

    # Url endpoint for zeppelin RESTful API

dolphinscheduler-api application.yaml

    server:
      port: 12345
      servlet:
        session:
          timeout: 120m
        context-path: /dolphinscheduler/
      compression:
        enabled: true
        mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
      jetty:
        max-http-form-post-size: 5000000

    spring:
      application:
        name: api-server
      banner:
        charset: UTF-8
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      servlet:
        multipart:
          max-file-size: 1024MB
          max-request-size: 1024MB
      messages:
        basename: i18n/messages
      datasource:
    #    driver-class-name: org.postgresql.Driver
    #    url: jdbc:postgresql://
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://
        username: root
        password: root
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1
      quartz:
        auto-startup: false
        job-store-type: jdbc
        jdbc:
          initialize-schema: never
        properties:
          org.quartz.threadPool:threadPriority: 5
          org.quartz.jobStore.isClustered: true
          org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
          org.quartz.scheduler.instanceId: AUTO
          org.quartz.jobStore.tablePrefix: QRTZ_
          org.quartz.jobStore.acquireTriggersWithinLock: true
          org.quartz.scheduler.instanceName: DolphinScheduler
          org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
          org.quartz.jobStore.useProperties: false
          org.quartz.threadPool.makeThreadsDaemons: true
          org.quartz.threadPool.threadCount: 25
          org.quartz.jobStore.misfireThreshold: 60000
          org.quartz.scheduler.makeSchedulerThreadDaemon: true
    #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
          org.quartz.jobStore.clusterCheckinInterval: 5000

    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${}

    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string:
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~

    audit:
      enabled: false

    metrics:
      enabled: true

    python-gateway:
      # Whether enable python gateway server or not. The default value is true.
      enabled: true
      # The address of Python gateway server start. Set its value to `` if your Python API run in different
      # between Python gateway server. It could be specific to other address like `` or `localhost`
      gateway-server-address:
      # The port of Python gateway server start. Define which port you could connect to Python gateway server from
      # Python API side.
      gateway-server-port: 25333
      # The address of Python callback client.
      python-address:
      # The port of Python callback client.
      python-port: 25334
      # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
      # and socket server would never close even though no requests accept
      connect-timeout: 0
      # Close each active connection of socket server if python program not active after x milliseconds. Define value is
      # (0 = infinite), and socket server would never close even though no requests accept
      read-timeout: 0

    # Override by profile

    ---
    spring:
      config:
        activate:
          on-profile: mysql
      datasource:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://
      quartz:
        properties:
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-master application.yaml

    spring:
      banner:
        charset: UTF-8
      application:
        name: master-server
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      cache:
        # default enable cache, you can disable by `type: none`
        type: none
        cache-names:
          - tenant
          - user
          - processDefinition
          - processTaskRelation
          - taskDefinition
        caffeine:
          spec: maximumSize=100,expireAfterWrite=300s,recordStats
      datasource:
        #driver-class-name: org.postgresql.Driver
        #url: jdbc:postgresql://
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://
        username: root
        password:
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1
      quartz:
        job-store-type: jdbc
        jdbc:
          initialize-schema: never
        properties:
          org.quartz.threadPool:threadPriority: 5
          org.quartz.jobStore.isClustered: true
          org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
          org.quartz.scheduler.instanceId: AUTO
          org.quartz.jobStore.tablePrefix: QRTZ_
          org.quartz.jobStore.acquireTriggersWithinLock: true
          org.quartz.scheduler.instanceName: DolphinScheduler
          org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
          org.quartz.jobStore.useProperties: false
          org.quartz.threadPool.makeThreadsDaemons: true
          org.quartz.threadPool.threadCount: 25
          org.quartz.jobStore.misfireThreshold: 60000
          org.quartz.scheduler.makeSchedulerThreadDaemon: true
    #      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
          org.quartz.jobStore.clusterCheckinInterval: 5000

    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string:
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~

    master:
      listen-port: 5678
      # master fetch command num
      fetch-command-num: 10
      # master prepare execute thread number to limit handle commands in parallel
      pre-exec-threads: 10
      # master execute thread number to limit process instances in parallel
      exec-threads: 100
      # master dispatch task number per batch
      dispatch-task-number: 3
      # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
      host-selector: lower_weight
      # master heartbeat interval, the unit is second
      heartbeat-interval: 10
      # master commit task retry times
      task-commit-retry-times: 5
      # master commit task interval, the unit is millisecond
      task-commit-interval: 1000
      state-wheel-interval: 5
      # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
      max-cpu-load-avg: -1
      # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
      reserved-memory: 0.3
      # failover interval, the unit is minute
      failover-interval: 10
      # kill yarn job when failover taskInstance, default true
      kill-yarn-job-when-task-failover: true

    server:
      port: 5679

    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${}

    metrics:
      enabled: true

    # Override by profile

    ---
    spring:
      config:
        activate:
          on-profile: mysql
      datasource:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://
      quartz:
        properties:
          org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-worker application.yaml

    spring:
      banner:
        charset: UTF-8
      application:
        name: worker-server
      jackson:
        time-zone: UTC
        date-format: "yyyy-MM-dd HH:mm:ss"
      datasource:
        #driver-class-name: org.postgresql.Driver
        #url: jdbc:postgresql://
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://
        username: root
        #password: root
        password:
        hikari:
          connection-test-query: select 1
          minimum-idle: 5
          auto-commit: true
          validation-timeout: 3000
          pool-name: DolphinScheduler
          maximum-pool-size: 50
          connection-timeout: 30000
          idle-timeout: 600000
          leak-detection-threshold: 0
          initialization-fail-timeout: 1

    registry:
      type: zookeeper
      zookeeper:
        namespace: dolphinscheduler
    #    connect-string: localhost:2181
        connect-string:
        retry-policy:
          base-sleep-time: 60ms
          max-sleep: 300ms
          max-retries: 5
        session-timeout: 30s
        connection-timeout: 9s
        block-until-connected: 600ms
        digest: ~

    worker:
      # worker listener port
      listen-port: 1234
      # worker execute thread number to limit task instances in parallel
      exec-threads: 100
      # worker heartbeat interval, the unit is second
      heartbeat-interval: 10
      # worker host weight to dispatch tasks, default value 100
      host-weight: 100
      # worker tenant auto create
      tenant-auto-create: true
      # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
      max-cpu-load-avg: -1
      # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
      reserved-memory: 0.3
      # default worker groups separated by comma, like 'worker.groups=default,test'
      groups:
        - default
      # alert server listen host
      alert-listen-host: localhost
      alert-listen-port: 50052

    server:
      port: 1235

    management:
      endpoints:
        web:
          exposure:
            include: '*'
      metrics:
        tags:
          application: ${}

    metrics:
      enabled: true

The settings to focus on are the database, Quartz, ZooKeeper, master and worker configuration.
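The comments on `max-cpu-load-avg` and `reserved-memory` in the master and worker files describe a simple admission rule. The sketch below restates that rule as a pure function. It is an assumption derived only from the comment text, and the names `LoadGuardSketch`, `effectiveMaxCpuLoadAvg` and `canSchedule` are hypothetical rather than DolphinScheduler's own.

```java
// Hypothetical restatement of the rule in the config comments:
// a server may take work only while the configured CPU limit is higher than
// the system load average and the reserved memory is lower than the available memory.
class LoadGuardSketch {
    // A configured value of -1 means "number of CPU cores * 2".
    static double effectiveMaxCpuLoadAvg(double configured, int cpuCores) {
        return configured == -1 ? cpuCores * 2.0 : configured;
    }

    static boolean canSchedule(double maxCpuLoadAvg,
                               double reservedMemoryGb,
                               int cpuCores,
                               double systemLoadAvg,
                               double availableMemoryGb) {
        double cpuLimit = effectiveMaxCpuLoadAvg(maxCpuLoadAvg, cpuCores);
        boolean cpuOk = systemLoadAvg < cpuLimit;
        boolean memoryOk = availableMemoryGb > reservedMemoryGb;
        return cpuOk && memoryOk;
    }
}
```

With the defaults (`-1` and `0.3`) on a 4-core machine, the limit resolves to a load average of 8, so a load of 3.5 with 1 GB available passes while a load of 8.5 does not. In a real process the inputs could be read from `Runtime.getRuntime().availableProcessors()` and `ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()`.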

2.2 Main API Task Operation Interfaces



    public Map<String, Object> setScheduleState(User loginUser,
                                                long projectCode,
                                                Integer id,
                                                ReleaseState scheduleStatus) {
        Map<String, Object> result = new HashMap<>();

        Project project = projectMapper.queryByCode(projectCode);
        // check project auth
        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
        if (!hasProjectAndPerm) {
            return result;
        }

        // check schedule exists
        Schedule scheduleObj = scheduleMapper.selectById(id);

        if (scheduleObj == null) {
            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
            return result;
        }
        // check schedule release state
        if (scheduleObj.getReleaseState() == scheduleStatus) {
            logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
            return result;
        }
        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
            return result;
        }
        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
        if (processTaskRelations.isEmpty()) {
            putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
            return result;
        }
        if (scheduleStatus == ReleaseState.ONLINE) {
            // check process definition release state
            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
                logger.info("not release process definition id: {} , name : {}",
                        processDefinition.getId(), processDefinition.getName());
                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
                return result;
            }
            // check sub process definition release state
            List<Long> subProcessDefineCodes = new ArrayList<>();
            processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
            if (!subProcessDefineCodes.isEmpty()) {
                List<ProcessDefinition> subProcessDefinitionList =
                        processDefinitionMapper.queryByCodes(subProcessDefineCodes);
                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                        /**
                         * if there is no online process, exit directly
                         */
                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                            logger.info("not release process definition id: {} , name : {}",
                                    subProcessDefinition.getId(), subProcessDefinition.getName());
                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                            return result;
                        }
                    }
                }
            }
        }

        // check master server exists
        List<Server> masterServers = monitorService.getServerListFromRegistry(true);

        if (masterServers.isEmpty()) {
            putMsg(result, Status.MASTER_NOT_EXISTS);
            return result;
        }

        // set status
        scheduleObj.setReleaseState(scheduleStatus);

        try {
            switch (scheduleStatus) {
                case ONLINE:
                    logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    setSchedule(project.getId(), scheduleObj);
                    break;
                case OFFLINE:
                    logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    deleteSchedule(project.getId(), id);
                    break;
                default:
                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                    return result;
            }
        } catch (Exception e) {
            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
            throw new ServiceException(result.get(Constants.MSG).toString(), e);
        }

        putMsg(result, Status.SUCCESS);
        return result;
    }

    public void setSchedule(int projectId, Schedule schedule) {
        logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

        quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
    }

    public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
        String jobName = this.buildJobName(schedule.getId());
        String jobGroupName = this.buildJobGroupName(projectId);

        Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
        String cronExpression = schedule.getCrontab();
        String timezoneId = schedule.getTimezoneId();

        /**
         * transform from server default timezone to schedule timezone
         * e.g. server default timezone is `UTC`
         * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
         * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
         * so when add job to quartz, it should recover by transform timezone
         */
        Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
        Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);

        lock.writeLock().lock();
        try {

            JobKey jobKey = new JobKey(jobName, jobGroupName);
            JobDetail jobDetail;
            //add a task (if this task already exists, return this task directly)
            if (scheduler.checkExists(jobKey)) {

                jobDetail = scheduler.getJobDetail(jobKey);
                jobDetail.getJobDataMap().putAll(jobDataMap);
            } else {
                jobDetail = newJob(clazz).withIdentity(jobKey).build();

                jobDetail.getJobDataMap().putAll(jobDataMap);

                scheduler.addJob(jobDetail, false, true);

                logger.info("Add job, job name: {}, group name: {}",
                        jobName, jobGroupName);
            }

            TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
            /*
             * Instructs the Scheduler that upon a mis-fire
             * situation, the CronTrigger wants to have it's
             * next-fire-time updated to the next time in the schedule after the
             * current time (taking into account any associated Calendar),
             * but it does not want to be fired now.
             */
            CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(
                            cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionDoNothing()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId))
                    )
                    .forJob(jobDetail).build();

            if (scheduler.checkExists(triggerKey)) {
                // updateProcessInstance scheduler trigger when scheduler cycle changes
                CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                String oldCronExpression = oldCronTrigger.getCronExpression();

                if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                    // reschedule job trigger
                    scheduler.rescheduleJob(triggerKey, cronTrigger);
                    logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                            jobName, jobGroupName, cronExpression, startDate, endDate);
                }
            } else {
                scheduler.scheduleJob(cronTrigger);
                logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }

        } catch (Exception e) {
            throw new ServiceException("add job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }
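The comment inside `addJob` describes a wall-clock time that was saved as if it were in the server timezone and then has to be read back in the schedule's timezone. The sketch below shows that idea with `java.time`. It is not the implementation of `DateUtils.transformTimezoneDate`; `TimezoneSketch` and `reinterpret` are hypothetical names chosen for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper: keeps the wall-clock fields and swaps the zone they belong to.
class TimezoneSketch {
    static Instant reinterpret(Instant stored, ZoneId serverZone, ZoneId scheduleZone) {
        // Wall-clock time as the server sees the stored value, e.g. 10:00 in UTC.
        LocalDateTime wallClock = LocalDateTime.ofInstant(stored, serverZone);
        // The same wall-clock time, now interpreted in the schedule's zone.
        return wallClock.atZone(scheduleZone).toInstant();
    }
}
```

Using the example from the comment, a stored value of `2022-04-28 10:00:00` in `UTC` reinterpreted in `Asia/Shanghai` becomes the instant `2022-04-28T02:00:00Z`, which removes the 8 hours that were effectively added when the value was saved.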

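2.3 Quartz Architecture and Execution Flow

2.3.1 Concepts and Architecture

The Quartz framework consists mainly of the following parts:

  • SchedulerFactory: the scheduler factory, mainly responsible for managing schedulers;

  • Scheduler: the task scheduler, mainly responsible for scheduling jobs and for the interfaces that operate on jobs;

  • Job: the job interface, whose implementation classes contain the actual business code;

  • JobDetail: defines an instance of a job;

  • Trigger: the job trigger, which holds the timing policy for running a Job, such as how often it runs, when it runs and at what frequency;

  • JobBuilder: defines and builds JobDetail instances;

  • TriggerBuilder: defines and builds trigger instances;

  • Calendar: an extension object for Trigger that can exclude or include specific points in time (for example, excluding public holidays);

  • JobStore: stores jobs and the state produced during scheduling;

The lifetime of a Scheduler begins when the SchedulerFactory creates it and ends when the Scheduler's shutdown() method is called.

Once a Scheduler has been created, Jobs and Triggers can be added, removed and listed, and other scheduling operations (such as pausing a Trigger) can be performed. However, the Scheduler only actually fires triggers (that is, executes jobs) after its start() method has been called.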

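2.3.2 Initialization and Execution Flow


2.3.3 Cluster Operation


  1. When Quartz is deployed as a cluster, the job store cannot be in-memory, which means RAMJobStore cannot be used.

  2. A Quartz cluster uses the database lock TRIGGER_ACCESS when scanning for Trigger instances that need to be scheduled, which guarantees that only one Quartz instance can perform this scan at a time. The code is as follows: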

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }
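The effect of taking TRIGGER_ACCESS before scanning can be imitated inside a single JVM. In the sketch below a `ReentrantLock` stands in for the database lock and a queue stands in for the triggers table; both substitutions, along with the class name `TriggerLockSketch`, are assumptions made purely for illustration and do not reflect how Quartz stores its data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-process stand-in for a Quartz cluster sharing one job store.
class TriggerLockSketch {
    // Plays the role of the TRIGGER_ACCESS database lock.
    private final ReentrantLock triggerAccess = new ReentrantLock();
    // Plays the role of the table of waiting triggers.
    private final Queue<String> waitingTriggers = new ArrayDeque<>();

    TriggerLockSketch(List<String> triggers) {
        waitingTriggers.addAll(triggers);
    }

    // Called by every "scheduler instance"; only one caller can scan at a time.
    List<String> acquireNextTriggers(int maxCount) {
        triggerAccess.lock();
        try {
            List<String> acquired = new ArrayList<>();
            while (acquired.size() < maxCount && !waitingTriggers.isEmpty()) {
                acquired.add(waitingTriggers.poll());
            }
            return acquired;
        } finally {
            triggerAccess.unlock();
        }
    }
}
```

When several threads call `acquireNextTriggers` concurrently, each trigger is handed to exactly one caller, which is the guarantee the database lock provides across Quartz instances.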


    protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {

        if (failedInstances.size() > 0) {

            long recoverIds = System.currentTimeMillis();

            logWarnIfNonZero(failedInstances.size(),
                    "ClusterManager: detected " + failedInstances.size()
                            + " failed or restarted instances.");
            try {
                for (SchedulerStateRecord rec : failedInstances) {
                    getLog().info(
                            "ClusterManager: Scanning for instance \""
                                    + rec.getSchedulerInstanceId()
                                    + "\"'s failed in-progress jobs.");

                    List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                            .selectInstancesFiredTriggerRecords(conn,
                                    rec.getSchedulerInstanceId());

                    int acquiredCount = 0;
                    int recoveredCount = 0;
                    int otherCount = 0;

                    Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                    for (FiredTriggerRecord ftRec : firedTriggerRecs) {

                        TriggerKey tKey = ftRec.getTriggerKey();
                        JobKey jKey = ftRec.getJobKey();

                        triggerKeys.add(tKey);

                        // release blocked triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }

                        // release acquired triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                            getDelegate().updateTriggerStateFromOtherState(
                                    conn, tKey, STATE_WAITING,
                                    STATE_ACQUIRED);
                            acquiredCount++;
                        } else if (ftRec.isJobRequestsRecovery()) {
                            // handle jobs marked for recovery that were not fully
                            // executed..
                            if (jobExists(conn, jKey)) {
                                @SuppressWarnings("deprecation")
                                SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                        "recover_"
                                                + rec.getSchedulerInstanceId()
                                                + "_"
                                                + String.valueOf(recoverIds++),
                                        Scheduler.DEFAULT_RECOVERY_GROUP,
                                        new Date(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobName(jKey.getName());
                                rcvryTrig.setJobGroup(jKey.getGroup());
                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                rcvryTrig.setPriority(ftRec.getPriority());
                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobDataMap(jd);

                                rcvryTrig.computeFirstFireTime(null);
                                storeTrigger(conn, rcvryTrig, null, false,
                                        STATE_WAITING, false, true);
                                recoveredCount++;
                            } else {
                                getLog()
                                        .warn(
                                                "ClusterManager: failed job '"
                                                        + jKey
                                                        + "' no longer exists, cannot schedule recovery.");
                                otherCount++;
                            }
                        } else {
                            otherCount++;
                        }

                        // free up stateful job's triggers
                        if (ftRec.isJobDisallowsConcurrentExecution()) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                    }

                    getDelegate().deleteFiredTriggers(conn,
                            rec.getSchedulerInstanceId());

                    // Check if any of the fired triggers we just deleted were the last fired trigger
                    // records of a COMPLETE trigger.
                    int completeCount = 0;
                    for (TriggerKey triggerKey : triggerKeys) {

                        if (getDelegate().selectTriggerState(conn, triggerKey).
                                equals(STATE_COMPLETE)) {
                            List<FiredTriggerRecord> firedTriggers =
                                    getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                            if (firedTriggers.isEmpty()) {

                                if (removeTrigger(conn, triggerKey)) {
                                    completeCount++;
                                }
                            }
                        }
                    }

                    logWarnIfNonZero(acquiredCount,
                            "ClusterManager: ......Freed " + acquiredCount
                                    + " acquired trigger(s).");
                    logWarnIfNonZero(completeCount,
                            "ClusterManager: ......Deleted " + completeCount
                                    + " complete triggers(s).");
                    logWarnIfNonZero(recoveredCount,
                            "ClusterManager: ......Scheduled " + recoveredCount
                                    + " recoverable job(s) for recovery.");
                    logWarnIfNonZero(otherCount,
                            "ClusterManager: ......Cleaned-up " + otherCount
                                    + " other failed job(s).");

                    if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        getDelegate().deleteSchedulerState(conn,
                                rec.getSchedulerInstanceId());
                    }
                }
            } catch (Throwable e) {
                throw new JobPersistenceException("Failure recovering jobs: "
                        + e.getMessage(), e);
            }
        }
    }

2.4 Master startup and execution flow


2.4.1 Concepts and execution logic



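  • Scheduler (the task scheduling container, usually a StdScheduler instance).

  • ProcessScheduleJob (a business class implementing the Job interface of the Quartz scheduling framework; its sole purpose is to generate rows in the DolphinScheduler business table t_ds_command);


  • NettyRemotingServer (the Netty server, holding the Netty serverBootstrap object and the server-side business handler serverHandler); NettyServerHandler (the Netty server-side business handler, holding the various processors and the thread pool each processor runs on);

  • TaskPluginManager (task plugin manager; each task type is managed as a plugin. At application startup, the factories implementing the TaskChannelFactory interface are loaded via @AutoService and recorded in the database, and the factory objects are then used to load the TaskChannel implementations into a cache);

  • MasterRegistryClient (the master's ZooKeeper client, wrapping all of the master's ZooKeeper operations: register, query, delete, and so on);

  • MasterSchedulerService (scanning service, containing the business execution thread and the Netty client used to reach the workers; it is responsible for task scheduling and uses slots so that a task is not scheduled more than once in cluster mode; the underlying implementation is a ZooKeeper distributed lock);

  • WorkflowExecuteThread (the thread that does the real work; commands are fetched by slot, and the slot is re-checked before execution so that a command is skipped if the slot has changed. Its key job is to build the task parameters, definitions, priorities, and so on, then put the task on the queue for the queue-processing thread to consume);

  • CommonTaskProcessor (processor for ordinary tasks, implementing the ITaskProcessor interface. Processors are split by business type into common, dependent, sub-process, blocking, and condition tasks, and cover submitting, running, dispatching, and killing tasks. The class is loaded via @AutoService; at its core it wraps …);

  • TaskPriorityQueueImpl (task queue, responsible for storing and managing the queued tasks);

  • TaskPriorityQueueConsumer (task-queue consumer thread, responsible for dispatching tasks to workers according to the load-balancing strategy and having them executed);

  • ServerNodeManager (node information manager, responsible for refreshing node registration information and handling slot changes; the underlying implementation uses a ZooKeeper distributed lock);

  • EventExecuteService (event-handling thread; using the cached task-processing threads, it handles the events that each task registers in its thread's event queue while it is being processed);

  • FailoverExecuteThread (failover thread, covering both master and worker failover);

  • MasterRegistryDataListener (failure listener hosted by Curator, the ZooKeeper management framework; it reacts to the addition and removal of the nodes that workers and masters register in ZooKeeper).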
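The relationship between TaskPriorityQueueImpl and TaskPriorityQueueConsumer in the list above can be illustrated with a plain JDK priority queue. This is only a sketch under stated assumptions: the `Task` class, its three fields, and the rule that a smaller number means a more urgent task are hypothetical simplifications, not the fields or ordering DolphinScheduler actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class TaskQueueSketch {

    /** Hypothetical, simplified stand-in for an entry in the task priority queue. */
    static final class Task {
        final int processPriority; // assumption: smaller value = more urgent
        final int taskPriority;    // assumption: smaller value = more urgent
        final int taskId;

        Task(int processPriority, int taskPriority, int taskId) {
            this.processPriority = processPriority;
            this.taskPriority = taskPriority;
            this.taskId = taskId;
        }
    }

    /** Order by workflow priority first, then task priority, then task id. */
    static final Comparator<Task> ORDER = Comparator
            .comparingInt((Task t) -> t.processPriority)
            .thenComparingInt(t -> t.taskPriority)
            .thenComparingInt(t -> t.taskId);

    /** Drains the queue as a single consumer thread would and returns the task ids in dispatch order. */
    static List<Integer> drain(List<Task> submitted) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, ORDER);
        queue.addAll(submitted);
        List<Integer> dispatched = new ArrayList<>();
        Task next;
        while ((next = queue.poll()) != null) {
            dispatched.add(next.taskId);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
                new Task(2, 1, 10),
                new Task(0, 3, 11),
                new Task(2, 0, 12));
        System.out.println(drain(tasks));
    }
}
```

In the real master the consumer additionally picks a worker through the load-balancing strategy before dispatching; the sketch only shows the ordering side of the queue.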


    private void failoverMasterWithLock(String masterHost) {
        String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
        try {
            registryClient.getLock(failoverPath);
            this.failoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
        } finally {
            registryClient.releaseLock(failoverPath);
        }
    }

    /**
     * failover master
     * <p>
     * failover process instance and associated task instance
     *
     * @param masterHost master host
     */
    private void failoverMaster(String masterHost) {
        if (StringUtils.isEmpty(masterHost)) {
            return;
        }
        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
        long startTime = System.currentTimeMillis();
        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
        LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            if (Constants.NULL.equals(processInstance.getHost())) {
                continue;
            }

            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
            for (TaskInstance taskInstance : validTaskInstanceList) {
                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance, workerServers);
            }

            if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {
                continue;
            }

            LOGGER.info("failover process instance id: {}", processInstance.getId());
            // updateProcessInstance host is null and insert into command
            processInstance.setHost(Constants.NULL);
            processService.processNeedFailoverProcessInstances(processInstance);
        }

        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
    }
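The time comparison inside failoverMaster is easy to misread, so here it is isolated as a small sketch. The class and method names are hypothetical; only the condition itself is taken from the code above. A process instance is skipped when both timestamps are known and the instance was restarted after the master came back up.

```java
import java.util.Date;

final class FailoverFilterSketch {

    /**
     * Mirrors the guard in failoverMaster: returns false (skip failover) only when both
     * times are known and the process instance restarted after the server startup time.
     */
    static boolean needsFailover(Date serverStartupTime, Date restartTime) {
        if (serverStartupTime != null && restartTime != null
                && restartTime.after(serverStartupTime)) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Date startup = new Date(2_000L);
        System.out.println(needsFailover(startup, new Date(1_000L))); // restarted before startup
        System.out.println(needsFailover(startup, new Date(3_000L))); // restarted after startup
        System.out.println(needsFailover(null, new Date(3_000L)));    // startup time unknown
    }
}
```

When the startup time cannot be determined, the guard does not apply and the instance is handed over, which is the conservative choice.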

2.4.2 Cluster and slots




    private void updateMasterNodes() {
        MASTER_SLOT = 0;
        MASTER_SIZE = 0;
        this.masterNodes.clear();
        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
        try {
            registryClient.getLock(nodeLock);
            Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
            List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
            syncMasterNodes(currentNodes, masterNodes);
        } catch (Exception e) {
            logger.error("update master nodes error", e);
        } finally {
            registryClient.releaseLock(nodeLock);
        }
    }

    /**
     * sync master nodes
     *
     * @param nodes master nodes
     */
    private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
        masterLock.lock();
        try {
            String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
            this.masterNodes.addAll(nodes);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(masterNodes);
            int index = masterPriorityQueue.getIndex(addr);
            if (index >= 0) {
                MASTER_SIZE = nodes.size();
                MASTER_SLOT = index;
            } else {
                logger.warn("current addr:{} is not in active master list", addr);
            }
            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
        } finally {
            masterLock.unlock();
        }
    }
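The idea behind syncMasterNodes is that every master derives its own slot from its position in an ordered list of the registered masters. A minimal sketch follows, assuming the list is ordered by address string; that ordering is an assumption made for illustration, since the real masterPriorityQueue applies its own comparator, and `slotOf` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SlotAssignSketch {

    /**
     * Returns the slot of addr, i.e. its index in the ordered master list,
     * or -1 when addr is not among the registered masters.
     */
    static int slotOf(List<String> registeredMasters, String addr) {
        List<String> ordered = new ArrayList<>(registeredMasters);
        Collections.sort(ordered); // assumption: order by address string
        return ordered.indexOf(addr);
    }

    public static void main(String[] args) {
        List<String> masters = Arrays.asList("10.0.0.3:5678", "10.0.0.1:5678", "10.0.0.2:5678");
        for (String master : masters) {
            System.out.println(master + " -> slot " + slotOf(masters, master));
        }
    }
}
```

Because every master sorts the same registry snapshot the same way, each one arrives at a distinct slot without talking to the others; a result of -1 corresponds to the "not in active master list" warning in the code above.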


    /**
     * 1. get command by slot
     * 2. donot handle command if slot is empty
     */
    private void scheduleProcess() throws Exception {
        List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
            // indicate that no command, sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }

        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {
            return;
        }

        for (ProcessInstance processInstance : processInstances) {
            if (processInstance == null) {
                continue;
            }

            WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                    processInstance
                    , processService
                    , nettyExecutorManager
                    , processAlertManager
                    , masterConfig
                    , stateWheelExecuteThread);

            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
            if (processInstance.getTimeout() > 0) {
                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
        }
    }

    private List<Command> findCommands() {
        int pageNumber = 0;
        int pageSize = masterConfig.getFetchCommandNum();
        List<Command> result = new ArrayList<>();
        if (Stopper.isRunning()) {
            int thisMasterSlot = ServerNodeManager.getSlot();
            int masterCount = ServerNodeManager.getMasterSize();
            if (masterCount > 0) {
                result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
            }
        }
        return result;
    }

    @Override
    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
        if (masterCount <= 0) {
            return Lists.newArrayList();
        }
        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
    }

    <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit} offset #{offset}
    </select>

    // slot check
    private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
        List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
        CountDownLatch latch = new CountDownLatch(commands.size());
        for (final Command command : commands) {
            masterPrepareExecService.execute(() -> {
                try {
                    // slot check again
                    SlotCheckState slotCheckState = slotCheck(command);
                    if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
                        logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                        return;
                    }
                    ProcessInstance processInstance = processService.handleCommand(logger,
                            getLocalAddress(),
                            command);
                    if (processInstance != null) {
                        processInstances.add(processInstance);
                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                    }
                } catch (Exception e) {
                    logger.error("handle command error ", e);
                    processService.moveToErrorCommand(command, e.toString());
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            // make sure to finish handling command each time before next scan
            latch.await();
        } catch (InterruptedException e) {
            logger.error("countDownLatch await error ", e);
        }

        return processInstances;
    }

    private SlotCheckState slotCheck(Command command) {
        int slot = ServerNodeManager.getSlot();
        int masterSize = ServerNodeManager.getMasterSize();
        SlotCheckState state;
        if (masterSize <= 0) {
            state = SlotCheckState.CHANGE;
        } else if (command.getId() % masterSize == slot) {
            state = SlotCheckState.PASS;
        } else {
            state = SlotCheckState.INJECT;
        }
        return state;
    }
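To see why the `id % masterCount = slot` filter keeps masters from picking up the same command, the partitioning and the second slot check can be replayed on plain integers. This is a sketch only: the enum values and the slotCheck condition follow the code above, while `commandsFor` is a hypothetical in-memory stand-in for the SQL query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SlotPartitionSketch {

    enum SlotCheckState { PASS, CHANGE, INJECT }

    /** Same decision as slotCheck above, with slot and masterSize passed in explicitly. */
    static SlotCheckState slotCheck(int commandId, int masterSize, int slot) {
        if (masterSize <= 0) {
            return SlotCheckState.CHANGE;
        } else if (commandId % masterSize == slot) {
            return SlotCheckState.PASS;
        }
        return SlotCheckState.INJECT;
    }

    /** In-memory stand-in for the query: keeps the ids with id % masterCount == slot. */
    static List<Integer> commandsFor(List<Integer> commandIds, int masterCount, int slot) {
        List<Integer> mine = new ArrayList<>();
        if (masterCount <= 0) {
            return mine;
        }
        for (int id : commandIds) {
            if (id % masterCount == slot) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6);
        for (int slot = 0; slot < 3; slot++) {
            System.out.println("slot " + slot + " handles " + commandsFor(ids, 3, slot));
        }
        // A master leaves the cluster after the query ran: command 4 no longer belongs to slot 1.
        System.out.println(slotCheck(4, 2, 1));
    }
}
```

The second check matters because the master count can change between the query and the handling of a command; a command that no longer maps to the local slot is skipped and left for the master that now owns it.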

2.4.3 Code execution flow




2.5 Worker startup and execution flow

2.5.1 Concepts and execution logic

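  • NettyRemotingServer (the Netty server inside the worker); WorkerRegistryClient (ZooKeeper client wrapping the worker's ZooKeeper operations: register, query, delete, and so on);

  • TaskPluginManager (task plugin manager, encapsulating the plugin loading logic and the abstraction of the actual task execution);

  • WorkerManagerThread (creates the task worker threads: it consumes the task information that the Netty processor pushes into the queue, creates task execution threads, and submits them to a thread pool);

  • TaskExecuteProcessor (Netty task execution processor; it builds the task information for the tasks the master dispatches to the worker and pushes it onto the queue);

  • TaskExecuteThread (task execution thread);

  • TaskCallbackService (task callback thread, communicating with the Netty client held by the master);

  • AbstractTask (abstract class for the actual task logic; subclasses such as SqlTask and DataXTask contain the concrete execution logic);

  • RetryReportTaskStatusThread (not covered here)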
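The hand-off described in the list above (the Netty processor pushes task information onto a queue, a manager thread takes it off and submits execution threads to a pool) is a classic producer-consumer arrangement. The sketch below reproduces only that shape with JDK classes; the class name, the pool size of 2, and the use of plain strings as tasks are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class WorkerPipelineSketch {

    /** Pushes every task onto a queue, lets a manager thread submit them to a pool, returns the finished tasks. */
    static List<String> runAll(List<String> taskNames) {
        BlockingQueue<String> waitingQueue = new LinkedBlockingQueue<>();
        ExecutorService execPool = Executors.newFixedThreadPool(2);
        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(taskNames.size());

        // Role of the Netty processor: enqueue what the master dispatched.
        waitingQueue.addAll(taskNames);

        // Role of the manager thread: take from the queue and hand each task to the pool.
        Thread manager = new Thread(() -> {
            try {
                for (int i = 0; i < taskNames.size(); i++) {
                    String task = waitingQueue.take();
                    execPool.submit(() -> {
                        finished.add(task); // role of the task execution thread
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker-manager-sketch");
        manager.start();

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            execPool.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("shell-1", "sql-2", "datax-3")).size() + " tasks finished");
    }
}
```

Decoupling the network thread from execution through a queue keeps the Netty handler fast: it only enqueues, and the slow work happens on pool threads.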

2.5.2 Code execution flow




2.6 RPC interaction




2.6.1 Master-Worker interaction





    @PostConstruct
    public void run() throws SchedulerException {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // self tolerant
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        this.scheduler.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }

    /**
     * server start
     */
    public void start() {
        if (isStarted.compareAndSet(false, true)) {
            this.serverBootstrap
                    .group(this.bossGroup, this.workGroup)
                    .channel(NettyUtils.getServerSocketChannelClass())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                    .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                    .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                    .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                    .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {
                            initNettyChannel(ch);
                        }
                    });

            ChannelFuture future;
            try {
                future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
            } catch (Exception e) {
                logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
            if (future.isSuccess()) {
                logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            } else if (future.cause() != null) {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
            } else {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
        }
    }
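The registerProcessor calls above amount to a lookup table from command type to handler: when a message arrives, the server handler looks up the processor registered for its type and delegates to it. A dependency-free sketch of that dispatch follows. The enum holds just three of the command types that appear above, and modelling a processor as a `Function<String, String>` is an assumption made to keep the example short; the real processors are Netty-aware classes.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

final class ProcessorRegistrySketch {

    /** A small subset of the command types registered in the code above. */
    enum CommandType { TASK_EXECUTE_RESPONSE, TASK_KILL_RESPONSE, ROLL_VIEW_LOG_REQUEST }

    private final Map<CommandType, Function<String, String>> processors = new EnumMap<>(CommandType.class);

    /** Registers (or replaces) the processor responsible for one command type. */
    void registerProcessor(CommandType type, Function<String, String> processor) {
        processors.put(type, processor);
    }

    /** Routes an incoming message body to the processor registered for its type. */
    String handle(CommandType type, String body) {
        Function<String, String> processor = processors.get(type);
        if (processor == null) {
            return "unsupported command type: " + type;
        }
        return processor.apply(body);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch server = new ProcessorRegistrySketch();
        server.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, body -> "task finished: " + body);
        server.registerProcessor(CommandType.TASK_KILL_RESPONSE, body -> "task killed: " + body);
        System.out.println(server.handle(CommandType.TASK_EXECUTE_RESPONSE, "taskInstanceId=42"));
        System.out.println(server.handle(CommandType.ROLL_VIEW_LOG_REQUEST, "path=/tmp/x.log"));
    }
}
```

The same processor object can be registered under several types, as the code above does with loggerRequestProcessor for the four log commands.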


    /**
     * constructor
     */
    public NettyExecutorManager() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    // register the processors that handle worker callbacks
    @PostConstruct
    public void init() {
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    }

    public NettyRemotingClient(final NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);

        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

        this.start();
    }

    /**
     * start
     */
    private void start() {

        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
                                .addLast(new NettyDecoder(), clientHandler, encoder);
                    }
                });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
        isStarted.compareAndSet(false, true);
    }
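The client above schedules ResponseFuture::scanFutureTable at a fixed rate, which suggests a table of pending requests that must either be completed by a response or expire. The sketch below shows one common way to build such a table on a single connection: every request gets an id, the reply carries the same id back, and the waiting caller is released or times out. It is an illustration of the pattern and not the project's implementation; the class, the `opaque` id, and the use of `CompletableFuture.get` with a timeout instead of a periodic scan are all assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class ResponseFutureSketch {

    private final ConcurrentMap<Long, CompletableFuture<String>> futureTable = new ConcurrentHashMap<>();
    private final AtomicLong nextOpaque = new AtomicLong();

    /** Called by the (hypothetical) network layer when a reply with the given id arrives. */
    void onResponse(long opaque, String body) {
        CompletableFuture<String> future = futureTable.get(opaque);
        if (future != null) {
            future.complete(body);
        }
    }

    /** Sends through the given transport and waits for the matching reply; returns null on timeout. */
    String sendSync(LongConsumer transport, long timeoutMillis) {
        long opaque = nextOpaque.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        futureTable.put(opaque, future);
        try {
            transport.accept(opaque);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            futureTable.remove(opaque); // never leave an entry behind, answered or not
        }
    }

    int pending() {
        return futureTable.size();
    }

    public static void main(String[] args) {
        ResponseFutureSketch client = new ResponseFutureSketch();
        // A transport that answers immediately, standing in for a remote peer.
        System.out.println(client.sendSync(id -> client.onResponse(id, "pong"), 1000));
        // A transport that never answers: the call gives up after the timeout.
        System.out.println(client.sendSync(id -> { }, 50));
    }
}
```

Whichever expiry mechanism is used, the important property is the one in the `finally` block: a request that never gets a reply must not stay in the table forever.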


    /**
     * task dispatch
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if (executorManager == null) {
            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
        }

        /**
         * host select
         */
        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
                            + "current task needs worker group %s to execute",
                    context.getCommand(), context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {
            executorManager.afterExecute(context);
        }
    }

    /**
     * execute logic
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         * all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        /**
         * build command accord executeContext
         */
        Command command = context.getCommand();

        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                doExecute(host, command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }

        return success;
    }

    /**
     * execute logic
     *
     * @param host host
     * @param command command
     * @throws ExecuteException if error throws ExecuteException
     */
    public void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count, default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {
                logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);

        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

    /**
     * send task
     *
     * @param host host
     * @param command command
     */
    public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {
                logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }
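The two loops above form a two-level strategy: doExecute retries the same host a few times, and execute moves on to another node once a host is exhausted. The sketch below replays that control flow with a pluggable `sender` predicate in place of the Netty client. The class and method names are hypothetical, the 100 ms sleep between retries is left out, and hosts are tried in list order, whereas the original takes whichever element the remaining set yields first.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class DispatchRetrySketch {

    /** Tries one host up to retries + 1 times, like the do/while loop in doExecute. */
    static boolean sendWithRetry(String host, Predicate<String> sender, int retries) {
        int remaining = retries;
        do {
            if (sender.test(host)) {
                return true;
            }
            remaining--;
        } while (remaining >= 0);
        return false;
    }

    /** Returns the host that accepted the command, or null once every node has failed. */
    static String execute(String firstHost, List<String> allNodes, Predicate<String> sender) {
        Set<String> failedNodes = new HashSet<>();
        String host = firstHost;
        while (host != null) {
            if (sendWithRetry(host, sender, 3)) {
                return host;
            }
            failedNodes.add(host);
            host = null;
            for (String candidate : allNodes) {
                if (!failedNodes.contains(candidate)) {
                    host = candidate; // fail over to a node that has not been tried yet
                    break;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("worker-1:1234", "worker-2:1234", "worker-3:1234");
        // Only worker-3 is reachable in this simulated run.
        System.out.println(execute("worker-1:1234", workers, h -> h.startsWith("worker-3")));
    }
}
```

With the default of 3, one unreachable host costs four send attempts before the dispatcher gives up on it, which is worth keeping in mind when sizing timeouts.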



    /**
     * worker server run
     */
    @PostConstruct
    public void run() {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(workerConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // worker registry
        try {
            this.workerRegistryClient.registry();
            this.workerRegistryClient.setRegistryStoppable(this);
            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();

            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // task execute manager
        this.workerManagerThread.start();

        // retry report task status
        this.retryReportTaskStatusThread.start();

        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }


    public TaskCallbackService() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
    }


    /**
     * send result
     *
     * @param taskInstanceId taskInstanceId
     * @param command command
     */
    public void send(int taskInstanceId, Command command) {
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel != null) {
            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // remove(taskInstanceId);
                        return;
                    }
                }
            });
        }
    }

2.6.2 Interaction between other services and the Master


 public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,                                     @RequestParam(value = "taskInstanceId") int taskInstanceId,                                     @RequestParam(value = "skipLineNum") int skipNum,                                     @RequestParam(value = "limit") int limit) {          return loggerService.queryLog(taskInstanceId, skipNum, limit);      }    
 /**       * view log       *       * @param taskInstId task instance id       * @param skipLineNum skip line number       * @param limit limit       * @return log string data       */      @Override      @SuppressWarnings("unchecked")      public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {            TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);            if (taskInstance == null) {              return Result.error(Status.TASK_INSTANCE_NOT_FOUND);          }          if (StringUtils.isBlank(taskInstance.getHost())) {              return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);          }          Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());          String log = queryLog(taskInstance,skipLineNum,limit);          result.setData(log);          return result;      }    
/**
 * query log
 *
 * @param taskInstance task instance
 * @param skipLineNum skip line number
 * @param limit limit
 * @return log string data
 */
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
    Host host = Host.of(taskInstance.getHost());

    logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
            host.getPort());

    StringBuilder log = new StringBuilder();
    if (skipLineNum == 0) {
        String head = String.format(LOG_HEAD_FORMAT,
                taskInstance.getLogPath(),
                host,
                Constants.SYSTEM_LINE_SEPARATOR);
        log.append(head);
    }

    log.append(logClient
            .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));

    return log.toString();
}

/**
 * roll view log
 *
 * @param host host
 * @param port port
 * @param path path
 * @param skipLineNum skip line number
 * @param limit limit
 * @return log content
 */
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
    logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
    RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
    String result = "";
    final Host address = new Host(host, port);
    try {
        Command command = request.convert2Command();
        Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
        if (response != null) {
            RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
                    response.getBody(), RollViewLogResponseCommand.class);
            return rollReviewLog.getMsg();
        }
    } catch (Exception e) {
        logger.error("roll view log error", e);
    } finally {
        this.client.closeChannel(address);
    }
    return result;
}
/**
 * sync send
 *
 * @param host host
 * @param command command
 * @param timeoutMillis timeoutMillis
 * @return command
 */
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
    final Channel channel = getChannel(host);
    if (channel == null) {
        throw new RemotingException(String.format("connect to : %s fail", host));
    }
    final long opaque = command.getOpaque();
    final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
    channel.writeAndFlush(command).addListener(future -> {
        if (future.isSuccess()) {
            responseFuture.setSendOk(true);
            return;
        } else {
            responseFuture.setSendOk(false);
        }
        responseFuture.setCause(future.cause());
        responseFuture.putResponse(null);
        logger.error("send command {} to host {} failed", command, host);
    });
    /*
     * sync wait for result
     */
    Command result = responseFuture.waitResponse();
    if (result == null) {
        if (responseFuture.isSendOK()) {
            throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
        } else {
            throw new RemotingException(host.toString(), responseFuture.getCause());
        }
    }
    return result;
}
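The sendSync listing turns an asynchronous Netty write into a blocking call: the caller parks on a future keyed by the request id (opaque) until the I/O thread delivers the matching reply or the timeout expires. The sketch below illustrates that pattern only. SimpleResponseFuture, its PENDING table and the String payload are hypothetical names chosen for this example, not the project's ResponseFuture class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a response future: one latch per in-flight request. */
class SimpleResponseFuture {
    // In-flight requests keyed by request id ("opaque" in the listing above).
    private static final ConcurrentHashMap<Long, SimpleResponseFuture> PENDING = new ConcurrentHashMap<>();

    private final long opaque;
    private final long timeoutMillis;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    SimpleResponseFuture(long opaque, long timeoutMillis) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        PENDING.put(opaque, this);
    }

    /** Caller thread: blocks until a reply arrives or the timeout expires; null means no reply. */
    String waitResponse() {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } finally {
            PENDING.remove(opaque); // the request is no longer in flight
        }
    }

    /** I/O thread: hands the reply to whoever is waiting on this id; false if nobody is. */
    static boolean complete(long opaque, String response) {
        SimpleResponseFuture future = PENDING.get(opaque);
        if (future == null) {
            return false;
        }
        future.response = response;
        future.latch.countDown();
        return true;
    }
}

class SyncOverAsyncDemo {
    public static void main(String[] args) {
        SimpleResponseFuture future = new SimpleResponseFuture(1L, 1000L);
        // Simulates the network thread delivering the reply for request id 1.
        new Thread(() -> SimpleResponseFuture.complete(1L, "log content")).start();
        System.out.println(future.waitResponse());
    }
}
```

A null return corresponds to the branch in sendSync where result == null and the caller decides between a timeout and a send failure.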


/**
 * construct client
 */
public LogClientService() {
    this.clientConfig = new NettyClientConfig();
    this.clientConfig.setWorkerThreads(4);
    this.client = new NettyRemotingClient(clientConfig);
    this.isRunning = true;
}

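2.7 Load-balancing algorithms

When the Master picks a Worker to run a task, DolphinScheduler offers three load-balancing algorithms, all of which use node weights: weighted random (random), smooth round-robin (roundrobin), and linear load (lowerweight). The strategy is selected in the configuration file; the default is the linear-load strategy: host-selector: lower_weight.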

@Bean
public HostManager hostManager() {
    HostSelector selector = masterConfig.getHostSelector();
    HostManager hostManager;
    switch (selector) {
        case RANDOM:
            hostManager = new RandomHostManager();
            break;
        case ROUND_ROBIN:
            hostManager = new RoundRobinHostManager();
            break;
        case LOWER_WEIGHT:
            hostManager = new LowerWeightHostManager();
            break;
        default:
            throw new IllegalArgumentException("unSupport selector " + selector);
    }
    beanFactory.autowireBean(hostManager);
    return hostManager;
}

2.7.1 Weighted random


@Override
public HostWorker doSelect(final Collection<HostWorker> source) {

    List<HostWorker> hosts = new ArrayList<>(source);
    int size = hosts.size();
    int[] weights = new int[size];
    int totalWeight = 0;
    int index = 0;

    for (HostWorker host : hosts) {
        totalWeight += host.getHostWeight();
        weights[index] = host.getHostWeight();
        index++;
    }

    if (totalWeight > 0) {
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);

        for (int i = 0; i < size; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return hosts.get(i);
            }
        }
    }
    return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
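To see why subtracting weights from a random offset yields a weight-proportional choice, it helps to isolate that loop and run it many times. The following is a minimal standalone sketch. WeightedRandomDemo, pick and sample are names invented for this example, and a seeded java.util.Random replaces ThreadLocalRandom so the run is repeatable.

```java
import java.util.Arrays;
import java.util.Random;

class WeightedRandomDemo {
    /** Maps an offset in [0, totalWeight) to the index whose weight interval contains it. */
    static int pick(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    /** Draws repeatedly and counts how often each index is chosen. */
    static int[] sample(int[] weights, int draws, long seed) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        Random random = new Random(seed);
        int[] counts = new int[weights.length];
        for (int i = 0; i < draws; i++) {
            counts[pick(weights, random.nextInt(total))]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] weights = {100, 50, 50};
        // With these weights the first host should receive roughly half of the draws.
        System.out.println(Arrays.toString(sample(weights, 100_000, 42L)));
    }
}
```

Each host owns a slice of the range [0, totalWeight) as wide as its weight, so offsets 0 to 99 map to the first host, 100 to 149 to the second, and 150 to 199 to the third.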

2.7.2 Linear load


private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
    double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
    long uptime = System.currentTimeMillis() - startTime;
    if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
        // If the warm-up is not over, add the weight
        return calculatedWeight * Constants.WARM_UP_TIME / uptime;
    }
    return calculatedWeight;
}
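A small worked example makes the warm-up scaling concrete. The sketch below repeats the formula from calculateWeight but takes the uptime directly so the result is deterministic. The factor values (10, 20, 70) and the ten-minute warm-up window are assumptions made for illustration and should be checked against the constants in the source tree.

```java
class WarmUpWeightDemo {
    // Assumed values for illustration only; verify against the real constants.
    static final double CPU_FACTOR = 10;
    static final double MEMORY_FACTOR = 20;
    static final double LOAD_AVERAGE_FACTOR = 70;
    static final long WARM_UP_TIME = 10 * 60 * 1000L;

    /** Same formula as calculateWeight, with uptime passed in instead of derived from the clock. */
    static double weight(double cpu, double memory, double loadAverage, long uptimeMillis) {
        double base = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        if (uptimeMillis > 0 && uptimeMillis < WARM_UP_TIME) {
            // Still warming up: inflate the weight so the node looks busier than it is.
            return base * WARM_UP_TIME / uptimeMillis;
        }
        return base;
    }

    public static void main(String[] args) {
        System.out.println(weight(0.5, 0.5, 1.0, WARM_UP_TIME));     // warmed up
        System.out.println(weight(0.5, 0.5, 1.0, WARM_UP_TIME / 2)); // halfway through warm-up
    }
}
```

Under these assumed factors a warmed-up node with the inputs above scores 85, while the same node halfway through its warm-up scores 170. Because the selector prefers lower weights, a freshly started Worker receives fewer tasks until the warm-up window has passed.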


/**
 * select
 *
 * @param sources sources
 * @return HostWeight
 */
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
    double totalWeight = 0;
    double lowWeight = 0;
    HostWeight lowerNode = null;
    for (HostWeight hostWeight : sources) {
        totalWeight += hostWeight.getWeight();
        hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
        if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
            lowerNode = hostWeight;
            lowWeight = hostWeight.getCurrentWeight();
        }
    }
    lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
    return lowerNode;

}
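The doSelect listing is easier to follow with concrete numbers. The sketch below re-implements the same three steps (add each node's weight to its running value, take the lowest running value, then penalize the winner by the total weight) on a hypothetical Node class with two nodes. The class and method names are invented for this example.

```java
import java.util.Arrays;
import java.util.List;

class LowerWeightDemo {
    static final class Node {
        final String name;
        final double weight;   // load-derived weight: higher means busier
        double currentWeight;  // running value that the selection compares

        Node(String name, double weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    /** One selection round, mirroring the steps of the doSelect listing above. */
    static Node select(List<Node> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes to select from");
        }
        double totalWeight = 0;
        Node lowest = null;
        for (Node node : nodes) {
            totalWeight += node.weight;
            node.currentWeight += node.weight;
            if (lowest == null || node.currentWeight < lowest.currentWeight) {
                lowest = node; // ties keep the earlier node
            }
        }
        lowest.currentWeight += totalWeight; // push the winner back so others get a turn
        return lowest;
    }

    /** Runs several rounds and concatenates the names of the selected nodes. */
    static String sequence(List<Node> nodes, int rounds) {
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < rounds; i++) {
            picks.append(select(nodes).name);
        }
        return picks.toString();
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("A", 1), new Node("B", 3));
        System.out.println(sequence(nodes, 8)); // prints AABAAABA
    }
}
```

With weights 1 and 3, the lightly loaded node A is chosen six times out of eight and the busier node B twice, so tasks are spread in inverse proportion to the weights instead of always going to the single least-loaded node.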

2.7.3 Smooth round-robin


@Override
public HostWorker doSelect(Collection<HostWorker> source) {

    List<HostWorker> hosts = new ArrayList<>(source);
    String key = hosts.get(0).getWorkerGroup();
    ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
    if (map == null) {
        workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
        map = workGroupWeightMap.get(key);
    }

    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    HostWorker selectedHost = null;
    WeightedRoundRobin selectWeightRoundRobin = null;

    for (HostWorker host : hosts) {
        String workGroupHost = host.getWorkerGroup() + host.getAddress();
        WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
        int weight = host.getHostWeight();
        if (weight < 0) {
            weight = 0;
        }

        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            // set weight
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(workGroupHost, weightedRoundRobin);
            weightedRoundRobin = map.get(workGroupHost);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            weightedRoundRobin.setWeight(weight);
        }

        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedHost = host;
            selectWeightRoundRobin = weightedRoundRobin;
        }

        totalWeight += weight;
    }

    if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
        try {
            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
            workGroupWeightMap.put(key, newMap);
        } finally {
            updateLock.set(false);
        }
    }

    if (selectedHost != null) {
        selectWeightRoundRobin.sel(totalWeight);
        return selectedHost;
    }

    return hosts.get(0);
}
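Stripped of the per-worker-group bookkeeping and the stale-entry cleanup, the core of the listing is the smooth weighted round-robin step: every node's running value grows by its weight, the largest running value wins, and the winner is reduced by the total weight. The sketch below shows only that core on a hypothetical Node class. It assumes that increaseCurrent adds the node's weight and that sel subtracts the total, which matches how the listing uses them.

```java
import java.util.ArrayList;
import java.util.List;

class SmoothRoundRobinDemo {
    static final class Node {
        final String name;
        final int weight;
        long current; // running value, grows by weight every round

        Node(String name, int weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    /** One selection round of smooth weighted round-robin. */
    static Node select(List<Node> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes to select from");
        }
        int totalWeight = 0;
        Node best = null;
        for (Node node : nodes) {
            node.current += node.weight;   // the "increaseCurrent" step
            totalWeight += node.weight;
            if (best == null || node.current > best.current) {
                best = node;               // ties keep the earlier node
            }
        }
        best.current -= totalWeight;       // the "sel(totalWeight)" step
        return best;
    }

    /** Builds nodes named a, b, c, ... from the weights and records the picks. */
    static String sequence(int rounds, int... weights) {
        List<Node> nodes = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            nodes.add(new Node(String.valueOf((char) ('a' + i)), weights[i]));
        }
        StringBuilder picks = new StringBuilder();
        for (int i = 0; i < rounds; i++) {
            picks.append(select(nodes).name);
        }
        return picks.toString();
    }

    public static void main(String[] args) {
        System.out.println(sequence(7, 5, 1, 1)); // prints aabacaa
    }
}
```

For weights 5, 1 and 1, a plain weighted round-robin would emit aaaaabc, whereas the smooth variant interleaves the picks as aabacaa, so the heavy node never receives a long burst of consecutive tasks.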

2.8 Log service


2.9 Alerting


3 Afterword

3.1 Make friends


Apache DolphinScheduler Slack group link:

3.2 References






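Everyone is warmly welcome to join the DolphinScheduler family and become part of the open source world!

We encourage any form of participation in the community, which can eventually lead to becoming a Committer or PPMC member, for example:

  • Report the problems you run into as issues on GitHub.

  • Answer issues raised by other people.

  • Help improve the documentation.

  • Help add test cases to the project.

  • Add comments to the code.

  • Submit PRs that fix bugs or add features.

  • Publish application case studies, analyses of the scheduling process, or other scheduling-related technical articles.

  • Help promote DolphinScheduler, for example by speaking at technical conferences or meetups.

Welcome to the ranks of contributors. Joining open source starts with submitting your first PR.

  • For example, add code comments, or pick an issue labeled "easy to fix" or some other very simple issue (a typo and the like), and use that first simple PR to get familiar with the submission process.

Note: contributions are not limited to PRs; anything that helps the project move forward is a contribution.

We believe that taking part in DolphinScheduler will let you benefit from open source!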


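With the rapid rise of open source in China, the Apache DolphinScheduler community is growing vigorously. To make scheduling better and easier to use, we sincerely welcome everyone who loves open source to join the community, contribute to the rise of open source in China, and help home-grown open source go global.

There are many ways to participate in and contribute to the DolphinScheduler community, including:

Contribute your first PR (documentation or code). We hope it will be a simple one: the first PR is for getting familiar with the submission process and community collaboration, and for experiencing how friendly the community is.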





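Taking part in open source lets you learn from experts at close range and improve your skills quickly. If you would like to contribute, we have a contributor seed incubation group: add the community assistant and you will be guided step by step (contributors are not ranked by skill level, every question gets an answer, and what matters is the willingness to contribute).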



Author: 博拉资讯


