Backgroud:

At present, the problems of master:

There are many polling, that result in unnecessary time-cost

The distributed lock is used when the command is taken, that result in the
bottleneck of concurrency

Too many threads(nProcessInstance*nTaskInstances) are used, that result in
the waste of system resources

Polling database result in database query pressure bottleneck


*refer issues: *

backgroud:  https://github.com/apache/incubator-dolphinscheduler/issues/4083

propose:  https://github.com/apache/incubator-dolphinscheduler/issues/4355

*propose:*
1. Reconstruct the communication function

   -

   Synchronization: sending thread sends message (blocking) - > the
   receiver receives the message and processes the message (storing dB,
   writing data, etc.) - > the receiver returns the message to the sender - >
   the sender unblocks
   -

   Asynchronous: send thread send message, send thread cache message,
   receive message and process message, reply to sender command after
   processing message, remove cache message after receiving command

2. Add scheduler function
https://user-images.githubusercontent.com/29528966/103751033-a0286e80-5042-11eb-8ff6-4baf1b76d536.png


   -

   Realize ha function
   -

   To implement the scheduler start process, scan the CMD table first, and
   then start the monitoring function of CMD to cache the CMD data to the
   local queue
   -

   Monitor the receiving process of CMD (synchronization)
   -

   CMD caches the queue processing thread and sends the CMD to the master
   according to the policy
   -

   The implementation of CMD sending policy can support multiple policies
   at the same time, and can be easily extended, such as the priority of CMD,
   the load of master, and so on

3. API

   - API receives the execution workflow command or pause / stop command,
   and sends the command to the specified scheduler / master. If it fails, it
   will try again three times. If it fails three times, it will throw a
   failure.

4. Fault tolerant process modification:

   -

   Master fault tolerance
   -

   Workflow instance responsible for fault tolerant master: find the
   unfinished workflow instance, generate fault tolerant CMD, and send it to
   active scheduler
   -

      Find the unfinished workflow instance and generate the fault-tolerant
      CMD
      -

      Send CMD to active scheduler
      -

   Find the unfinished command and send it to the scheduler for
   reprocessing (remove the host and reassign the master)
   -

      The unfinished command is found according to the host of the dead
      Master
      -

      Send the command back to the scheduler and let the scheduler
      reallocate it
      -

   Worker fault tolerance
   -

      When the worker hangs up, the master listens to the message and finds
      out all the unfinished tasks on the worker from the DB
      -

      Judge whether the DAG of the task belongs to itself, and change the
      state of the task to "fault tolerant"
      -

      Trigger task status modification

5. Master execution process

https://user-images.githubusercontent.com/29528966/103524201-1a78b780-4eb8-11eb-95bc-a7c0dbc5d2af.png

   -

   Modify the process pool of master processing workflow, from obtaining
   CMD to generating workflow instance to starting task submission to the end
   of queue
   -

      When the master submits a task, it is found that the task is in
      progress and needs to be updated
      -

      Check (whether the task instance worker is alive + worker start time
      < task start time). If the worker has fault tolerance, modify the task
      state to "fault tolerance" and trigger the task state change
      -

      Inform the worker where the task is, and change the host reported by
      the worker to the current master
      -

   Add master task status monitoring
   -

      Receive task / workflow status from worker / API / Master
      -

      Save the received task status to the corresponding DAG's unprocessed
      status queue
      -

      The corresponding DAG cannot be found in the current task state, and
      the receive failure is required
      -

   Add the thread pool of master task state processing, all task states of
   the same workflow can only be processed sequentially
   -

      When the master receives a task, it needs to determine whether the
      DAG to which the task belongs is processed by the master. If not, it does
      not belong to its own processing and needs to discard this status
      -

      Determine whether a thread is already processing the DAG state. If it
      already exists, the DAG state will not be processed
      -

      Determine whether the task state exists in the list of unprocessed
      states in DAG. If not, discard the state
      -

      Start the thread to process the DAG task status, and add DAG as key
      and feature as value to the map to facilitate the previous judgment
      -

      After processing the task state, delete the map and release the thread
      -

   The polling thread is added to poll the external workflow / task status
   (dependent / sub workflow).
   -

      Query the status of dependent tasks
      -

      Query whether the dependent task component is successful
      -

      According to the combination of dependency conditions, the dependency
      state is generated
      -

      Query subworkflow status
      -

      Query parent child relationship table
      -

      Query the status of a subworkflow
      -

      Modify the state of a subtask based on the state of the subworkflow
      -

   Add a timeout monitoring queue, add the task / workflow that needs to be
   monitored to the queue, and a time wheel / thread will monitor the timeout.
   If a timeout occurs, the timeout processing will be triggered.
   -

      The newly submitted task adds a timeout monitoring task in the
      timeout queue
      -

      If the task is in the timeout queue and the timeout is triggered, it
      will alarm or kill the task according to the timeout policy
      -

      When the task status is completed, it is removed from the timeout
      queue
      -

   The master monitors the CMD thread, marks the CMD, and caches the CMD to
   the local CMD queue.
   - Listen to the received CMD, mark it as the host of the master, cache
      it to the local CMD queue, and return the success message of the sender
   -

   The CMDS that cannot be processed by the master can be fed back to the
   scheduler for redistribution
   - When the master processing the CMD thread finds that the resources are
      insufficient, it feeds the CMD back to the scheduler for
redistribution. If
      the feedback is successful, it is deleted from the local CMD queue
   -

   Actively report the resource usage, and the master reports the resource
   usage to the scheduler.
   - The master sends resource usage information to ZK and scheduler at the
      same time during heartbeat

