This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 885a37e4f [Feature][ST-Engine] Notice Task State By 
TaskExecutionService (#2567)
885a37e4f is described below

commit 885a37e4f312896c1c20211d82a84299464f6212
Author: Eric <[email protected]>
AuthorDate: Mon Sep 5 18:19:21 2022 +0800

    [Feature][ST-Engine] Notice Task State By TaskExecutionService (#2567)
    
    * add updateTaskExecutionState method to SeaTunnelServer
    
    * complete notice task state by TaskExecutionService
    
    * delete no use variable
    
    * impove code
    
    * fix review problem
---
 .../engine/e2e/engine/JobExecutionIT.java          |  21 ++-
 .../engine/client/SeaTunnelClientTest.java         |  11 +-
 .../exception/JobNoEnoughResourceException.java    |   4 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  17 ++-
 .../engine/server/dag/physical/PhysicalPlan.java   |   2 +-
 .../server/dag/physical/PhysicalPlanGenerator.java |  35 +++--
 .../engine/server/dag/physical/PhysicalVertex.java | 169 +++++++++------------
 .../engine/server/dag/physical/SubPlan.java        |  22 ++-
 .../seatunnel/engine/server/master/JobMaster.java  |  31 +++-
 .../operation/NotifyTaskStatusOperation.java       |   4 +-
 .../server/scheduler/PipelineBaseScheduler.java    |   2 +-
 .../server/task/operation/DeployTaskOperation.java |  15 +-
 .../operation/GetTaskGroupAddressOperation.java    |   2 +-
 .../task/operation/sink/SinkRegisterOperation.java |   2 +-
 14 files changed, 184 insertions(+), 153 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index f67042748..8fb21b51b 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -84,14 +84,14 @@ public class JobExecutionIT {
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
-            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
-                JobStatus jobStatus = clientJobProxy.waitForJobComplete();
-                Assert.assertEquals(JobStatus.FINISHED, jobStatus);
-                return null;
+            CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
             });
 
             await().atMost(20000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
+                .untilAsserted(() -> Assert.assertTrue(
+                    objectCompletableFuture.isDone() && 
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -115,16 +115,15 @@ public class JobExecutionIT {
             JobStatus jobStatus1 = clientJobProxy.getJobStatus();
             Assert.assertFalse(jobStatus1.isEndState());
             ClientJobProxy finalClientJobProxy = clientJobProxy;
-            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
-                JobStatus jobStatus = finalClientJobProxy.waitForJobComplete();
-                Assert.assertEquals(JobStatus.CANCELED, jobStatus);
-                return null;
+            CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                return finalClientJobProxy.waitForJobComplete();
             });
             Thread.sleep(1000);
             clientJobProxy.cancelJob();
 
-            await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
+            await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assert.assertTrue(
+                    objectCompletableFuture.isDone() && 
JobStatus.CANCELED.equals(objectCompletableFuture.get())));
 
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 540ff2074..f3a8d4445 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -83,14 +83,13 @@ public class SeaTunnelClientTest {
 
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
-                JobStatus jobStatus = clientJobProxy.waitForJobComplete();
-                Assert.assertEquals(JobStatus.FINISHED, jobStatus);
-                return null;
+            CompletableFuture<JobStatus> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                return clientJobProxy.waitForJobComplete();
             });
 
-            await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
+            await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> Assert.assertTrue(
+                    objectCompletableFuture.isDone() && 
JobStatus.FINISHED.equals(objectCompletableFuture.get())));
 
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
index 9a0b89565..81653542b 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/JobNoEnoughResourceException.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.engine.common.exception;
 
 public class JobNoEnoughResourceException extends SeaTunnelEngineException {
-    public JobNoEnoughResourceException(String jobName, long jobId, int 
pipelineIndex, int totalPipelineNum) {
-        super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough 
resource.", jobName, jobId, pipelineIndex + 1, totalPipelineNum));
+    public JobNoEnoughResourceException(String jobName, long jobId, int 
pipelineId, int totalPipelineNum) {
+        super(String.format("Job %s (%s), Pipeline [(%s/%s)] have no enough 
resource.", jobName, jobId, pipelineId + 1, totalPipelineNum));
     }
 
     public JobNoEnoughResourceException(String message) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 7c912f304..56c006206 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -19,8 +19,11 @@ package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.exception.JobException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
@@ -225,7 +228,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
             return new 
PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
                 runningJobMaster.cancelJob();
                 return null;
-            }));
+            }, executorService));
         }
     }
 
