This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4848a50d73563d14d479437d64854e1da7f06b47
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed May 15 15:57:34 2024 +0200

    [FLINKCC-1283] Properly initialize initial splits in WatermarkOutputMultiplexer
    
    Without this fix, initial splits (the splits restored from the reader
    state) were registered in the multiplexer only after the first record
    from that split had been emitted. This led to incorrectly emitted
    watermarks, as the resulting watermark was not combined from all of the
    initial splits, but only from the splits that had already emitted at
    least one record.
---
 .../streaming/api/operators/SourceOperator.java    |  1 +
 .../api/operators/SourceOperatorAlignmentTest.java |  2 +-
 .../operators/SourceOperatorWatermarksTest.java    | 95 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index c4f624e443c..ae61cfc670c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -341,6 +341,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         final List<SplitT> splits = 
CollectionUtil.iterableToList(readerState.get());
         if (!splits.isEmpty()) {
             LOG.info("Restoring state for {} split(s) to reader.", 
splits.size());
+            outputPendingSplits.addAll(splits);
             sourceReader.addSplits(splits);
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index dc029c272d8..db3f8f0ed1f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -311,7 +311,7 @@ class SourceOperatorAlignmentTest {
         assertThat(events).isEmpty();
     }
 
-    private static class PunctuatedGenerator implements 
WatermarkGenerator<Integer> {
+    static class PunctuatedGenerator implements WatermarkGenerator<Integer> {
 
         private enum GenerationMode {
             ALL,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
new file mode 100644
index 00000000000..6906c755517
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
@@ -0,0 +1,95 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.DataInputStatus;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link SourceOperator} watermark emission across initial splits. */
+@SuppressWarnings("serial")
+class SourceOperatorWatermarksTest {
+
+    @Nullable private SourceOperatorTestContext context;
+    @Nullable private SourceOperator<Integer, MockSourceSplit> operator;
+
+    @BeforeEach
+    void setup() throws Exception {
+        context =
+                new SourceOperatorTestContext(
+                        false,
+                        true,
+                        WatermarkStrategy.forGenerator(
+                                        ctx ->
+                                                new SourceOperatorAlignmentTest
+                                                        .PunctuatedGenerator())
+                                .withTimestampAssigner((r, t) -> r));
+        operator = context.getOperator();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        context.close();
+        context = null;
+        operator = null;
+    }
+
+    @Test
+    void testWatermark() throws Exception {
+        List<MockSourceSplit> initialSplits = new ArrayList<>();
+        initialSplits.add(new MockSourceSplit(0, 
1000).addRecord(1042).addRecord(1044));
+        initialSplits.add(new MockSourceSplit(1, 
1000).addRecord(42).addRecord(44));
+        operator.initializeState(context.createStateContext(initialSplits));
+        operator.open();
+
+        CollectingDataOutput<Integer> actualOutput = new 
CollectingDataOutput<>();
+
+        // after emitting first element from first split, there can not be 
watermark emitted, as
+        // watermark from the other split is still unknown.
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        assertNoWatermarks(actualOutput);
+
+        // after emitting two more elements (in this order: [1042, 1044, 42] 
but order doesn't
+        // matter for this test), three in total, watermark 42 can be finally 
emitted
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        assertWatermark(actualOutput, new Watermark(42));
+    }
+
+    private static void assertNoWatermarks(CollectingDataOutput<Integer> 
actualOutput) {
+        assertThat(actualOutput.getEvents()).noneMatch(element -> element 
instanceof Watermark);
+    }
+
+    private void assertWatermark(CollectingDataOutput<Integer> actualOutput, 
Watermark watermark) {
+        assertThat(actualOutput.getEvents()).containsOnlyOnce(watermark);
+    }
+}

Reply via email to