Hisoka-X commented on code in PR #2567:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2567#discussion_r962695101


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -154,65 +150,71 @@ public PassiveCompletableFuture<TaskExecutionState> 
initStateFuture() {
     private void deployOnLocal(@NonNull SlotProfile slotProfile) {
         deployInternal(taskGroupImmutableInformation -> {
             SeaTunnelServer server = 
nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
-            return new 
PassiveCompletableFuture<>(server.getSlotService().getSlotContext(slotProfile)
-                
.getTaskExecutionService().deployTask(taskGroupImmutableInformation));
+            server.getSlotService().getSlotContext(slotProfile)
+                
.getTaskExecutionService().deployTask(taskGroupImmutableInformation);
+            return null;
         });
     }
 
     private void deployOnRemote(@NonNull SlotProfile slotProfile) {
-        deployInternal(taskGroupImmutableInformation -> new 
PassiveCompletableFuture<>(
-            
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
-                    new DeployTaskOperation(slotProfile,
-                        
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
-                    slotProfile.getWorker())
-                .invoke()));
+        deployInternal(taskGroupImmutableInformation -> {
+            try {
+                
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+                        new DeployTaskOperation(slotProfile,
+                            
nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+                        slotProfile.getWorker())
+                    .invoke().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return null;
+        });
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     // This method must not throw an exception
     public void deploy(@NonNull SlotProfile slotProfile) {
-        currentExecutionAddress = slotProfile.getWorker();
-        if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
-            deployOnLocal(slotProfile);
-        } else {
-            deployOnRemote(slotProfile);
+        try {
+            currentExecutionAddress = slotProfile.getWorker();
+            if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
+                deployOnLocal(slotProfile);
+            } else {
+                deployOnRemote(slotProfile);
+            }
+        } catch (Throwable th) {
+            failedByException(th);
         }
     }
 
-    private void deployInternal(Function<TaskGroupImmutableInformation, 
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {
+    private void deployInternal(
+        Function<TaskGroupImmutableInformation, 
PassiveCompletableFuture<TaskExecutionState>> deployMethod) {

Review Comment:
   Change `Function` to `Consumer`, because deploy method doesn't return 
anything at now.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != 
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if 
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) 
{
+                    return;

Review Comment:
   Why `equals` but not `not equals`?



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != 
taskExecutionState.getTaskGroupLocation().getPipelineId()) {
+                return;
+            }
+
+            pipeline.getCoordinatorVertexList().forEach(task -> {
+                if 
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) 
{
+                    return;
+                }
+
+                task.updateTaskExecutionState(taskExecutionState);
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                if 
(task.getTaskGroupLocation().equals(taskExecutionState.getTaskGroupLocation())) 
{
+                    return;

Review Comment:
   same as above



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -189,4 +190,28 @@ public JobStatus getJobStatus() {
     public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }
+
+    public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+        this.physicalPlan.getPipelineList().forEach(pipeline -> {
+            if (pipeline.getPipelineIndex() != 
taskExecutionState.getTaskGroupLocation().getPipelineId()) {

Review Comment:
   unify pipelineIndex and PipelineId maybe better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to