@@ -237,4 +240,16 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         }
         return runningJobMaster.getJobStatus();
     }
+
+    /**
+     * When TaskGroup ends, it is called by {@link TaskExecutionService} to 
notify JobMaster the TaskGroup's state.
+     */
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        TaskGroupLocation taskGroupLocation = 
taskExecutionState.getTaskGroupLocation();
+        JobMaster runningJobMaster = 
runningJobMasterMap.get(taskGroupLocation.getJobId());
+        if (runningJobMaster == null) {
+            throw new JobException(String.format("Job %s not running", 
taskGroupLocation.getJobId()));
+        }
+        runningJobMaster.updateTaskExecutionState(taskExecutionState);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 6fd0fe132..e55b0da65 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -138,7 +138,7 @@ public class PhysicalPlan {
                 CompletableFuture<Void> future = 
CompletableFuture.supplyAsync(() -> {
                     pipeline.cancelPipeline();
                     return null;
-                });
+                }, executorService);
                 return future;
             }
             return null;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index df2b11f53..50fa26b7f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -169,7 +169,8 @@ public class PhysicalPlanGenerator {
                 initializationTimestamp,
                 physicalVertexList,
                 coordinatorVertexList,
-                jobImmutableInformation);
+                jobImmutableInformation,
+                executorService);
         });
 
         PhysicalPlan physicalPlan = new 
