This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b9bdf798a [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)
93b9bdf798a is described below

commit 93b9bdf798aec933122bdee6e9b70860eadd0864
Author: elon-X <34712973+elo...@users.noreply.github.com>
AuthorDate: Fri Jun 14 09:26:56 2024 +0800

    [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757)
---
 .../source/coordinator/SourceCoordinator.java      | 14 +++--
 .../streaming/api/operators/SourceOperator.java    |  8 ++-
 .../api/operators/SourceOperatorAlignmentTest.java | 31 ++++++++++
 .../api/datastream/WatermarkAlignmentITCase.java   | 72 ++++++++++++++++++++++
 4 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index e64eb0424ca..3133bbe7ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -198,12 +198,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
                 subTaskIds,
                 operatorName);
 
-        // Subtask maybe during deploying or restarting, so we only send 
WatermarkAlignmentEvent
-        // to ready task to avoid period task fail (Java-ThreadPoolExecutor 
will not schedule
-        // the period task if it throws an exception).
         for (Integer subtaskId : subTaskIds) {
-            context.sendEventToSourceOperatorIfTaskReady(
-                    subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
+            // when subtask have been finished, do not send event.
+            if (!context.hasNoMoreSplits(subtaskId)) {
+                // Subtask maybe during deploying or restarting, so we only 
send
+                // WatermarkAlignmentEvent to ready task to avoid period task 
fail
+                // (Java-ThreadPoolExecutor will not schedule the period task 
if it throws an
+                // exception).
+                context.sendEventToSourceOperatorIfTaskReady(
+                        subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
+            }
         }
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index d49de8c622c..972415c1ff0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -434,7 +434,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
                     // introduces a small performance regression (probably 
because of an extra
                     // virtual call)
                     processingTimeService.scheduleWithFixedDelay(
-                            this::emitLatestWatermark,
+                            time -> emitLatestWatermark(),
                             watermarkAlignmentParams.getUpdateInterval(),
                             watermarkAlignmentParams.getUpdateInterval());
                 }
@@ -449,6 +449,10 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_DATA;
             case DATA_FINISHED:
+                if (watermarkAlignmentParams.isEnabled()) {
+                    latestWatermark = Watermark.MAX_WATERMARK.getTimestamp();
+                    emitLatestWatermark();
+                }
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_INPUT;
             case WAITING_FOR_ALIGNMENT:
@@ -507,7 +511,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         }
     }
 
-    private void emitLatestWatermark(long time) {
+    private void emitLatestWatermark() {
         checkState(currentMainOutput != null);
         if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) {
             return;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
index db3f8f0ed1f..30b8fdd5063 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
 import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
@@ -276,6 +277,36 @@ class SourceOperatorAlignmentTest {
         assertLatestReportedWatermarkEvent(record1);
     }
 
+    @Test
+    void testWatermarkAlignmentWhileSubtaskFinished() throws Exception {
+        operator.initializeState(context.createStateContext());
+        operator.getReaderState().clear();
+        operator.open();
+
+        MockSourceSplit newSplit = new MockSourceSplit(1, 0, 1);
+        int record1 = 1;
+        newSplit.addRecord(record1);
+
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
+
+        CollectingDataOutput<Integer> actualOutput = new 
CollectingDataOutput<>();
+        List<Integer> expectedOutput = new ArrayList<>();
+
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE);
+        expectedOutput.add(record1);
+        assertOutput(actualOutput, expectedOutput);
+
+        // no more split event, verify that the final watermark is emitted
+        operator.handleOperatorEvent(new NoMoreSplitsEvent());
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA);
+
+        
assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_INPUT);
+        context.getTimeService().advance(1);
+        
assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp());
+    }
+
     private void assertOutput(
             CollectingDataOutput<Integer> actualOutput, List<Integer> 
expectedOutput) {
         assertThat(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
new file mode 100644
index 00000000000..e113345235e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/** This ITCase class tests the behavior of task execution with watermark 
alignment. */
+class WatermarkAlignmentITCase {
+
+    /**
+     * Test method to verify whether the watermark alignment works well with 
finished task.
+     *
+     * @throws Exception if any error occurs during the execution.
+     */
+    @Test
+    void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
+        // Set up the execution environment with parallelism of 2
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+
+        // Create a stream from a custom source with watermark strategy
+        DataStream<Long> stream =
+                env.fromSource(
+                                new NumberSequenceSource(0, 100),
+                                
WatermarkStrategy.<Long>forMonotonousTimestamps()
+                                        .withTimestampAssigner(
+                                                
(SerializableTimestampAssigner<Long>)
+                                                        (aLong, l) -> aLong)
+                                        .withWatermarkAlignment(
+                                                "g1", Duration.ofMillis(10), 
Duration.ofSeconds(2)),
+                                "Sequence Source")
+                        .filter((FilterFunction<Long>) aLong -> true);
+
+        // Execute the stream and collect the results
+        final List<Long> result = stream.executeAndCollect(101);
+        Collections.sort(result);
+
+        // Assert that the collected result contains all numbers from 0 to 100
+        Assertions.assertIterableEquals(
+                result, LongStream.rangeClosed(0, 
100).boxed().collect(Collectors.toList()));
+    }
+}

Reply via email to