This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5aa02cbb63397be67a02b6e7fd7fbea67f87a964 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Wed May 15 17:31:03 2024 +0200 [hotfix] Allow to use per split output via SourceOperatorTestContext --- .../flink/api/connector/source/mocks/MockSourceReader.java | 10 +--------- .../flink/streaming/api/operators/SourceOperatorTest.java | 1 + .../streaming/api/operators/SourceOperatorTestContext.java | 11 +++++++++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index ce5660bd30f..2c1d4b40867 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -61,15 +61,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> private CompletableFuture<Void> availableFuture; public MockSourceReader() { - this(false, false); - } - - public MockSourceReader(boolean waitingForMoreSplits, boolean markIdleOnNoSplits) { - this( - waitingForMoreSplits - ? WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED - : WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, - markIdleOnNoSplits); + this(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, false); } public MockSourceReader( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index edb4a6150a7..5f983b15257 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -210,6 +210,7 @@ public class SourceOperatorTest { List<StreamElement> outputStreamElements = new ArrayList<>(); context = new SourceOperatorTestContext( + false, false, WatermarkStrategy.<Integer>forMonotonousTimestamps() .withTimestampAssigner((element, recordTimestamp) -> element), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index e24270e2684..13e31be1005 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -75,15 +75,22 @@ public class SourceOperatorTestContext implements AutoCloseable { public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy) throws Exception { - this(idle, watermarkStrategy, new MockOutput<>(new ArrayList<>())); + this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>())); } public SourceOperatorTestContext( boolean idle, + boolean usePerSplitOutputs, WatermarkStrategy<Integer> watermarkStrategy, Output<StreamRecord<Integer>> output) throws Exception { - mockSourceReader = new MockSourceReader(idle, idle); + mockSourceReader = + new MockSourceReader( + idle + ? MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED + : MockSourceReader.WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, + idle, + usePerSplitOutputs); mockGateway = new MockOperatorEventGateway(); timeService = new TestProcessingTimeService(); operator =