PhysicalPlan(subPlanStream.collect(Collectors.toList()),
@@ -204,7 +205,8 @@ public class PhysicalPlanGenerator {
                 if (sinkAggregatedCommitter.isPresent()) {
                     long taskGroupID = idGenerator.getNextId();
                     long taskTypeId = idGenerator.getNextId();
-                    TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+                    TaskGroupLocation taskGroupLocation =
+                        new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
                     TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskTypeId, 0);
                     SinkAggregatedCommitterTask<?> t =
                         new 
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
@@ -214,8 +216,7 @@ public class PhysicalPlanGenerator {
                     // checkpoint
                     pipelineTasks.add(taskLocation);
 
-                    return new PhysicalVertex(idGenerator.getNextId(),
-                        atomicInteger.incrementAndGet(),
+                    return new PhysicalVertex(atomicInteger.incrementAndGet(),
                         executorService,
                         collect.size(),
                         new TaskGroupDefaultImpl(taskGroupLocation, 
s.getName() + "-AggregatedCommitterTask",
@@ -245,13 +246,15 @@ public class PhysicalPlanGenerator {
                 long taskGroupIDPrefix = idGenerator.getNextId();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, 
i);
-                    TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+                    TaskGroupLocation taskGroupLocation =
+                        new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
                     TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, i);
                     setFlowConfig(flow, i);
-                    SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, 
flow);
+                    SeaTunnelTask seaTunnelTask =
+                        new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, 
flow);
                     // checkpoint
                     pipelineTasks.add(taskLocation);
-                    t.add(new PhysicalVertex(idGenerator.getNextId(),
+                    t.add(new PhysicalVertex(
                         i,
                         executorService,
                         flow.getAction().getParallelism(),
@@ -278,15 +281,17 @@ public class PhysicalPlanGenerator {
         return sources.stream().map(s -> {
             long taskGroupID = idGenerator.getNextId();
             long taskTypeId = idGenerator.getNextId();
-            TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+            TaskGroupLocation taskGroupLocation =
+                new TaskGroupLocation(jobImmutableInformation.getJobId(), 
pipelineIndex, taskGroupID);
             TaskLocation taskLocation = new TaskLocation(taskGroupLocation, 
taskTypeId, 0);
-            SourceSplitEnumeratorTask<?> t = new 
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, 
s);
+            SourceSplitEnumeratorTask<?> t =
+                new 
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, 
s);
             // checkpoint
             pipelineTasks.add(taskLocation);
             startingTasks.add(taskLocation);
             enumeratorTaskIDMap.put(s, taskLocation);
 
-            return new PhysicalVertex(idGenerator.getNextId(),
+            return new PhysicalVertex(
                 atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
@@ -319,13 +324,15 @@ public class PhysicalPlanGenerator {
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     int finalParallelismIndex = i;
                     long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, 
i);
-                    TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+                    TaskGroupLocation taskGroupLocation =
+                        new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
                     List<SeaTunnelTask> taskList =
                         flows.stream().map(f -> {
                             setFlowConfig(f, finalParallelismIndex);
                             long taskIDPrefix =
                                 
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> 
idGenerator.getNextId());
-                            final TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, finalParallelismIndex);
+                            final TaskLocation taskLocation =
+                                new TaskLocation(taskGroupLocation, 
taskIDPrefix, finalParallelismIndex);
                             // checkpoint
                             pipelineTasks.add(taskLocation);
                             if (f instanceof PhysicalExecutionFlow) {
@@ -343,7 +350,7 @@ public class PhysicalPlanGenerator {
 
                     if 
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
                         // contains IntermediateExecutionFlow in task group
-                        t.add(new PhysicalVertex(idGenerator.getNextId(),
+                        t.add(new PhysicalVertex(
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
@@ -358,7 +365,7 @@ public class PhysicalPlanGenerator {
                             initializationTimestamp,
                             nodeEngine));
                     } else {
-                        t.add(new PhysicalVertex(idGenerator.getNextId(),
+                        t.add(new PhysicalVertex(
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 1e8dc0ecf..399c4850d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -45,7 +45,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /**
  * PhysicalVertex is responsible for the scheduling and execution of a single 
task parallel
@@ -56,8 +56,6 @@ public class PhysicalVertex {
 
     private static final ILogger LOGGER = 
Logger.getLogger(PhysicalVertex.class);
 
-    private final long physicalVertexId;
-
     private final TaskGroupLocation taskGroupLocation;
 
     /**
@@ -75,7 +73,7 @@ public class PhysicalVertex {
 
     private final FlakeIdGenerator flakeIdGenerator;
 
-    private final int pipelineIndex;
+    private final int pipelineId;
 
     private final int totalPipelineNum;
 
@@ -105,25 +103,23 @@ public class PhysicalVertex {
 
     private Address currentExecutionAddress;
 
-    public PhysicalVertex(long physicalVertexId,
-                          int subTaskGroupIndex,
+    public PhysicalVertex(int subTaskGroupIndex,
                           @NonNull ExecutorService executorService,
                           int parallelism,
                           @NonNull TaskGroupDefaultImpl taskGroup,
                           @NonNull FlakeIdGenerator flakeIdGenerator,
-                          int pipelineIndex,
+                          int pipelineId,
                           int totalPipelineNum,
                           Set<URL> pluginJarsUrls,
                           @NonNull JobImmutableInformation 
jobImmutableInformation,
                           long initializationTimestamp,
                           @NonNull NodeEngine nodeEngine) {
-        this.physicalVertexId = physicalVertexId;
         this.subTaskGroupIndex = subTaskGroupIndex;
         this.executorService = executorService;
         this.parallelism = parallelism;
         this.taskGroup = taskGroup;
         this.flakeIdGenerator = flakeIdGenerator;
-        this.pipelineIndex = pipelineIndex;
+        this.pipelineId = pipelineId;
         this.totalPipelineNum = totalPipelineNum;
         this.pluginJarsUrls = pluginJarsUrls;
         this.jobImmutableInformation = jobImmutableInformation;
@@ -138,7 +134,7 @@ public class PhysicalVertex {
                 "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
                 jobImmutableInformation.getJobConfig().getName(),
                 jobImmutableInformation.getJobId(),
-                pipelineIndex,
+                pipelineId,
                 totalPipelineNum,
                 taskGroup.getTaskGroupName(),
                 subTaskGroupIndex + 1,
@@ -154,65 +150,67 @@ public class PhysicalVertex {
     private void deployOnLocal(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> {
             SeaTunnelServer server = 
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-            return new 
PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-                
.getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+            server.getSlotService().getSlotContext(slotProfile)
+                
.getTaskExecutionService().deployTask(taskGroupImmutableInformation);
         });
     }
 
     private void deployOnRemote(@NonNull SlotProfile slotProfile) {
-        deployInternal(taskGroupImmutableInformation -> new 
PassiveCompletableFuture<>(
-            
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                    new DeployTaskOperation(slotProfile,
-                        
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                    slotProfile.getWorker())
-                .invoke()));
+        deployInternal(taskGroupImmutableInformation -> {
+            try {
+                
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                        new DeployTaskOperation(slotProfile,
+                            
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                        slotProfile.getWorker())
+                    .invoke().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull SlotProfile slotProfile) {
-        currentExecutionAddress = slotProfile.getWorker();
-        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
-            deployOnLocal(slotProfile);
-        } else {
-            deployOnRemote(slotProfile);
+        try {
+            currentExecutionAddress = slotProfile.getWorker();
+            if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+                deployOnLocal(slotProfile);
+            } else {
+                deployOnRemote(slotProfile);
+            }
+        } catch (Throwable th) {
+            failedByException(th);
         }
     }
 
-    private void deployInternal(Function<TaskGroupImmutableInformation, 
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+    private void deployInternal(Consumer<TaskGroupImmutableInformation> 
taskGroupConsumer) {
         TaskGroupImmutableInformation taskGroupImmutableInformation = 
getTaskGroupImmutableInformation();
-        PassiveCompletableFuture<TaskExecutionState> completeFuture;
-        try {
-            if (ExecutionState.DEPLOYING.equals(executionState.get())) {
-                completeFuture = 
deployMethod.apply(taskGroupImmutableInformation);
-                // may be canceling
-                if (!updateTaskState(ExecutionState.DEPLOYING, 
ExecutionState.RUNNING)) {
-                    // If we found the task state turned to CANCELING after 
deployed to TaskExecutionService. We need
-                    // notice the TaskExecutionService to cancel this task.
-                    noticeTaskExecutionServiceCancel();
-                    if 
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
-                        turnToEndState(ExecutionState.CANCELED);
-                        taskFuture.complete(
-                            new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
-                    } else {
-                        turnToEndState(ExecutionState.FAILED);
-                        taskFuture.complete(new 
TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
-                            new JobException(String.format("%s turn to a 
unexpected state: %s, make it Failed",
-                                this.getTaskFullName(), 
executionState.get()))));
-                    }
+        if (ExecutionState.DEPLOYING.equals(executionState.get())) {
+            taskGroupConsumer.accept(taskGroupImmutableInformation);
+            // may be canceling
+            if (!updateTaskState(ExecutionState.DEPLOYING, 
ExecutionState.RUNNING)) {
+                // If we found the task state turned to CANCELING after 
deployed to TaskExecutionService. We need
+                // notice the TaskExecutionService to cancel this task.
+                noticeTaskExecutionServiceCancel();
+                if 
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+                    turnToEndState(ExecutionState.CANCELED);
+                    taskFuture.complete(
+                        new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.CANCELED, null));
+                } else {
+                    turnToEndState(ExecutionState.FAILED);
+                    taskFuture.complete(new 
TaskExecutionState(this.taskGroupLocation, ExecutionState.FAILED,
+                        new JobException(String.format("%s turn to a 
unexpected state: %s, make it Failed",
+                            this.getTaskFullName(), executionState.get()))));
                 }
-                monitorTask(completeFuture);
-            } else if 
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
-                turnToEndState(ExecutionState.CANCELED);
-                taskFuture.complete(new 
TaskExecutionState(this.taskGroupLocation, executionState.get(), null));
-            } else {
-                turnToEndState(ExecutionState.FAILED);
-                taskFuture.complete(new 
TaskExecutionState(this.taskGroupLocation, executionState.get(),
-                    new JobException(String.format("%s turn to a unexpected 
state", getTaskFullName()))));
             }
-
-        } catch (Throwable th) {
-            failedByException(th);
+        } else if 
(ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+            turnToEndState(ExecutionState.CANCELED);
+            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
executionState.get(), null));
+        } else {
+            turnToEndState(ExecutionState.FAILED);
+            taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, 
executionState.get(),
+                new JobException(String.format("%s turn to a unexpected 
state", getTaskFullName()))));
         }
     }
 
@@ -227,48 +225,8 @@ public class PhysicalVertex {
 
     private TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
         return new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
-                nodeEngine.getSerializationService().toData(this.taskGroup),
-                this.pluginJarsUrls);
-    }
-
-    /**
-     * @param completeFuture This future only can completion by the task run in
-     *                       {@link 
com.hazelcast.spi.impl.executionservice.ExecutionService }
-     */
-    private void monitorTask(PassiveCompletableFuture<TaskExecutionState> 
completeFuture) {
-        completeFuture.whenComplete((v, t) -> {
-            try {
-                if (t != null) {
-                    LOGGER.severe("An unexpected error occurred while the task 
was running", t);
-                    taskFuture.complete(
-                        new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.FAILED,
-                            t));
-                } else {
-                    turnToEndState(v.getExecutionState());
-                    if (v.getThrowable() != null) {
-                        LOGGER.severe(String.format("%s end with state %s and 
Exception: %s",
-                            this.taskFullName,
-                            v.getExecutionState(),
-                            ExceptionUtils.getMessage(v.getThrowable())));
-                    } else {
-                        LOGGER.info(String.format("%s end with state %s",
-                            this.taskFullName,
-                            v.getExecutionState()));
-                    }
-                    taskFuture.complete(v);
-                }
-            } catch (Throwable th) {
-                LOGGER.severe(
-                    String.format("%s end with Exception: %s", 
this.taskFullName, ExceptionUtils.getMessage(th)));
-                turnToEndState(ExecutionState.FAILED);
-                v = new TaskExecutionState(this.taskGroupLocation, 
ExecutionState.FAILED, th);
-                taskFuture.complete(v);
-            }
-        });
-    }
-
-    public long getPhysicalVertexId() {
-        return physicalVertexId;
+            nodeEngine.getSerializationService().toData(this.taskGroup),
+            this.pluginJarsUrls);
     }
 
     private void turnToEndState(@NonNull ExecutionState endState) {
@@ -378,4 +336,23 @@ public class PhysicalVertex {
     public String getTaskFullName() {
         return taskFullName;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        turnToEndState(taskExecutionState.getExecutionState());
+        if (taskExecutionState.getThrowable() != null) {
+            LOGGER.severe(String.format("%s end with state %s and Exception: 
%s",
+                this.taskFullName,
+                taskExecutionState.getExecutionState(),
+                ExceptionUtils.getMessage(taskExecutionState.getThrowable())));
+        } else {
+            LOGGER.info(String.format("%s end with state %s",
+                this.taskFullName,
+                taskExecutionState.getExecutionState()));
+        }
+        taskFuture.complete(taskExecutionState);
+    }
+
+    public TaskGroupLocation getTaskGroupLocation() {
+        return taskGroupLocation;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index ea11a93d0..7df5074b7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -30,6 +30,7 @@ import lombok.NonNull;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
@@ -42,7 +43,7 @@ public class SubPlan {
 
     private final List<PhysicalVertex> coordinatorVertexList;
 
-    private final int pipelineIndex;
+    private final int pipelineId;
 
     private final int totalPipelineNum;
 
@@ -72,13 +73,16 @@ public class SubPlan {
      */
     private final CompletableFuture<PipelineState> pipelineFuture;
 
-    public SubPlan(int pipelineIndex,
+    private final ExecutorService executorService;
+
+    public SubPlan(int pipelineId,
                    int totalPipelineNum,
                    long initializationTimestamp,
                    @NonNull List<PhysicalVertex> physicalVertexList,
                    @NonNull List<PhysicalVertex> coordinatorVertexList,
-                   @NonNull JobImmutableInformation jobImmutableInformation) {
-        this.pipelineIndex = pipelineIndex;
+                   @NonNull JobImmutableInformation jobImmutableInformation,
+                   @NonNull ExecutorService executorService) {
+        this.pipelineId = pipelineId;
         this.pipelineFuture = new CompletableFuture<>();
         this.totalPipelineNum = totalPipelineNum;
         this.physicalVertexList = physicalVertexList;
@@ -92,8 +96,10 @@ public class SubPlan {
             "Job %s (%s), Pipeline: [(%d/%d)]",
             jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId(),
-            pipelineIndex,
+            pipelineId,
             totalPipelineNum);
+
+        this.executorService = executorService;
     }
 
     public PassiveCompletableFuture<PipelineState> initStateFuture() {
@@ -247,7 +253,7 @@ public class SubPlan {
             CompletableFuture<Void> future = CompletableFuture.supplyAsync(() 
-> {
                 task.cancel();
                 return null;
-            });
+            }, executorService);
             return future;
         }
         return null;
@@ -258,8 +264,8 @@ public class SubPlan {
         cancelPipeline();
     }
 
-    public int getPipelineIndex() {
-        return pipelineIndex;
+    public int getPipelineId() {
+        return pipelineId;
     }
 
     public List<PhysicalVertex> getPhysicalVertexList() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 98d865f55..4b21001a6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -31,6 +31,7 @@ import 
org.apache.seatunnel.engine.server.checkpoint.CheckpointStorageConfigurat
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
@@ -151,12 +152,12 @@ public class JobMaster implements Runnable {
     }
 
     public Address queryTaskGroupAddress(long taskGroupId) {
-        for (Integer pipelineIndex : ownedSlotProfiles.keySet()) {
-            Optional<PhysicalVertex> currentVertex = 
ownedSlotProfiles.get(pipelineIndex).keySet().stream()
+        for (Integer pipelineId : ownedSlotProfiles.keySet()) {
+            Optional<PhysicalVertex> currentVertex = 
ownedSlotProfiles.get(pipelineId).keySet().stream()
                 .filter(physicalVertex -> 
physicalVertex.getTaskGroup().getTaskGroupLocation().getTaskGroupId() == 
taskGroupId)
                 .findFirst();
             if (currentVertex.isPresent()) {
-                return 
ownedSlotProfiles.get(pipelineIndex).get(currentVertex.get()).getWorker();
+                return 
ownedSlotProfiles.get(pipelineId).get(currentVertex.get()).getWorker();
             }
         }
         throw new IllegalArgumentException("can't find task group address from 
task group id: " + taskGroupId);
@@ -189,4 +190,28 @@ public class JobMaster implements Runnable {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineId() != 
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if 
(!task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
 {
+                    return;
+                }
+
+                task.updateTaskExecutionState(taskExecutionState);
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                if 
(!task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation()))
 {
+                    return;
+                }
+
+                task.updateTaskExecutionState(taskExecutionState);
+            });
+        });
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
index 377316c36..7ae0dc1d2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.engine.server.operation;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.master.JobMaster;
 
 import com.hazelcast.spi.impl.operationservice.Operation;
 
@@ -38,8 +37,7 @@ public class NotifyTaskStatusOperation extends Operation {
     @Override
     public void run() throws Exception {
         SeaTunnelServer service = getService();
-        JobMaster jobMaster = 
service.getJobMaster(taskGroupLocation.getJobId());
-        //TODO Notify jobMaster of TaskGroup State
+        service.updateTaskExecutionState(taskExecutionState);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 22a6a0f16..1727e06ba 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -72,7 +72,7 @@ public class PipelineBaseScheduler implements JobScheduler {
                 Map<PhysicalVertex, SlotProfile> slotProfiles;
                 try {
                     slotProfiles = applyResourceForPipeline(pipeline);
-                    ownedSlotProfiles.put(pipeline.getPipelineIndex(), 
slotProfiles);
+                    ownedSlotProfiles.put(pipeline.getPipelineId(), 
slotProfiles);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index a67a43456..336ef59a9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.operation;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.operation.AsyncOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 
@@ -27,11 +25,13 @@ import com.hazelcast.internal.nio.IOUtil;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
 import lombok.NonNull;
 
 import java.io.IOException;
 
-public class DeployTaskOperation extends AsyncOperation {
+public class DeployTaskOperation extends Operation implements 
IdentifiedDataSerializable {
     private Data taskImmutableInformation;
     private SlotProfile slotProfile;
 
@@ -44,12 +44,17 @@ public class DeployTaskOperation extends AsyncOperation {
     }
 
     @Override
-    protected PassiveCompletableFuture<?> doRun() throws Exception {
+    public void run() throws Exception {
         SeaTunnelServer server = getService();
-        return server.getSlotService().getSlotContext(slotProfile)
+        server.getSlotService().getSlotContext(slotProfile)
             .getTaskExecutionService().deployTask(taskImmutableInformation);
     }
 
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
     @Override
     public int getClassId() {
         return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index df6544140..1c4a6605e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -50,7 +50,7 @@ public class GetTaskGroupAddressOperation extends Operation 
implements Identifie
         response = RetryUtils.retryWithException(() -> 
server.getJobMaster(taskLocation.getJobId())
                 
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation().getTaskGroupId()),
             new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
-                exception -> exception instanceof NullPointerException, 
Constant.OPERATION_RETRY_SLEEP));
+                exception -> exception instanceof IllegalArgumentException, 
Constant.OPERATION_RETRY_SLEEP));
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index ccc2d0bce..c11ccc858 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -59,7 +59,7 @@ public class SinkRegisterOperation extends Operation 
implements IdentifiedDataSe
                 task = 
server.getTaskExecutionService().getTask(committerTaskID);
                 break;
             } catch (NullPointerException e) {
-                LOGGER.warning("can't get committer task , waiting task 
started");
+                LOGGER.warning("can't get committer task , waiting task 
started, retry " + i);
                 Thread.sleep(RETRY_INTERVAL);
             }
         }

Reply via email to