This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 93b9bdf798a [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757) 93b9bdf798a is described below commit 93b9bdf798aec933122bdee6e9b70860eadd0864 Author: elon-X <34712973+elo...@users.noreply.github.com> AuthorDate: Fri Jun 14 09:26:56 2024 +0800 [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish (#24757) --- .../source/coordinator/SourceCoordinator.java | 14 +++-- .../streaming/api/operators/SourceOperator.java | 8 ++- .../api/operators/SourceOperatorAlignmentTest.java | 31 ++++++++++ .../api/datastream/WatermarkAlignmentITCase.java | 72 ++++++++++++++++++++++ 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index e64eb0424ca..3133bbe7ce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -198,12 +198,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> subTaskIds, operatorName); - // Subtask maybe during deploying or restarting, so we only send WatermarkAlignmentEvent - // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule - // the period task if it throws an exception). for (Integer subtaskId : subTaskIds) { - context.sendEventToSourceOperatorIfTaskReady( - subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); + // when subtask have been finished, do not send event. + if (!context.hasNoMoreSplits(subtaskId)) { + // Subtask maybe during deploying or restarting, so we only send + // WatermarkAlignmentEvent to ready task to avoid period task fail + // (Java-ThreadPoolExecutor will not schedule the period task if it throws an + // exception). + context.sendEventToSourceOperatorIfTaskReady( + subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index d49de8c622c..972415c1ff0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -434,7 +434,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr // introduces a small performance regression (probably because of an extra // virtual call) processingTimeService.scheduleWithFixedDelay( - this::emitLatestWatermark, + time -> emitLatestWatermark(), watermarkAlignmentParams.getUpdateInterval(), watermarkAlignmentParams.getUpdateInterval()); } @@ -449,6 +449,10 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_DATA; case DATA_FINISHED: + if (watermarkAlignmentParams.isEnabled()) { + latestWatermark = Watermark.MAX_WATERMARK.getTimestamp(); + emitLatestWatermark(); + } sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_INPUT; case WAITING_FOR_ALIGNMENT: @@ -507,7 +511,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr } } - private void emitLatestWatermark(long time) { + private void emitLatestWatermark() { checkState(currentMainOutput != null); if (latestWatermark == Watermark.UNINITIALIZED.getTimestamp()) { return; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java index db3f8f0ed1f..30b8fdd5063 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; @@ -276,6 +277,36 @@ class SourceOperatorAlignmentTest { assertLatestReportedWatermarkEvent(record1); } + @Test + void testWatermarkAlignmentWhileSubtaskFinished() throws Exception { + operator.initializeState(context.createStateContext()); + operator.getReaderState().clear(); + operator.open(); + + MockSourceSplit newSplit = new MockSourceSplit(1, 0, 1); + int record1 = 1; + newSplit.addRecord(record1); + + operator.handleOperatorEvent( + new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + + CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>(); + List<Integer> expectedOutput = new ArrayList<>(); + + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.MORE_AVAILABLE); + expectedOutput.add(record1); + assertOutput(actualOutput, expectedOutput); + + // no more split event, verify that the final watermark is emitted + operator.handleOperatorEvent(new NoMoreSplitsEvent()); + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_DATA); + + assertThat(operator.emitNext(actualOutput)).isEqualTo(DataInputStatus.END_OF_INPUT); + context.getTimeService().advance(1); + assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp()); + } + private void assertOutput( CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) { assertThat( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java new file mode 100644 index 00000000000..e113345235e --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/** This ITCase class tests the behavior of task execution with watermark alignment. */ +class WatermarkAlignmentITCase { + + /** + * Test method to verify whether the watermark alignment works well with finished task. + * + * @throws Exception if any error occurs during the execution. + */ + @Test + void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception { + // Set up the execution environment with parallelism of 2 + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + // Create a stream from a custom source with watermark strategy + DataStream<Long> stream = + env.fromSource( + new NumberSequenceSource(0, 100), + WatermarkStrategy.<Long>forMonotonousTimestamps() + .withTimestampAssigner( + (SerializableTimestampAssigner<Long>) + (aLong, l) -> aLong) + .withWatermarkAlignment( + "g1", Duration.ofMillis(10), Duration.ofSeconds(2)), + "Sequence Source") + .filter((FilterFunction<Long>) aLong -> true); + + // Execute the stream and collect the results + final List<Long> result = stream.executeAndCollect(101); + Collections.sort(result); + + // Assert that the collected result contains all numbers from 0 to 100 + Assertions.assertIterableEquals( + result, LongStream.rangeClosed(0, 100).boxed().collect(Collectors.toList())); + } +}