reswqa commented on code in PR #25731: URL: https://github.com/apache/flink/pull/25731#discussion_r1898186510
########## flink-core-api/src/main/java/org/apache/flink/api/common/watermark/WatermarkCombinationFunction.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.watermark; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.Function; + +/** + * The {@link WatermarkCombinationFunction} defines the comparison/combination semantics among + * {@link Watermark}s. + */ Review Comment: ```suggestion /** * The {@link WatermarkCombinationFunction} defines the watermark's comparison/combination semantics among * multiple input channels. */ ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ProcessFunction.java: ########## @@ -38,6 +40,16 @@ default Set<StateDeclaration> usesStates() { return Collections.emptySet(); } + /** + * Explicitly declare watermarks upfront. Each specific watermark must be declared in this + * method before it can be used. + * + * @return all watermark declarations used by this application. + */ + default Collection<? extends WatermarkDeclaration> watermarkDeclarations() { Review Comment: 1. 
How about renaming to `declareWatermarks` to align with `usesStates`? 2. the return type can be `Set<? extends WatermarkDeclaration>`. ########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/AlignedWatermarkCombiner.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark; + +import org.apache.flink.api.common.watermark.Watermark; + +import java.util.BitSet; +import java.util.function.Consumer; + +/** + * A {@link WatermarkCombiner} is designed to align {@link Watermark}s. It will combine watermarks + * after receiving watermarks from all upstream input channels. + * + * <p>The combine process will perform the following three steps: (1) send the combined watermark + * (actually the last received watermark) to downstream, (2) clear the received watermarks, (3) + * resume input gate. + * + * <p>Note that the aligned Watermarks should have the same value within a batch. + */ +public class AlignedWatermarkCombiner implements WatermarkCombiner { + /** The number of upstream input channels. 
*/ + private int numberOfInputChannels; + + /** A bitset to record whether the watermark has been received from each channel. */ + private final BitSet hasReceiveWatermarks; + + /** The input gate resume callback. */ + private Runnable gateResumer; + + public AlignedWatermarkCombiner(int numberOfInputChannels, Runnable gateResumer) { + this.numberOfInputChannels = numberOfInputChannels; + this.hasReceiveWatermarks = new BitSet(numberOfInputChannels); + this.gateResumer = gateResumer; + } + + @Override + public void combineWatermark( + Watermark watermark, int channelIndex, Consumer<Watermark> watermarkEmitter) + throws Exception { + + // mark the channel has received the watermark + hasReceiveWatermarks.set(channelIndex); + + // once receive all watermarks, perform combine process + if (hasReceiveWatermarks.cardinality() == numberOfInputChannels) { Review Comment: We should check the aligned watermark policy is waitingForAllChannels. ########## flink-core-api/src/main/java/org/apache/flink/api/common/watermark/BoolWatermark.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.watermark; + +import org.apache.flink.annotation.Experimental; + +import java.util.Objects; + +/** + * The {@link BoolWatermark} represents a watermark with a boolean value and an associated + * identifier. + */ +@Experimental +public class BoolWatermark implements Watermark { Review Comment: We should include module name in commit message, for example: [FLINK-36960][API]xxxxx ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java: ########## @@ -68,6 +78,15 @@ public void open() throws Exception { taskInfo.getIndexOfThisSubtask(), taskInfo.getAttemptNumber(), operatorContext.getMetricGroup()); + + outputCollector = getOutputCollector(); + watermarkDeclarationMap = Review Comment: We should avoid per-operator level deserialization. This can be optimized to per-task at least if we can't find a better way for job level. ########## flink-core-api/src/main/java/org/apache/flink/api/common/watermark/WatermarkCombinationPolicy.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.watermark; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.util.Objects; + +/** + * The {@link WatermarkCombinationPolicy} defines when and how to combine {@link Watermark}s. Review Comment: Need to explain the role of `WatermarkCombinationFunction` and `combineWaitForAllChannels` in this policy. ########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java: ########## @@ -0,0 +1,1064 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkCombinationFunction; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.jobmaster.JobResult; +import 
org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.watermark.AlignableBoolWatermarkDeclaration; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * IT test case for {@link Watermark}. It will test the following behaviors of the generalized + * watermark: (1) The aligned watermark can block upstream. (2) The long watermark combines max/min + * correctly. (3) The bool watermark combines and/or correctly. (4) The watermark combiner will wait + * for all channels and then combine if the {@link + * WatermarkCombinationPolicy#isCombineWaitForAllChannels()} returns true. (5) The operator does not + * send watermarks when the {@link OneInputStreamProcessFunction#onWatermark} returns {@link + * WatermarkHandlingResult#POLL}. (6) The operator does not send watermarks when the {@link + * WatermarkHandlingStrategy} is set to IGNORE and the {@link + * OneInputStreamProcessFunction#onWatermark} returns {@link WatermarkHandlingResult#PEEK}. (7) The + * source operator can declare and emit watermarks. 
+ * + * <p>We design a test job for these test cases, the test job has four operators, each with a + * parallelism of 2, and the shuffle edges are all to all. The job details are as follows: + * SourceOperator (Operator1) -> ProcessOperator(Operator2) -> ProcessOperator(Operator3) -> + * ProcessOperator(Operator4). Operator2 will declare watermarks and send watermark to downstream. + * Operator3 will receive watermarks from the upstream Operator2 and process them according to + * different test cases. Operator4 will receive or not receive watermarks from upstream Operator3. + */ +public class WatermarkITCase { + + private static final String DEFAULT_WATERMARK_IDENTIFIER = "default"; + + /** Parallelism of all operators. */ + private static final int DEFAULT_PARALLELISM = 2; + + private MiniCluster flinkCluster; + + /** + * The source operator will emit integer data within a range of 0 to {@code + * NUMBER_KEYS}(exclusive). + */ + private static final int NUMBER_KEYS = 100; + + public void startMiniCluster() throws Exception { + TestingMiniClusterConfiguration miniClusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setNumTaskManagers(3) + .setNumSlotsPerTaskManager(4) + .build(); + flinkCluster = TestingMiniCluster.newBuilder(miniClusterConfiguration).build(); + flinkCluster.start(); + } + + @BeforeEach + void before() throws Exception { + startMiniCluster(); + } + + @AfterEach + void after() throws Exception { + Operator2ProcessFunction.clear(); + Operator3ProcessFunction.clear(); + Operator4ProcessFunction.clear(); + + if (flinkCluster != null) { + flinkCluster.close(); + flinkCluster = null; + } + } + + /** + * Test aligned watermark can block upstream. In this test case, Operator2 will declare the + * aligned watermark. We will block Operator2's subtasks first and unblock them step by step, + * and each subtask will emit a watermark with value {@code true} after being unblocked. 
The + * Operator3 should receive only one combined and aligned watermark with value {@code true} + * after both the Operator2's subtask are unblocked. + */ + @Test + public void testAlignedWatermarkBlockUpstream() throws Exception { + StreamGraph streamGraph = getStreamGraphForAlignedWatermark(Arrays.asList(true, true)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, false); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock operator2's subtask 0; it will send a watermark with the value {@code true} to + // operator3. + // However, operator3 should not receive the watermark since it is not aligned, and it + // should block the input until the watermark is aligned. + Operator2ProcessFunction.unblockSubTasks(0); + Thread.sleep(1000); + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, false); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock operator2's subtask 1, it will send a watermark with the value true to operator3. + // Operator3 should receive the combined watermark with the value {@code true} and not block + // the input as the watermark has already been aligned. 
+ Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives only one watermark per task: true + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, false, true); + checkSinkResults(); + } + + /** + * Test long watermark combines correctly using max function. In this test case, Operator2 will + * declare the long watermark using the combine function max. We will block Operator2's subtasks + * first and unblock them step by step, the subtask 0 will emit a watermark with value {@code + * 1L} after being unblocked, the subtask 1 will emit a watermark with value {@code 2L} after + * being unblocked. The Operator3 should receive two watermarks with the value of {@code 1L} and + * {@code 2L} after both the Operator2's subtask are unblocked. + */ + @Test + public void testLongWatermarkCombineMax() + throws ReflectiveOperationException, ExecutionException, InterruptedException { + StreamGraph streamGraph = + getStreamGraphForLongWatermarkCombineFunction(true, Arrays.asList(1L, 2L)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, true); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock Operator2's subtask 0, which will send a watermark with value {@code 1L} to + // Operator3, + // Operator3 will receive the watermark with value {@code 1L} from channel 0. 
+ Operator2ProcessFunction.unblockSubTasks(0); + tryWaitUntilCondition( + () -> + checkOperatorReceivedWatermarksAllNotEmpty( + Operator3ProcessFunction.receivedWatermarks)); + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, true, 1L); + + // unblock operator2's subtask 1, which will send watermark with value {@code 2L} to + // Operator3. + // for Operator3, since the watermark combine function is max, and the channel 1 received + // watermark value 2L is larger than channel 0 received watermark value 1L, + // so the Operator3 will receive combined watermark with value {@code 2L}. + Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives two watermarks per task: 1L and 2L + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, true, 1L, 2L); + + checkSinkResults(); + } + + /** + * Test long watermark combines correctly using min function. In this test case, Operator2 will + * declare the long watermark using the combine function min. We will block Operator2's subtasks + * first and unblock them step by step, the subtask 0 will emit a watermark with value {@code + * 1L} after being unblocked, the subtask 1 will emit a watermark with value {@code 2L} after + * being unblocked. The Operator3 should receive only one watermark with the value of {@code + * 1L} after both the Operator2's subtask are unblocked. 
+ */ + @Test + public void testLongWatermarkCombineMin() + throws ReflectiveOperationException, ExecutionException, InterruptedException { + StreamGraph streamGraph = + getStreamGraphForLongWatermarkCombineFunction(false, Arrays.asList(1L, 2L)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, true); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock Operator2's subtask 0, which will send a watermark with value {@code 1L} to + // Operator3, + // Operator3 will receive the watermark with value {@code 1L} from channel 0 + Operator2ProcessFunction.unblockSubTasks(0); + tryWaitUntilCondition( + () -> + checkOperatorReceivedWatermarksAllNotEmpty( + Operator3ProcessFunction.receivedWatermarks)); + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, true, 1L); + + // unblock operator2's subtask 1, which will send watermark with value {@code 2L} to + // Operator3. + // for Operator3, since the watermark combine function is min, and the channel 1 received + // watermark value 2L is larger than channel 0 received watermark value 1L, + // so the combined watermark will have value {@code 1L}. + // since the value is same as the previous one, the Operator3 will not receive the watermark + // again. 
+ Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives one watermark per task: 1L + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, true, 1L); + + checkSinkResults(); + } + + /** + * Test bool watermark combines correctly using and function. In this test case, Operator2 will + * declare the long watermark using the combine function {@code AND}. We will block Operator2's + * subtasks first and unblock them step by step, the subtask 0 will emit a watermark with value + * {@code false} after being unblocked, the subtask 1 will emit a watermark with value {@code + * true} after being unblocked. The Operator3 should receive only one watermark with the value + * of {@code false} after both the Operator2's subtask are unblocked. + */ + @Test + public void testBoolWatermarkCombineAnd() + throws ReflectiveOperationException, ExecutionException, InterruptedException { + StreamGraph streamGraph = + getStreamGraphForBoolWatermarkCombineFunction(true, Arrays.asList(false, true)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, false); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock Operator2's subtask 0, which will send a watermark with value {@code false} to + // Operator3, + // Operator3 will receive the watermark with value {@code false} from channel 0. 
+ Operator2ProcessFunction.unblockSubTasks(0); + tryWaitUntilCondition( + () -> + checkOperatorReceivedWatermarksAllNotEmpty( + Operator3ProcessFunction.receivedWatermarks)); + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, false, false); + + // unblock operator2's subtask 1, which will send watermark with value {@code true} to + // Operator3. + // for Operator3, since the watermark combine function is and, the combined result of + // channel 1 received watermark value true and with channel 0 received watermark value false + // is {@code false}, + // since the value is same as the previous one, the Operator3 will not receive the watermark + // again. + Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives one watermark per task: false + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, false, false); + + checkSinkResults(); + } + + /** + * Test bool watermark combines correctly using or function. In this test case, Operator2 will + * declare the bool watermark using the combine function {@code OR}. We will block Operator2's + * subtasks first and unblock them step by step, the subtask 0 will emit a watermark with value + * {@code false} after being unblocked, the subtask 1 will emit a watermark with value {@code + * true} after being unblocked. The Operator3 should receive two watermarks with the value of + * {@code false} and {@code true} after both the Operator2's subtask are unblocked. 
+ */ + @Test + public void testBoolWatermarkCombineOr() + throws ReflectiveOperationException, ExecutionException, InterruptedException { + StreamGraph streamGraph = + getStreamGraphForBoolWatermarkCombineFunction(false, Arrays.asList(false, true)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, false); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock Operator2's subtask 0, which will send a watermark with value {@code false} to + // Operator3, + // Operator3 will receive the watermark with value {@code false} from channel 0. + Operator2ProcessFunction.unblockSubTasks(0); + tryWaitUntilCondition( + () -> + checkOperatorReceivedWatermarksAllNotEmpty( + Operator3ProcessFunction.receivedWatermarks)); + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, false, false); + + // unblock operator2's subtask 1, which will send watermark with value {@code true} to + // Operator3. + // for Operator3, since the watermark combine function is or, the combined result of + // channel 1 received watermark value true and with channel 0 received watermark value false + // is {@code true}, + // so the Operator3 will additionally receive the watermark with value {@code true}. 
+ Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives two watermarks per task: false and true + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, false, false, true); + + checkSinkResults(); + } + + /** + * Test watermark combiner will wait for all channels and then combine if the {@link + * WatermarkCombinationPolicy#isCombineWaitForAllChannels()} returns true. In this test case, + * Operator2 will declare the long watermark and make the {@link + * WatermarkCombinationPolicy#isCombineWaitForAllChannels()} return true, the combine function + * will be {@code MAX}. We will block Operator2's subtasks first and unblock them step by step, + * the subtask 0 will emit a watermark with value {@code 1L} after being unblocked, the subtask + * 1 will emit a watermark with value {@code 2L} after being unblocked. The Operator3 should + * receive only one watermark with the value of {@code 2L} after both the Operator2's subtask + * are unblocked. 
+ */ + @Test + public void testCombineWaitForAllChannels() + throws ReflectiveOperationException, ExecutionException, InterruptedException { + StreamGraph streamGraph = getStreamGraphForCombineWaitForAllChannels(Arrays.asList(1L, 2L)); + + // block operator2's subtask 0 and subtask 1 + Operator2ProcessFunction.blockSubTasks(0, 1); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + + // wait all operator3 tasks have started + tryWaitUntilCondition( + () -> Operator3ProcessFunction.attemptIds.size() == DEFAULT_PARALLELISM); + // since all Operator2 tasks are blocked, so the Operator3 should not receive watermarks and + // records + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, true); + assertThat(Operator3ProcessFunction.receivedRecords).isEmpty(); + + // unblock Operator2's subtask 0, which will send a watermark with value {@code 1L} to + // Operator3, + // for Operator3, only the channel 0 received one watermark with value {@code 1L}, while the + // channel 1 does not receive any watermark. + // as a result, the Operator3 will not receive the watermark. + Operator2ProcessFunction.unblockSubTasks(0); + Thread.sleep(1000); + assertOperatorReceivedWatermarkValues(Operator3ProcessFunction.receivedWatermarks, true); + + // unblock operator2's subtask 1, which will send watermark with value {@code 2L} to + // Operator3. + // for Operator3, since the watermark combine function is max, and only the channel 0 + // received one watermark with value {@code 1L}, the channel 1 receive one watermark with + // value {@code 2L}, + // so the Operator3 will receive one combined watermark with value {@code 2L}. 
+ Operator2ProcessFunction.unblockSubTasks(1); + + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator3 receives one watermark per task: 2L + assertOperatorReceivedWatermarkValues( + Operator3ProcessFunction.receivedWatermarks, true, 2L); + + checkSinkResults(); + } + + /** + * Test operator does not send watermarks when the {@link + * OneInputStreamProcessFunction#onWatermark} returns {@link WatermarkHandlingResult#POLL}. In + * this test case, we will not block any operators. The Operator2 will declare the long + * watermark. The Operator3 will receive watermark and process them in {@link + * Operator3ProcessFunction#onWatermark}, this method will return {@link + * WatermarkHandlingResult#POLL}. The Operator4 should not receive any watermarks, since the + * watermarks have been processed and polled by the user in Operator3. + */ + @Test + public void testWatermarkHandlingResultIsPoll() throws Exception { + StreamGraph streamGraph = + getStreamGraphForWatermarkHandlingResultIsPoll(Arrays.asList(1L, 2L)); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator4 receives no watermark + assertOperatorReceivedWatermarkValues(Operator4ProcessFunction.receivedWatermarks, true); + + checkSinkResults(); + } + + /** + * Test the operator does not send watermarks when the {@link WatermarkHandlingStrategy} is set + * to IGNORE and the {@link OneInputStreamProcessFunction#onWatermark} returns {@link + * WatermarkHandlingResult#PEEK}. In this test case, we will not block any operators. The + * Operator2 will declare the long watermark with {@link WatermarkHandlingStrategy#IGNORE}. 
The + * Operator3 will receive watermark and process them in {@link + * Operator3ProcessFunction#onWatermark}, this method will return {@link + * WatermarkHandlingResult#PEEK}. The Operator4 should not receive any watermarks, since the + * watermarks has been ignored in Operator3. + */ + @Test + public void testDefaultHandlingStrategyIgnore() throws Exception { + StreamGraph streamGraph = + getStreamGraphForDefaultHandlingStrategyIgnore(Arrays.asList(1L, 2L)); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator4 should not receive any watermarks + assertOperatorReceivedWatermarkValues(Operator4ProcessFunction.receivedWatermarks, true); + + checkSinkResults(); + } + + /** + * Test the source operator can declare and emit watermarks. In this test case, the test job + * will contain two operator: SourceOperator1 and Operator2. The SourceOperator1 will declare + * and emit long watermarks. Operator2 should receive the emitted watermarks. 
+ */ + @Test + public void testSourceDeclareAndEmitWatermark() throws Exception { + StreamGraph streamGraph = + getStreamGraphForSourceDeclareAndEmitWatermarks(Arrays.asList(1L, 1L)); + + // submit job + JobID jobId = flinkCluster.submitJob(streamGraph).get().getJobID(); + // wait job complete + JobResult jobResult = flinkCluster.requestJobResult(jobId).get(); + assertThat(jobResult.getSerializedThrowable()).isEmpty(); + + // check Operator2 should receive watermarks + assertOperatorReceivedWatermarkValues( + Operator2ProcessFunction.receivedWatermarks, true, 1L); + } + + public static class Operator1SourceReader extends IteratorSourceReader { + + public SourceReaderContext sourceReaderContext; + public Map<Integer, Watermark> emitWatermark; + private boolean isFirstSendWatermark = true; + + public Operator1SourceReader( + SourceReaderContext context, Map<Integer, Watermark> emitWatermark) { + super(context); + this.sourceReaderContext = context; + this.emitWatermark = emitWatermark; + } + + @Override + public InputStatus pollNext(ReaderOutput output) { + InputStatus inputStatus = super.pollNext(output); + + int indexOfSubtask = sourceReaderContext.getIndexOfSubtask(); + if (isFirstSendWatermark + && emitWatermark != null + && emitWatermark.containsKey(indexOfSubtask)) { + sourceReaderContext.emitWatermark(emitWatermark.get(indexOfSubtask)); + isFirstSendWatermark = false; + } + + return inputStatus; + } + } + + public static class Operator1Source extends NumberSequenceSource { + + public WatermarkDeclaration watermarkDeclaration; + public Map<Integer, Watermark> emitWatermark; + + public Operator1Source( + long from, + long to, + WatermarkDeclaration watermarkDeclaration, + Map<Integer, Watermark> emitWatermark) { + super(from, to); + this.watermarkDeclaration = watermarkDeclaration; + this.emitWatermark = emitWatermark; + } + + public Collection<? 
extends WatermarkDeclaration> watermarkDeclarations() { + return Collections.singletonList(watermarkDeclaration); + } + + @Override + public SourceReader<Long, NumberSequenceSplit> createReader( + SourceReaderContext readerContext) { + return new Operator1SourceReader(readerContext, emitWatermark); + } + } + + public static class Operator2ProcessFunction + implements OneInputStreamProcessFunction<Long, Long> { + public static Map<Integer, Boolean> subtaskBlocked = new ConcurrentHashMap<>(); + public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<>(); + public static ConcurrentLinkedQueue<Long> receivedRecords = new ConcurrentLinkedQueue<>(); + // subtask id -> received watermarks + public static Map<Integer, List<Watermark>> receivedWatermarks = new ConcurrentHashMap<>(); + protected WatermarkDeclaration watermarkDeclaration; + // subtask id -> emit watermark + protected Map<Integer, Watermark> subIdx2emitWatermark; + + public Operator2ProcessFunction( + @Nullable WatermarkDeclaration watermarkDeclaration, + @Nullable Map<Integer, Watermark> subIdx2emitWatermark) { + this.watermarkDeclaration = watermarkDeclaration; + this.subIdx2emitWatermark = subIdx2emitWatermark; + } + + @Override + public void open(NonPartitionedContext<Long> ctx) throws Exception { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + + // attempt id ++ + attemptIds.compute( + subIdx, + (ignored, value) -> { + if (value == null) { + value = 0; + } else { + value += 1; + } + return value; + }); + + receivedWatermarks.computeIfAbsent(subIdx, taskIndex -> new ArrayList<>()); + + // wait until unblocked. 
+ if (subtaskBlocked.containsKey(subIdx) && subtaskBlocked.get(subIdx)) { + tryWaitUntilCondition(() -> !subtaskBlocked.get(subIdx)); + } + + if (subIdx2emitWatermark != null && subIdx2emitWatermark.containsKey(subIdx)) { + ctx.getWatermarkManager().emitWatermark(subIdx2emitWatermark.get(subIdx)); + } + } + + @Override + public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + throws Exception { + receivedRecords.add(record); + output.collect(record * 2); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, Collector<Long> output, NonPartitionedContext<Long> ctx) { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + receivedWatermarks.get(subIdx).add(watermark); + return WatermarkHandlingResult.PEEK; + } + + @Override + public Collection<? extends WatermarkDeclaration> watermarkDeclarations() { + if (watermarkDeclaration != null) { + return Collections.singletonList(watermarkDeclaration); + } + return Collections.emptyList(); + } + + public static void blockSubTasks(Integer... subIndices) { + setSubtaskBlocked(Arrays.asList(subIndices), true, subtaskBlocked); + } + + public static void unblockSubTasks(Integer... 
subIndices) { + setSubtaskBlocked(Arrays.asList(subIndices), false, subtaskBlocked); + } + + public static void clear() { + subtaskBlocked.clear(); + attemptIds.clear(); + receivedRecords.clear(); + receivedWatermarks.clear(); + } + } + + public static class Operator3ProcessFunction + implements OneInputStreamProcessFunction<Long, Long> { + public static Map<Integer, Integer> attemptIds = new ConcurrentHashMap<>(); + public static ConcurrentLinkedQueue<Long> receivedRecords = new ConcurrentLinkedQueue<>(); + // subtask id -> received watermarks + public static Map<Integer, List<Watermark>> receivedWatermarks = new ConcurrentHashMap<>(); + public WatermarkHandlingResult watermarkHandlingResult; + + public Operator3ProcessFunction(WatermarkHandlingResult watermarkHandlingResult) { + this.watermarkHandlingResult = watermarkHandlingResult; + } + + @Override + public void open(NonPartitionedContext<Long> ctx) throws Exception { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + + // attempt id ++ + attemptIds.compute( + subIdx, + (ignored, value) -> { + if (value == null) { + value = 0; + } else { + value += 1; + } + return value; + }); + receivedWatermarks.computeIfAbsent(subIdx, taskIndex -> new ArrayList<>()); + } + + @Override + public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + throws Exception { + receivedRecords.add(record); + output.collect(record + 1); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, Collector<Long> output, NonPartitionedContext<Long> ctx) { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + receivedWatermarks.get(subIdx).add(watermark); + return watermarkHandlingResult; + } + + public static void clear() { + attemptIds.clear(); + receivedRecords.clear(); + receivedWatermarks.clear(); + } + } + + public static class Operator4ProcessFunction + implements OneInputStreamProcessFunction<Long, Long> { + public static Map<Integer, Integer> attemptIds = new 
ConcurrentHashMap<>(); + public static ConcurrentLinkedQueue<Long> receivedRecords = new ConcurrentLinkedQueue<>(); + // subtask id -> received watermarks + public static Map<Integer, List<Watermark>> receivedWatermarks = new ConcurrentHashMap<>(); + + @Override + public void open(NonPartitionedContext<Long> ctx) throws Exception { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + + // attempt id ++ + attemptIds.compute( + subIdx, + (ignored, value) -> { + if (value == null) { + value = 0; + } else { + value += 1; + } + return value; + }); + receivedWatermarks.computeIfAbsent(subIdx, taskIndex -> new ArrayList<>()); + } + + @Override + public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + throws Exception { + receivedRecords.add(record); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, Collector<Long> output, NonPartitionedContext<Long> ctx) { + int subIdx = ctx.getTaskInfo().getIndexOfThisSubtask(); + receivedWatermarks.get(subIdx).add(watermark); + return WatermarkHandlingResult.PEEK; + } + + public static void clear() { + attemptIds.clear(); + receivedRecords.clear(); + receivedWatermarks.clear(); + } + } + + public StreamGraph getStreamGraphForTestcase( + WatermarkDeclaration watermarkDeclaration, + Map<Integer, Watermark> emitWatermarksInOperator2, + WatermarkHandlingResult operator3HandlingResult) + throws ReflectiveOperationException { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + ProcessConfigurableAndNonKeyedPartitionStream<Long> source = + env.fromSource( + DataStreamV2SourceUtils.fromData( + LongStream.range(0, NUMBER_KEYS) + .boxed() + .collect(Collectors.toList())), + "Operator1") + .withParallelism(DEFAULT_PARALLELISM); + + source.shuffle() + .process( + new Operator2ProcessFunction( + watermarkDeclaration, emitWatermarksInOperator2)) + 
.withName("Operator2") + .withParallelism(DEFAULT_PARALLELISM) + .shuffle() + .process(new Operator3ProcessFunction(operator3HandlingResult)) + .withName("Operator3") + .withParallelism(DEFAULT_PARALLELISM) + .shuffle() + .process(new Operator4ProcessFunction()) + .withName("Operator4") + .withParallelism(DEFAULT_PARALLELISM); + return env.getStreamGraph(); + } + + public StreamGraph getStreamGraphForTestSource( + WatermarkDeclaration watermarkDeclaration, + Map<Integer, Watermark> emitWatermarksInSourceOperator1) + throws ReflectiveOperationException { + ExecutionEnvironmentImpl env = + (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance(); + env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false); + + ProcessConfigurableAndNonKeyedPartitionStream<Long> source = + env.fromSource( + DataStreamV2SourceUtils.wrapSource( + new Operator1Source( + 0, + NUMBER_KEYS - 1, + watermarkDeclaration, + emitWatermarksInSourceOperator1)), + "Operator1") + .withParallelism(DEFAULT_PARALLELISM); + + source.shuffle() + .process(new Operator2ProcessFunction(null, null)) + .withName("Operator2") + .withParallelism(DEFAULT_PARALLELISM); + return env.getStreamGraph(); + } + + public StreamGraph getStreamGraphForAlignedWatermark(List<Boolean> emitWatermarkValues) Review Comment: `emitWatermarkValues` is too difficult to understand. It's better to rename and replace it with map directly. ########## flink-core-api/src/test/java/org/apache/flink/api/common/watermark/WatermarkDeclarationsTest.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.watermark; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WatermarkDeclarations}. */ +public class WatermarkDeclarationsTest { + + private static final String DEFAULT_WATERMARK_IDENTIFIER = "default"; + + @Test + public void testCreatedLongWatermarkDeclarationDefaultValue() { Review Comment: ```suggestion void testCreatedLongWatermarkDeclarationDefaultValue() { ``` ########## flink-core-api/src/test/java/org/apache/flink/api/common/watermark/WatermarkDeclarationsTest.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.watermark; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WatermarkDeclarations}. */ +public class WatermarkDeclarationsTest { Review Comment: ```suggestion class WatermarkDeclarationsTest { ``` ########## flink-runtime/src/test/java/org/apache/flink/streaming/api/watermark/generalized/WatermarkCombinerTest.java: ########## @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.watermark.generalized; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkCombinationFunction; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; +import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; +import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.AlignedWatermarkCombiner; +import org.apache.flink.streaming.runtime.watermark.InternalBoolWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.InternalLongWatermarkDeclaration; +import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WatermarkCombiner}. 
*/ +public class WatermarkCombinerTest { + + private static final String DEFAULT_WATERMARK_IDENTIFIER = "default"; + private List<Watermark> receivedWatermarks = new ArrayList<>(); + private WatermarkCombiner combiner; + + @BeforeEach + public void setup() { + receivedWatermarks.clear(); + combiner = null; + } + + @Test + public void testAlignedWatermarkCombiner() throws Exception { + // The test scenario is as follows: + // ----------------------------------------------------------------------------- + // test scenario | expected result + // ----------------------------------------------------------------------------- + // Step | Channel 0 | Channel 1 | gateNotified | receivedWatermarkValues + // ----------------------------------------------------------------------------- + // 1 | 1 | | false | [] + // 2 | | 1 | true | [1] + // 3 | | 2 | false | [1] + // 3 | 2 | 2 | true | [1, 2] Review Comment: 4 ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/TwoOutputPartitionedContext.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.context; + +import org.apache.flink.annotation.Experimental; + +/** A special {@link BasePartitionedContext} used in two output process functions. */ +@Experimental +public interface TwoOutputPartitionedContext extends BasePartitionedContext { + TwoOutputNonPartitionedContext<?, ?> getNonPartitionedContext(); Review Comment: Could we introduce a generic parameter to this class/interface itself? ########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/InternalLongWatermarkDeclaration.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark; + +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; +import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; + +/** + * The {@link InternalLongWatermarkDeclaration} class extends the {@link + * AbstractInternalWatermarkDeclaration} class and provides additional functionality specific to + * long-type watermarks. 
+ */ +public class InternalLongWatermarkDeclaration extends AbstractInternalWatermarkDeclaration<Long> { + + public InternalLongWatermarkDeclaration( Review Comment: Unused. ########## flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.watermark; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utils class for {@link Watermark}. */ +public class WatermarkUtils { + + public static Set<AbstractInternalWatermarkDeclaration<?>> + getInternalWatermarkDeclarationsFromStreamGraph(StreamGraph streamGraph) { + Collection<StreamNode> streamNodes = streamGraph.getStreamNodes(); + + Set<WatermarkDeclaration> decralations = Review Comment: typo ########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/BoolWatermarkCombiner.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark; + +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkCombinationFunction; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; + +import java.util.BitSet; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link WatermarkCombiner} for unaligned {@link BoolWatermark}s. + * + * <p>The combination process will perform the following steps: (1) determine if it satisfies the + * condition of {@link WatermarkCombinationPolicy#isCombineWaitForAllChannels()}, (2) calculate the + * combined watermark value, (3) send the combined watermark to downstream only when the value + * differs from the previous sent one or if it's the first time sending. + */ +public class BoolWatermarkCombiner implements WatermarkCombiner { + + private final WatermarkCombinationPolicy combinationPolicy; + + /** The number of upstream input channels. */ + private int numberOfInputChannels; Review Comment: Can be final. ########## flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.watermark; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utils class for {@link Watermark}. 
*/ +public class WatermarkUtils { Review Comment: ```suggestion public final class WatermarkUtils { ``` ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -216,7 +220,8 @@ public SourceOperator( Configuration configuration, String localHostname, boolean emitProgressiveWatermarks, - CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) { + CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, + Map<String, AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationMap) { Review Comment: This map can be <String, Boolean>. ########## flink-datastream/src/test/resources/log4j2-test.properties: ########## @@ -0,0 +1,31 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO Review Comment: ```suggestion rootLogger.level = OFF ``` ########## flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java: ########## @@ -83,4 +87,14 @@ SplitEnumerator<SplitT, EnumChkT> restoreEnumerator( * @return The serializer for the SplitEnumerator checkpoint. */ SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer(); + + /** + * Explicitly declare watermarks upfront. Each specific watermark must be declared in this + * method before it can be used. + * + * @return all watermark declarations used by this application. + */ + default Collection<? extends WatermarkDeclaration> watermarkDeclarations() { Review Comment: ```suggestion default Set<? extends WatermarkDeclaration> watermarkDeclarations() { ``` ########## flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.watermark; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utils class for {@link Watermark}. */ +public class WatermarkUtils { + + public static Set<AbstractInternalWatermarkDeclaration<?>> Review Comment: java doc. ########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/AbstractInternalWatermarkDeclaration.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.watermark; + +import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; + +/** + * The {@link AbstractInternalWatermarkDeclaration} class implements the {@code + * WatermarkDeclaration} interface and provides some internal-oriented methods, such as + * getting/setting the {@code align} flag and creating the {@link WatermarkCombiner}. + */ +public abstract class AbstractInternalWatermarkDeclaration<T> implements WatermarkDeclaration { + + protected final String identifier; + + protected final WatermarkCombinationPolicy combinationPolicy; + + protected final WatermarkHandlingStrategy defaultHandlingStrategy; + + protected final boolean isAligned; + + public AbstractInternalWatermarkDeclaration( + String identifier, + WatermarkCombinationPolicy combinationPolicy, + WatermarkHandlingStrategy defaultHandlingStrategy, + boolean isAligned) { + this.identifier = identifier; + this.combinationPolicy = combinationPolicy; + this.defaultHandlingStrategy = defaultHandlingStrategy; + this.isAligned = isAligned; + } + + @Override + public String getIdentifier() { + return identifier; + } + + /** Creates a new {@link Watermark} with the specified value. 
*/ + public abstract Watermark newWatermark(T val); + + public WatermarkCombinationPolicy getCombinationPolicy() { + return combinationPolicy; + } + + public WatermarkHandlingStrategy getDefaultHandlingStrategy() { + return defaultHandlingStrategy; + } + + public boolean isAligned() { + return isAligned; + } + + /** Creates a new {@link WatermarkCombiner} instance. */ + public abstract WatermarkCombiner watermarkCombiner( Review Comment: ```suggestion public abstract WatermarkCombiner createWatermarkCombiner( ``` ########## flink-core-api/src/test/java/org/apache/flink/api/common/watermark/WatermarkDeclarationsTest.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.watermark; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WatermarkDeclarations}. */ +public class WatermarkDeclarationsTest { Review Comment: Class and test method should be package-private in junit5. 
########## flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.watermark; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utils class for {@link Watermark}. 
*/ +public class WatermarkUtils { + + public static Set<AbstractInternalWatermarkDeclaration<?>> + getInternalWatermarkDeclarationsFromStreamGraph(StreamGraph streamGraph) { + Collection<StreamNode> streamNodes = streamGraph.getStreamNodes(); + + Set<WatermarkDeclaration> decralations = + streamNodes.stream() + .filter(n -> (n.getOperatorFactory() instanceof SimpleOperatorFactory)) + .map(n -> WatermarkUtils.getWatermarkDeclarations(n.getOperator())) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + Set<WatermarkDeclaration> sourceDeclarations = + streamNodes.stream() + .map(StreamNode::getOperatorFactory) + .filter(n -> (n instanceof SourceOperatorFactory)) + .map(n -> ((SourceOperatorFactory<?>) n).getSourceWatermarkDeclarations()) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + decralations.addAll(sourceDeclarations); Review Comment: Could be merged to ``` streamNodes.stream() .map(StreamNode::getOperatorFactory) .filter( factory -> factory instanceof SimpleOperatorFactory || factory instanceof SourceOperatorFactory) .map( factory -> { if (factory instanceof SimpleOperatorFactory) { return WatermarkUtils.getWatermarkDeclarations( ((SimpleOperatorFactory<?>) factory).getOperator()); } else { return ((SourceOperatorFactory<?>) factory) .getSourceWatermarkDeclarations(); } }) .flatMap(Collection::stream) .collect(Collectors.toSet()); ``` ########## flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java: ########## @@ -0,0 +1,1064 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.watermark.BoolWatermark; +import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkCombinationFunction; +import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import 
org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.watermark.AlignableBoolWatermarkDeclaration; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * IT test case for {@link Watermark}. It will test the following behaviors of the generalized + * watermark: (1) The aligned watermark can block upstream. (2) The long watermark combines max/min + * correctly. (3) The bool watermark combines and/or correctly. (4) The watermark combiner will wait + * for all channels and then combine if the {@link + * WatermarkCombinationPolicy#isCombineWaitForAllChannels()} returns true. 
(5) The operator does not + * send watermarks when the {@link OneInputStreamProcessFunction#onWatermark} returns {@link + * WatermarkHandlingResult#POLL}. (6) The operator does not send watermarks when the {@link + * WatermarkHandlingStrategy} is set to IGNORE and the {@link + * OneInputStreamProcessFunction#onWatermark} returns {@link WatermarkHandlingResult#PEEK}. (7) The + * source operator can declare and emit watermarks. + * + * <p>We design a test job for these test cases, the test job has four operators, each with a + * parallelism of 2, and the shuffle edges are all to all. The job details are as follows: + * SourceOperator (Operator1) -> ProcessOperator(Operator2) -> ProcessOperator(Operator3) -> + * ProcessOperator(Operator4). Operator2 will declare watermarks and send watermark to downstream. + * Operator3 will receive watermarks from the upstream Operator2 and process them according to + * different test cases. Operator4 will receive or not receive watermarks from upstream Operator3. + */ +public class WatermarkITCase { + + private static final String DEFAULT_WATERMARK_IDENTIFIER = "default"; + + /** Parallelism of all operators. */ + private static final int DEFAULT_PARALLELISM = 2; + + private MiniCluster flinkCluster; + + /** + * The source operator will emit integer data within a range of 0 to {@code + * NUMBER_KEYS}(exclusive). 
+ */ + private static final int NUMBER_KEYS = 100; + + public void startMiniCluster() throws Exception { + TestingMiniClusterConfiguration miniClusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setNumTaskManagers(3) + .setNumSlotsPerTaskManager(4) + .build(); + flinkCluster = TestingMiniCluster.newBuilder(miniClusterConfiguration).build(); + flinkCluster.start(); + } + + @BeforeEach + void before() throws Exception { + startMiniCluster(); + } + + @AfterEach + void after() throws Exception { + Operator2ProcessFunction.clear(); + Operator3ProcessFunction.clear(); + Operator4ProcessFunction.clear(); + + if (flinkCluster != null) { + flinkCluster.close(); + flinkCluster = null; + } + } + + /** + * Test aligned watermark can block upstream. In this test case, Operator2 will declare the + * aligned watermark. We will block Operator2's subtasks first and unblock them step by step, + * and each subtask will emit a watermark with value {@code true} after being unblocked. The + * Operator3 should receive only one combined and aligned watermark with value {@code true} + * after both the Operator2's subtask are unblocked. + */ + @Test + public void testAlignedWatermarkBlockUpstream() throws Exception { Review Comment: We should emit some records to verify that the upstream is indeed blocked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