6. Timing

   -

   Master timing: to prevent a timing from triggering multiple times: there
   may be multiple times of timing when the master sends the timing to the
   scheduler.
   -

      Using quartz distributed timing, each trigger timing
      -

      Add a unique index (definitionid + schedulertime + datetime) to the
      CMD table and the workflow instance table to prevent duplication.

==================================
1. 重构通信功能

   -

   同步: 发送线程send消息(阻塞) -> 接收方收到消息,并且处理消息(存db,写数据等)->接收方返回消息给发送方 -> 发送方解除阻塞
   -

   异步: 发送线程send消息->发送线程缓存消息-> 接收方收到消息,并处理->处理完消息回复发送方command ->
   发送方收到command,移除缓存消息

2. 新增scheduler功能

https://user-images.githubusercontent.com/29528966/103750880-622b4a80-5042-11eb-8523-5cddbccd9e09.png

   -

   实现ha功能
   -

   实现scheduler启动流程,先扫描cmd表,再启动监听cmd功能,将cmd数据缓存到本地队列
   -

   监听cmd接收处理流程(同步)
   -

   cmd缓存队列处理线程,将cmd根据策略发送给master
   -

   实现cmd发送策略,可以同时支持多个策略,且可以易扩展策略,例如:cmd的优先级,master的负载等等

3.API部分

   - api收到执行工作流命令,或者暂停/停止命令,将命令发送给指定的scheduler/master,失败则重试三次,三次失败以后抛出失败。

4. 容错流程修改:

   -

   master容错
   -

      容错master负责的工作流实例: 找到未完成的工作流实例,生成容错cmd,将cmd发给active的scheduler.
      - 找到未完成的工作流实例,生成容错的cmd
         - 将cmd发送给active的scheduler
      -

      找到还未处理完的command,发送到scheduler进行重新处理(去掉host,重新分配master)
      - 根据挂掉的master的host发现未处理完的command
         - 将command发回给scheduler,让scheduler重新分配
      -

   worker容错
   - worker挂掉,master监听到消息,从DB查出所有此worker上未完成的任务
      - 判断任务所属DAG是否属于自身,将属于自身的任务状态改为“容错”
      - 触发任务状态修改

5. master执行流程

https://user-images.githubusercontent.com/29528966/103628867-8c193a00-4f7a-11eb-9762-5d4cb20aabaf.png

   -

   修改master处理工作流线程池,从获取cmd到生成工作流实例,到开始任务全部提交到队列结束
   - master提交任务的时候,发现此任务正在执行,需要
         - 检查(任务实例worker是否活着+worker启动时间<任务开始时间
         ),如果worker发生了容错,则修改任务状态为“容错”,并触发任务状态变化
         - 通知任务所在的worker,将worker汇报的host更换到当前master
      -

   增加master任务状态监听
   - 从worker/api/master收到任务/工作流状态
      - 将接收到的任务状态存到对应DAG的未处理状态队列
      - 当前任务状态找不到对应的DAG,需要返回接收失败
   -

   增加master任务状态处理线程池,同一个工作流的所有任务状态只能顺序处理
   - master接收到任务,需要判断此任务所属的DAG是否本master处理,如不是,则不属于自己处理,需要丢弃这个状态
      - 判断是否已经有线程在处理这个DAG的状态,如果已经存在,则不处理这个状态
      - 判断此任务状态是否存在于DAG的未处理状态列表中,如果不存在,则丢弃这个状态
      - 启动线程处理DAG任务状态,将DAG为key,feature为value,加入map,方便前面判断使用
      - 处理完任务状态以后,将map删除,线程释放
   -

   增加轮询线程,针对需要获取外部工作流/任务状态的需求(依赖/子工作流),进行轮询。
   - 查询依赖任务的状态
         - 查询依赖的任务组件是否成功
         - 根据依赖条件组合生成依赖状态
      - 查询子工作流状态
         - 查询父子关系表
         - 查询子工作流的状态
         - 根据子工作流状态修改子任务的状态
      -

   增加超时监控队列,将需要监控的任务/工作流加入队列,由一个时间轮/线程进行超时监控,发生超时,则触发超时处理。
   - 新提交的任务在超时队列中新增超时监控任务
      - 任务在超时队列中,触发超时,则根据超时策略,报警或者kill任务
      - 任务状态完成则从超时队列中删除
   -

   master监听cmd线程,对cmd标记,cmd缓存到本地cmd队列。
   - 监听收到的cmd, 将cmd标记master所在的host,同时缓存到本地cmd队列,返回消息发送方成功消息
   -

   master处理不完的cmd可以反馈回scheduler进行重新分发
   - master处理cmd线程发现资源不够,将cmd反馈给scheduler进行重新分发,反馈成功则从本地cmd队列删除
   -

   主动上报资源使用情况,master向scheduler汇报资源使用情况。
   - master在心跳时向zk和scheduler同时发送资源使用情况

6. 定时

   1. master定时:防止一个定时触发多次:master将定时发给scheduler过程中可能会有多次定时出现。
      - 使用quartz分布式定时,每次触发定时
      - 对cmd表和工作流实例表增加唯一索引(definitionId+schedulerTime+dateTime)防止产生重复问题。


Please feel free to discuss and give you suggestion,thx

-- 
DolphinScheduler(Incubator)  PPMC
BaoLiang 鲍亮
[email protected]

Reply via email to