This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4aa59987e7 [Improve][Zeta] Support close idle task for multiple sinks (#6864)
4aa59987e7 is described below

commit 4aa59987e7643da9d9dddadcb69abe4e1b02005f
Author: hailin0 <[email protected]>
AuthorDate: Fri Jun 14 11:55:02 2024 +0800

    [Improve][Zeta] Support close idle task for multiple sinks (#6864)
---
 .../seatunnel/engine/server/checkpoint/CheckpointCoordinator.java     | 4 ----
 .../java/org/apache/seatunnel/engine/server/master/JobMasterTest.java | 2 +-
 .../src/test/resources/stream_fakesource_to_file.conf                 | 4 ++++
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 12f5acd597..2b7498f485 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -441,10 +441,6 @@ public class CheckpointCoordinator {
                             subTask.getJobId());
                 }
             }
-            if (subTaskList.size() != 2) {
-                throw new UnsupportedOperationException(
-                        "Unsupported close not reader/writer task group: " + 
subTaskList);
-            }
             readyToCloseIdleTask.addAll(subTaskList);
             tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index cbfde91b37..bc1e8f06f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -278,7 +278,7 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
                         .getCheckpointManager()
                         
.getCheckpointCoordinator(seaTunnelTask.getTaskLocation().getPipelineId());
         await().atMost(60, TimeUnit.SECONDS)
-                .until(() -> checkpointCoordinator.getClosedIdleTask().size() 
== 2);
+                .until(() -> checkpointCoordinator.getClosedIdleTask().size() 
== 3);
         await().atMost(60, TimeUnit.SECONDS)
                 .until(() -> 
slotService.getWorkerProfile().getAssignedSlots().length == 3);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
index 2cbcf14bd9..a9515ca00b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf
@@ -41,7 +41,11 @@ transform {
 }
 
 sink {
+  console {
+    source_table_name = "fake"
+  }
   LocalFile {
+    source_table_name = "fake"
     path="/tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"

Reply via email to