This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ae6fa9907 [hotfix][engine][task] Fix coordinator task running error (#3126)
ae6fa9907 is described below
commit ae6fa990787c9474b907c7a72be19b56b3b3fbf0
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Oct 18 16:53:26 2022 +0800
[hotfix][engine][task] Fix coordinator task running error (#3126)
---
.../engine/server/task/SinkAggregatedCommitterTask.java | 12 ++++++++++--
.../engine/server/task/SourceSplitEnumeratorTask.java | 1 +
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 5d8eef25b..8497c5883 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -45,6 +45,7 @@ import lombok.NonNull;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -213,8 +214,15 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointId));
- checkpointCommitInfoMap.remove(checkpointId);
+ List<AggregatedCommitInfoT> aggregatedCommitInfo = new ArrayList<>();
+ checkpointCommitInfoMap.forEach((key, value) -> {
+ if (key > checkpointId) {
+ return;
+ }
+ aggregatedCommitInfo.addAll(value);
+ checkpointCommitInfoMap.remove(key);
+ });
+ aggregatedCommitter.commit(aggregatedCommitInfo);
if (prepareCloseStatus) {
closeCall();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 83fb06b82..608a0fb1c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -180,6 +180,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
taskMemberMapping.put(taskID, memberAdder);
taskIDToTaskLocationMapping.put(taskID.getTaskID(), taskID);
taskIndexToTaskLocationMapping.put(taskID.getTaskIndex(), taskID);
+ unfinishedReaders.add(taskID.getTaskID());
}
public Address getTaskMemberAddress(long taskID) {