This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4aa59987e7 [Improve][Zeta] Support close idle task for multiple sinks (#6864)
4aa59987e7 is described below
commit 4aa59987e7643da9d9dddadcb69abe4e1b02005f
Author: hailin0 <[email protected]>
AuthorDate: Fri Jun 14 11:55:02 2024 +0800
[Improve][Zeta] Support close idle task for multiple sinks (#6864)
---
.../seatunnel/engine/server/checkpoint/CheckpointCoordinator.java | 4 ----
.../java/org/apache/seatunnel/engine/server/master/JobMasterTest.java | 2 +-
.../src/test/resources/stream_fakesource_to_file.conf | 4 ++++
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 12f5acd597..2b7498f485 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -441,10 +441,6 @@ public class CheckpointCoordinator {
subTask.getJobId());
}
}
- if (subTaskList.size() != 2) {
- throw new UnsupportedOperationException(
- "Unsupported close not reader/writer task group: " + subTaskList);
- }
readyToCloseIdleTask.addAll(subTaskList);
tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index cbfde91b37..bc1e8f06f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -278,7 +278,7 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
.getCheckpointManager()
.getCheckpointCoordinator(seaTunnelTask.getTaskLocation().getPipelineId());
await().atMost(60, TimeUnit.SECONDS)
- .until(() -> checkpointCoordinator.getClosedIdleTask().size() == 2);
+ .until(() -> checkpointCoordinator.getClosedIdleTask().size() == 3);
await().atMost(60, TimeUnit.SECONDS)
.until(() -> slotService.getWorkerProfile().getAssignedSlots().length == 3);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
index 2cbcf14bd9..a9515ca00b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
@@ -41,7 +41,11 @@ transform {
}
sink {
+ console {
+ source_table_name = "fake"
+ }
LocalFile {
+ source_table_name = "fake"
path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"