This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new f413c40c8ab [FLINK-27733][python] Rework on_timer output behind watermark bug fix f413c40c8ab is described below commit f413c40c8ab8145d3bdea8dbc6372961a598be37 Author: Juntao Hu <maybach...@gmail.com> AuthorDate: Sun May 22 23:16:12 2022 +0800 [FLINK-27733][python] Rework on_timer output behind watermark bug fix This closes #19788. --- .../python/AbstractPythonFunctionOperator.java | 28 +++++++++++++----- .../python/PythonKeyedCoProcessOperator.java | 33 ++++------------------ .../python/PythonKeyedProcessOperator.java | 33 ++++------------------ .../operators/python/timer/TimerRegistration.java | 16 ----------- .../api/operators/python/timer/TimerUtils.java | 30 -------------------- ...thonStreamGroupWindowAggregateOperatorTest.java | 2 -- ...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------ ...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 6 ++-- ...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 6 ++-- 9 files changed, 49 insertions(+), 124 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index f229ea7023c..5324df04f8a 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; @@ -180,14 +181,18 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream // Approach 1) is the easiest and gives better latency, yet 2) // gives better throughput due to the bundle not getting cut on // every watermark. So we have implemented 2) below. + + // advance the watermark and do not emit watermark to downstream operators + if (getTimeServiceManager().isPresent()) { + getTimeServiceManager().get().advanceWatermark(mark); + } + if (mark.getTimestamp() == Long.MAX_VALUE) { invokeFinishBundle(); processElementsOfCurrentKeyIfNeeded(null); - preEmitWatermark(mark); + advanceWatermark(mark); output.emitWatermark(mark); } else if (isBundleFinished()) { - // forward the watermark immediately if the bundle is already finished. - preEmitWatermark(mark); output.emitWatermark(mark); } else { // It is not safe to advance the output watermark yet, so add a hold on the current @@ -195,8 +200,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream bundleFinishedCallback = () -> { try { + advanceWatermark(mark); // at this point the bundle is finished, allow the watermark to pass - preEmitWatermark(mark); output.emitWatermark(mark); } catch (Exception e) { throw new RuntimeException( @@ -263,10 +268,19 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream protected abstract PythonEnvironmentManager createPythonEnvironmentManager(); - /** Called before emitting watermark to downstream. */ - protected void preEmitWatermark(Watermark mark) throws Exception { + /** + * Advances the watermark of all managed timer services, potentially firing event time timers. 
+ * It also ensures that the fired timers are processed in the Python user-defined functions. + */ + private void advanceWatermark(Watermark watermark) throws Exception { if (getTimeServiceManager().isPresent()) { - getTimeServiceManager().get().advanceWatermark(mark); + InternalTimeServiceManager<?> timeServiceManager = getTimeServiceManager().get(); + timeServiceManager.advanceWatermark(watermark); + + while (!isBundleFinished()) { + invokeFinishBundle(); + timeServiceManager.advanceWatermark(watermark); + } } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java index 0d5a2eb6b6a..e7e1fdac987 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.Triggerable; @@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration; import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.streaming.api.utils.PythonTypeUtils; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; 
import org.apache.flink.types.Row; @@ -63,9 +61,6 @@ public class PythonKeyedCoProcessOperator<OUT> /** TimerService for current operator to register or fire timer. */ private transient InternalTimerService<VoidNamespace> internalTimerService; - /** TimerRegistration for handling timer registering. */ - private transient TimerRegistration timerRegistration; - /** The TypeInformation of the key. */ private transient TypeInformation<Row> keyTypeInfo; @@ -107,13 +102,6 @@ public class PythonKeyedCoProcessOperator<OUT> timerDataTypeInfo); timerHandler = new TimerHandler(); - timerRegistration = - new TimerRegistration( - getKeyedStateBackend(), - internalTimerService, - this, - VoidNamespaceSerializer.INSTANCE, - timerDataSerializer); super.open(); } @@ -139,7 +127,12 @@ public class PythonKeyedCoProcessOperator<OUT> getKeyedStateBackend(), keyTypeSerializer, null, - timerRegistration, + new TimerRegistration( + getKeyedStateBackend(), + internalTimerService, + this, + VoidNamespaceSerializer.INSTANCE, + timerDataSerializer), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( @@ -211,20 +204,6 @@ public class PythonKeyedCoProcessOperator<OUT> emitResults(); } - @SuppressWarnings("rawtypes") - @Override - protected void preEmitWatermark(Watermark mark) throws Exception { - if (!getTimeServiceManager().isPresent()) { - return; - } - InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get(); - long timestamp = mark.getTimestamp(); - do { - timeServiceManager.advanceWatermark(mark); - invokeFinishBundle(); - } while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp)); - } - /** * As the beam state gRPC service will access the KeyedStateBackend in parallel with this * operator, we must override this method to prevent changing the current key of the diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java index 5205469258c..2896ed3be47 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.Triggerable; @@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration; import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.streaming.api.utils.PythonTypeUtils; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; @@ -70,9 +68,6 @@ public class PythonKeyedProcessOperator<OUT> /** TimerService for current operator to register or fire timer. */ private transient InternalTimerService internalTimerService; - /** TimerRegistration for handling timer registering. */ - private transient TimerRegistration timerRegistration; - /** The TypeInformation of the key. 
*/ private transient TypeInformation<Row> keyTypeInfo; @@ -127,13 +122,6 @@ public class PythonKeyedProcessOperator<OUT> timerDataTypeInfo); timerHandler = new TimerHandler(); - timerRegistration = - new TimerRegistration( - getKeyedStateBackend(), - internalTimerService, - this, - namespaceSerializer, - timerDataSerializer); super.open(); } @@ -169,7 +157,12 @@ public class PythonKeyedProcessOperator<OUT> getKeyedStateBackend(), keyTypeSerializer, namespaceSerializer, - timerRegistration, + new TimerRegistration( + getKeyedStateBackend(), + internalTimerService, + this, + namespaceSerializer, + timerDataSerializer), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( @@ -230,20 +223,6 @@ public class PythonKeyedProcessOperator<OUT> emitResults(); } - @SuppressWarnings("rawtypes") - @Override - protected void preEmitWatermark(Watermark mark) throws Exception { - if (!getTimeServiceManager().isPresent()) { - return; - } - InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get(); - long timestamp = mark.getTimestamp(); - do { - timeServiceManager.advanceWatermark(mark); - invokeFinishBundle(); - } while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp)); - } - /** * As the beam state gRPC service will access the KeyedStateBackend in parallel with this * operator, we must override this method to prevent changing the current key of the diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java index 686025eadb1..1a7371cc723 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java @@ -22,13 +22,11 @@ import 
org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; -import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; import org.apache.flink.streaming.api.utils.PythonOperatorUtils; import org.apache.flink.types.Row; @@ -41,7 +39,6 @@ public final class TimerRegistration { private final KeyedStateBackend<Row> keyedStateBackend; private final InternalTimerService internalTimerService; - private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> internalEventTimeTimersQueue; private final KeyContext keyContext; private final TypeSerializer namespaceSerializer; private final TypeSerializer<Row> timerDataSerializer; @@ -57,8 +54,6 @@ public final class TimerRegistration { throws Exception { this.keyedStateBackend = keyedStateBackend; this.internalTimerService = internalTimerService; - this.internalEventTimeTimersQueue = - TimerUtils.getInternalEventTimeTimersQueue(internalTimerService); this.keyContext = keyContext; this.namespaceSerializer = namespaceSerializer; this.timerDataSerializer = timerDataSerializer; @@ -111,17 +106,6 @@ public final class TimerRegistration { } } - /** - * Returns if there's any event-time timer in the queue, that should be triggered because - * watermark advance. 
- */ - public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws Exception { - return TimerUtils.hasEventTimeTimerBeforeTimestamp( - internalEventTimeTimersQueue, - timestamp, - PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend)); - } - /** The flag for indicating the timer operation type. */ private enum TimerOperandType { REGISTER_EVENT_TIMER((byte) 0), diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java index 4c9abf97ca1..aefd1834a24 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java @@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.fnexecution.v1.FlinkFnApi; -import org.apache.flink.runtime.state.InternalPriorityQueue; -import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import java.lang.reflect.Field; /** Utilities for timer. 
*/ @Internal @@ -48,28 +42,4 @@ public final class TimerUtils { return ProtoUtils.createRawTypeCoderInfoDescriptorProto( timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false); } - - @SuppressWarnings("unchecked") - public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> - getInternalEventTimeTimersQueue(InternalTimerService<?> internalTimerService) - throws Exception { - Field queueField = internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue"); - queueField.setAccessible(true); - return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>) - queueField.get(internalTimerService); - } - - public static boolean hasEventTimeTimerBeforeTimestamp( - InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue, - long timestamp, - boolean isBatchMode) - throws Exception { - if (isBatchMode) { - Preconditions.checkArgument(timestamp == Long.MAX_VALUE); - return timerQueue.size() == 0; - } - - TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek(); - return minTimer == null || minTimer.getTimestamp() > timestamp; - } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java index d58837b7eb0..cdc561cf216 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java @@ -207,8 +207,6 @@ public class PythonStreamGroupWindowAggregateOperatorTest testHarness.processElement(newRecord(true, initialTime + 3, "c1", "c6", 2L, 10000L)); testHarness.processElement(newRecord(true, initialTime + 4, "c2", "c8", 3L, 0L)); testHarness.processWatermark(new Watermark(20000L)); - assertOutputEquals( - 
"FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); testHarness.setProcessingTime(1000L); expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L)); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java index be4bdabdb48..ddf106357dc 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -95,8 +95,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processWatermark(Long.MAX_VALUE); testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -150,6 +148,8 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -176,8 +176,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest // checkpoint trigger finishBundle testHarness.prepareSnapshotPreBarrier(0L); - expectedOutput.add(new Watermark(10000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -214,14 +212,14 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L)))); + expectedOutput.add(new Watermark(10000L)); + assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput()); testHarness.processWatermark(20000L); testHarness.close(); - expectedOutput.add(new Watermark(20000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -239,6 +237,8 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -305,8 +305,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processWatermark(20000L); testHarness.close(); - expectedOutput.add(new Watermark(20000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -324,6 +322,8 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -348,7 +348,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 4)); testHarness.processWatermark(new Watermark(20000L)); - expectedOutput.add(new Watermark(20000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -405,6 +404,8 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.close(); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java index 620de108844..737478fd2cd 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java @@ -84,11 +84,11 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -114,7 +114,6 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); // checkpoint trigger finishBundle @@ -124,6 +123,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -186,7 +186,6 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -195,6 +194,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java index dafdf3b5434..4123d492975 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java @@ -80,11 +80,11 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); 
assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -110,7 +110,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); // checkpoint trigger finishBundle @@ -120,6 +119,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -182,7 +182,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -191,6 +190,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());