caishunfeng commented on code in PR #15278:
URL:
https://github.com/apache/dolphinscheduler/pull/15278#discussion_r1415486611
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java:
##########
@@ -17,38 +17,38 @@
package org.apache.dolphinscheduler.server.master.runner;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.stereotype.Component;
/**
*
*/
@Component
-public class MasterDelayTaskExecuteRunnableDelayQueue {
+public class GlobalMasterTaskExecuteRunnableQueue {
- private final DelayQueue<MasterDelayTaskExecuteRunnable>
masterDelayTaskExecuteRunnableDelayQueue =
- new DelayQueue<>();
+ private final BlockingQueue<MasterTaskExecutor>
masterTaskExecutorBlockingQueue =
+ new LinkedBlockingQueue<>();
- public boolean
submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable
masterDelayTaskExecuteRunnable) {
- return
masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable);
+ public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
+ return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
}
- public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable()
throws InterruptedException {
- return masterDelayTaskExecuteRunnableDelayQueue.take();
+ public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws
InterruptedException {
+ return masterTaskExecutorBlockingQueue.take();
}
// todo: if we move the delay process to master, than we don't need this
method, since dispatchProcess can directly
Review Comment:
I think this todo can be remove in this pr.
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java:
##########
@@ -29,7 +27,7 @@
@Component
public class GlobalTaskDispatchWaitingQueue {
- private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue =
new PriorityBlockingQueue<>();
+ private final DelayQueue<DefaultTaskExecuteRunnable> queue = new
DelayQueue<>();
Review Comment:
Should be DelayPriorityQueue?
##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java:
##########
@@ -17,19 +17,41 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class BaseTaskExecuteRunnableDispatchOperator implements
TaskExecuteRunnableOperator {
private final GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue;
- public
BaseTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue) {
+ private final TaskInstanceDao taskInstanceDao;
+
+ public BaseTaskExecuteRunnableDispatchOperator(
+
GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue,
+ TaskInstanceDao
taskInstanceDao) {
this.globalTaskDispatchWaitingQueue = globalTaskDispatchWaitingQueue;
+ this.taskInstanceDao = taskInstanceDao;
}
@Override
public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
+ long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
+ TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+ if (remainTime > 0) {
+ taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
+ taskInstanceDao.updateById(taskInstance);
+ log.info("Current taskInstance: {} is choose delay execution,
delay time: {}/s, remainTime: {}/s",
Review Comment:
```suggestion
log.info("Current taskInstance: {} is choose delay execution,
delay time: {}/ms, remainTime: {}/s",
```
##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java:
##########
@@ -28,24 +28,23 @@
import lombok.NonNull;
-public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends
WorkerDelayTaskExecuteRunnable>
+public class DefaultWorkerTaskExecutorFactory
implements
- WorkerTaskExecuteRunnableFactory<T> {
-
- protected final @NonNull TaskExecutionContext taskExecutionContext;
- protected final @NonNull WorkerConfig workerConfig;
- protected final @NonNull WorkerMessageSender workerMessageSender;
- protected final @NonNull TaskPluginManager taskPluginManager;
- protected final @Nullable StorageOperate storageOperate;
- protected final @NonNull WorkerRegistryClient workerRegistryClient;
-
- protected WorkerDelayTaskExecuteRunnableFactory(
- @NonNull
TaskExecutionContext taskExecutionContext,
- @NonNull WorkerConfig
workerConfig,
- @NonNull
WorkerMessageSender workerMessageSender,
- @NonNull TaskPluginManager
taskPluginManager,
- @Nullable StorageOperate
storageOperate,
- @NonNull
WorkerRegistryClient workerRegistryClient) {
+ WorkerTaskExecutorFactory<DefaultWorkerTaskExecutor> {
+
+ private final @NonNull TaskExecutionContext taskExecutionContext;
+ private final @NonNull WorkerConfig workerConfig;
+ private final @NonNull WorkerMessageSender workerMessageSender;
+ private final @NonNull TaskPluginManager taskPluginManager;
+ private final @Nullable StorageOperate storageOperate;
+ private final @NonNull WorkerRegistryClient workerRegistryClient;
+
+ public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext
taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull WorkerMessageSender
workerMessageSender,
+ @NonNull TaskPluginManager
taskPluginManager,
+ @Nullable StorageOperate
storageOperate,
+ @NonNull WorkerRegistryClient
workerRegistryClient) {
Review Comment:
Can we remove the param `workerRegistryClient`? Seems the task executor does
not need it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]