Currently the communication between master and worker is through zk as a queue
1.master inserts tasks into zk and db as transactions
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance
processInstance){
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(),
processInstance.getState() );
processInstance =
this.findProcessInstanceDetailById(processInstance.getId());
//submit to mysql
TaskInstance task= submitTaskInstanceToMysql(taskInstance,
processInstance);
if(task.isSubProcess() && !task.getState().typeIsFinished()){
ProcessInstanceMap processInstanceMap =
setProcessInstanceMap(processInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(),
TaskNode.class);
Map<String, String> subProcessParam =
JSONUtils.toMap(taskNode.getParams());
Integer defineId =
Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
createSubWorkProcessCommand(processInstance, processInstanceMap,
defineId, task);
}else if(!task.getState().typeIsFinished()){
//submit to task queue
task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
submitTaskToQueue(task);
}
logger.info("submit task :{} state:{} complete, instance id:{} state:
{} ",
taskInstance.getName(), task.getState(),
processInstance.getId(), processInstance.getState());
return task;
}
2.worker fetch task from zk and query db.if both exist,submit task to execute
another situation is db task insert is slow . then
/**
* wait for task instance exists, because of db action would be delayed.
*
* @throws Exception exception
*/
private void waitForTaskInstance()throws Exception{
int retryTimes = 30;
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskInstId);
retryTimes--;
}
}
The worker waits for the db 30s to insert the task instance. If it exceeds 30s,
the task corresponding to the zk queue is deleted.
This will cause the task to be in the task submited state and the process
instance to be running.
In this case :
master to ensure data consistency or other ?
Welcome to discuss
thx
―――――――――――――
DolphinScheduler(Incubator) PPMC
Zhanwei Qiao 乔占卫
[email protected]