This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aa02cbb63397be67a02b6e7fd7fbea67f87a964
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed May 15 17:31:03 2024 +0200

    [hotfix] Allow to use per split output via SourceOperatorTestContext
---
 .../flink/api/connector/source/mocks/MockSourceReader.java    | 10 +---------
 .../flink/streaming/api/operators/SourceOperatorTest.java     |  1 +
 .../streaming/api/operators/SourceOperatorTestContext.java    | 11 +++++++++--
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index ce5660bd30f..2c1d4b40867 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -61,15 +61,7 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
     private CompletableFuture<Void> availableFuture;
 
     public MockSourceReader() {
-        this(false, false);
-    }
-
-    public MockSourceReader(boolean waitingForMoreSplits, boolean 
markIdleOnNoSplits) {
-        this(
-                waitingForMoreSplits
-                        ? WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED
-                        : WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
-                markIdleOnNoSplits);
+        this(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false);
     }
 
     public MockSourceReader(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index edb4a6150a7..5f983b15257 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -210,6 +210,7 @@ public class SourceOperatorTest {
         List<StreamElement> outputStreamElements = new ArrayList<>();
         context =
                 new SourceOperatorTestContext(
+                        false,
                         false,
                         WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                 .withTimestampAssigner((element, 
recordTimestamp) -> element),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index e24270e2684..13e31be1005 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -75,15 +75,22 @@ public class SourceOperatorTestContext implements 
AutoCloseable {
 
     public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> 
watermarkStrategy)
             throws Exception {
-        this(idle, watermarkStrategy, new MockOutput<>(new ArrayList<>()));
+        this(idle, false, watermarkStrategy, new MockOutput<>(new 
ArrayList<>()));
     }
 
     public SourceOperatorTestContext(
             boolean idle,
+            boolean usePerSplitOutputs,
             WatermarkStrategy<Integer> watermarkStrategy,
             Output<StreamRecord<Integer>> output)
             throws Exception {
-        mockSourceReader = new MockSourceReader(idle, idle);
+        mockSourceReader =
+                new MockSourceReader(
+                        idle
+                                ? 
MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED
+                                : 
MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
+                        idle,
+                        usePerSplitOutputs);
         mockGateway = new MockOperatorEventGateway();
         timeService = new TestProcessingTimeService();
         operator =

Reply via email to