This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae6fa9907 [hotfix][engine][task] Fix coordinator task running error 
(#3126)
ae6fa9907 is described below

commit ae6fa990787c9474b907c7a72be19b56b3b3fbf0
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Oct 18 16:53:26 2022 +0800

    [hotfix][engine][task] Fix coordinator task running error (#3126)
---
 .../engine/server/task/SinkAggregatedCommitterTask.java      | 12 ++++++++++--
 .../engine/server/task/SourceSplitEnumeratorTask.java        |  1 +
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 5d8eef25b..8497c5883 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -45,6 +45,7 @@ import lombok.NonNull;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -213,8 +214,15 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT> ex
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointId));
-        checkpointCommitInfoMap.remove(checkpointId);
+        List<AggregatedCommitInfoT> aggregatedCommitInfo = new ArrayList<>();
+        checkpointCommitInfoMap.forEach((key, value) -> {
+            if (key > checkpointId) {
+                return;
+            }
+            aggregatedCommitInfo.addAll(value);
+            checkpointCommitInfoMap.remove(key);
+        });
+        aggregatedCommitter.commit(aggregatedCommitInfo);
         if (prepareCloseStatus) {
             closeCall();
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 83fb06b82..608a0fb1c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -180,6 +180,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
         taskMemberMapping.put(taskID, memberAdder);
         taskIDToTaskLocationMapping.put(taskID.getTaskID(), taskID);
         taskIndexToTaskLocationMapping.put(taskID.getTaskIndex(), taskID);
+        unfinishedReaders.add(taskID.getTaskID());
     }
 
     public Address getTaskMemberAddress(long taskID) {

Reply via email to