Currently, the master and the worker communicate through zk, which is used as a task queue.

1. The master inserts the task into zk and the db inside one transactional method:

    @Transactional(rollbackFor = Exception.class)
    public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
        logger.info("start submit task : {}, instance id:{}, state: {}, ",
                taskInstance.getName(), processInstance.getId(), processInstance.getState() );
        processInstance = this.findProcessInstanceDetailById(processInstance.getId());
        //submit to mysql
        TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
        if(task.isSubProcess() && !task.getState().typeIsFinished()){
            ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);

            TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
            Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
            Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
            createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
        }else if(!task.getState().typeIsFinished()){
            //submit to task queue
            task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
            submitTaskToQueue(task);
        }
        logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
                taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
        return task;
    }

2. The worker fetches the task from zk and queries the db. If the task exists in both, the worker submits it for execution.
The other situation is that the db insert of the task is slow. In that case the worker waits:

    /**
     * wait for task instance exists, because of db action would be delayed.
     *
     * @throws Exception exception
     */
    private void waitForTaskInstance()throws Exception{
        int retryTimes = 30;
        while (taskInstance == null && retryTimes > 0) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            taskInstance = processDao.findTaskInstanceById(taskInstId);
            retryTimes--;
        }
    }

The worker waits up to 30s for the db insert of the task instance to complete.
If the wait exceeds 30s, the corresponding task is deleted from the zk queue.
As a result, the task remains in the submitted state while the process
instance remains in the running state.
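
To make the timeout case above easier to discuss, here is a minimal, self-contained sketch that reproduces it with in-memory stand-ins. Everything in it is an assumption for illustration: TaskRaceSketch, simulate, Outcome, the SUBMITTED and RUNNING strings, and the map and queue that stand in for the db and the zk queue are hypothetical names, not DolphinScheduler or ZooKeeper APIs. It also assumes the queue entry is removed whether or not the worker finds the row.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy simulation only: in-memory stand-ins, not DolphinScheduler or ZooKeeper APIs.
class TaskRaceSketch {

    /** What the simulation observed once both sides have finished. */
    static final class Outcome {
        final boolean executed;    // did the worker run the task?
        final boolean stillQueued; // is the task id still in the stand-in queue?
        final String taskState;    // final state of the row in the stand-in db

        Outcome(boolean executed, boolean stillQueued, String taskState) {
            this.executed = executed;
            this.stillQueued = stillQueued;
            this.taskState = taskState;
        }
    }

    static Outcome simulate(long dbDelayMillis, int retryTimes, long sleepMillis) {
        Map<Integer, String> db = new ConcurrentHashMap<>();  // stand-in for the task table
        Queue<Integer> queue = new ConcurrentLinkedQueue<>(); // stand-in for the zk queue
        final Integer taskId = 1;
        ScheduledExecutorService dbWriter = Executors.newSingleThreadScheduledExecutor();
        try {
            // Master side: the queue entry is visible at once,
            // the db row only becomes visible after dbDelayMillis.
            queue.add(taskId);
            dbWriter.schedule(() -> { db.put(taskId, "SUBMITTED"); },
                    dbDelayMillis, TimeUnit.MILLISECONDS);

            // Worker side: bounded polling, same shape as waitForTaskInstance().
            String row = db.get(taskId);
            int retries = retryTimes;
            while (row == null && retries > 0) {
                Thread.sleep(sleepMillis);
                row = db.get(taskId);
                retries--;
            }
            boolean executed = row != null;
            if (executed) {
                db.put(taskId, "RUNNING"); // the worker picked the task up
            }
            queue.remove(taskId); // assumption: entry is removed in both cases

            // Let the delayed db write land before reading the final state.
            dbWriter.shutdown();
            dbWriter.awaitTermination(dbDelayMillis + 1000, TimeUnit.MILLISECONDS);
            return new Outcome(executed, queue.contains(taskId), db.get(taskId));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("simulation interrupted", e);
        } finally {
            dbWriter.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Outcome slow = simulate(400, 3, 20);  // db slower than the worker's wait
        System.out.println("slow db: executed=" + slow.executed
                + ", stillQueued=" + slow.stillQueued + ", state=" + slow.taskState);
        Outcome fast = simulate(10, 30, 20);  // db faster than the worker's wait
        System.out.println("fast db: executed=" + fast.executed
                + ", stillQueued=" + fast.stillQueued + ", state=" + fast.taskState);
    }
}
```

In the slow case the row appears only after the worker has given up, so nothing is left in the queue to trigger execution and the row keeps its submitted state. The delays here are shortened to milliseconds so the sketch runs quickly; the real wait is the 30s described above.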



In this case:
should the master be responsible for ensuring data consistency, or is there another approach?

Discussion is welcome.

thx

―――――――――――――
DolphinScheduler(Incubator)  PPMC
Zhanwei Qiao 乔占卫

[email protected]
