This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a0ef9eb46ad [FLINK-27733][python] Rework on_timer output behind watermark bug fix a0ef9eb46ad is described below commit a0ef9eb46ad3896d6d87595dbe364f69d583794c Author: Juntao Hu <maybach...@gmail.com> AuthorDate: Sun May 22 23:16:12 2022 +0800 [FLINK-27733][python] Rework on_timer output behind watermark bug fix This closes #19788. --- .../python/AbstractPythonFunctionOperator.java | 28 +++++++++++++----- .../python/PythonKeyedCoProcessOperator.java | 33 ++++------------------ .../python/PythonKeyedProcessOperator.java | 33 ++++------------------ .../operators/python/timer/TimerRegistration.java | 16 ----------- .../api/operators/python/timer/TimerUtils.java | 30 -------------------- ...thonStreamGroupWindowAggregateOperatorTest.java | 2 -- ...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------ ...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 6 ++-- ...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 6 ++-- 9 files changed, 49 insertions(+), 124 deletions(-) diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index f229ea7023c..5324df04f8a 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; @@ -180,14 +181,18 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream // Approach 1) is the easiest and gives better latency, yet 2) // gives better throughput due to the bundle not getting cut on // every watermark. So we have implemented 2) below. + + // advance the watermark and do not emit watermark to downstream operators + if (getTimeServiceManager().isPresent()) { + getTimeServiceManager().get().advanceWatermark(mark); + } + if (mark.getTimestamp() == Long.MAX_VALUE) { invokeFinishBundle(); processElementsOfCurrentKeyIfNeeded(null); - preEmitWatermark(mark); + advanceWatermark(mark); output.emitWatermark(mark); } else if (isBundleFinished()) { - // forward the watermark immediately if the bundle is already finished. - preEmitWatermark(mark); output.emitWatermark(mark); } else { // It is not safe to advance the output watermark yet, so add a hold on the current @@ -195,8 +200,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream bundleFinishedCallback = () -> { try { + advanceWatermark(mark); // at this point the bundle is finished, allow the watermark to pass - preEmitWatermark(mark); output.emitWatermark(mark); } catch (Exception e) { throw new RuntimeException( @@ -263,10 +268,19 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream protected abstract PythonEnvironmentManager createPythonEnvironmentManager(); - /** Called before emitting watermark to downstream. */ - protected void preEmitWatermark(Watermark mark) throws Exception { + /** + * Advances the watermark of all managed timer services, potentially firing event time timers. + * It also ensures that the fired timers are processed in the Python user-defined functions. + */ + private void advanceWatermark(Watermark watermark) throws Exception { if (getTimeServiceManager().isPresent()) { - getTimeServiceManager().get().advanceWatermark(mark); + InternalTimeServiceManager<?> timeServiceManager = getTimeServiceManager().get(); + timeServiceManager.advanceWatermark(watermark); + + while (!isBundleFinished()) { + invokeFinishBundle(); + timeServiceManager.advanceWatermark(watermark); + } } } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java index f3f65de3759..f4c317feed7 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.Triggerable; @@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration; import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.streaming.api.utils.PythonTypeUtils; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; @@ -63,9 +61,6 @@ public class PythonKeyedCoProcessOperator<OUT> /** TimerService for current operator to register or fire timer. */ private transient InternalTimerService<VoidNamespace> internalTimerService; - /** TimerRegistration for handling timer registering. */ - private transient TimerRegistration timerRegistration; - /** The TypeInformation of the key. */ private transient TypeInformation<Row> keyTypeInfo; @@ -107,13 +102,6 @@ public class PythonKeyedCoProcessOperator<OUT> timerDataTypeInfo); timerHandler = new TimerHandler(); - timerRegistration = - new TimerRegistration( - getKeyedStateBackend(), - internalTimerService, - this, - VoidNamespaceSerializer.INSTANCE, - timerDataSerializer); super.open(); } @@ -141,7 +129,12 @@ public class PythonKeyedCoProcessOperator<OUT> getOperatorStateBackend(), keyTypeSerializer, null, - timerRegistration, + new TimerRegistration( + getKeyedStateBackend(), + internalTimerService, + this, + VoidNamespaceSerializer.INSTANCE, + timerDataSerializer), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( @@ -214,20 +207,6 @@ public class PythonKeyedCoProcessOperator<OUT> emitResults(); } - @SuppressWarnings("rawtypes") - @Override - protected void preEmitWatermark(Watermark mark) throws Exception { - if (!getTimeServiceManager().isPresent()) { - return; - } - InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get(); - long timestamp = mark.getTimestamp(); - do { - timeServiceManager.advanceWatermark(mark); - invokeFinishBundle(); - } while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp)); - } - /** * As the beam state gRPC service will access the KeyedStateBackend in parallel with this * operator, we must override this method to prevent changing the current key of the diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java index fdc870d75c7..3bcf1c382e4 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.Triggerable; @@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration; import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.streaming.api.utils.PythonTypeUtils; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; @@ -70,9 +68,6 @@ public class PythonKeyedProcessOperator<OUT> /** TimerService for current operator to register or fire timer. */ private transient InternalTimerService internalTimerService; - /** TimerRegistration for handling timer registering. */ - private transient TimerRegistration timerRegistration; - /** The TypeInformation of the key. */ private transient TypeInformation<Row> keyTypeInfo; @@ -127,13 +122,6 @@ public class PythonKeyedProcessOperator<OUT> timerDataTypeInfo); timerHandler = new TimerHandler(); - timerRegistration = - new TimerRegistration( - getKeyedStateBackend(), - internalTimerService, - this, - namespaceSerializer, - timerDataSerializer); super.open(); } @@ -171,7 +159,12 @@ public class PythonKeyedProcessOperator<OUT> getOperatorStateBackend(), keyTypeSerializer, namespaceSerializer, - timerRegistration, + new TimerRegistration( + getKeyedStateBackend(), + internalTimerService, + this, + namespaceSerializer, + timerDataSerializer), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( @@ -233,20 +226,6 @@ public class PythonKeyedProcessOperator<OUT> emitResults(); } - @SuppressWarnings("rawtypes") - @Override - protected void preEmitWatermark(Watermark mark) throws Exception { - if (!getTimeServiceManager().isPresent()) { - return; - } - InternalTimeServiceManager timeServiceManager = getTimeServiceManager().get(); - long timestamp = mark.getTimestamp(); - do { - timeServiceManager.advanceWatermark(mark); - invokeFinishBundle(); - } while (!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp)); - } - /** * As the beam state gRPC service will access the KeyedStateBackend in parallel with this * operator, we must override this method to prevent changing the current key of the diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java index 686025eadb1..1a7371cc723 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java @@ -22,13 +22,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; -import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; import org.apache.flink.streaming.api.utils.PythonOperatorUtils; import org.apache.flink.types.Row; @@ -41,7 +39,6 @@ public final class TimerRegistration { private final KeyedStateBackend<Row> keyedStateBackend; private final InternalTimerService internalTimerService; - private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> internalEventTimeTimersQueue; private final KeyContext keyContext; private final TypeSerializer namespaceSerializer; private final TypeSerializer<Row> timerDataSerializer; @@ -57,8 +54,6 @@ public final class TimerRegistration { throws Exception { this.keyedStateBackend = keyedStateBackend; this.internalTimerService = internalTimerService; - this.internalEventTimeTimersQueue = - TimerUtils.getInternalEventTimeTimersQueue(internalTimerService); this.keyContext = keyContext; this.namespaceSerializer = namespaceSerializer; this.timerDataSerializer = timerDataSerializer; @@ -111,17 +106,6 @@ public final class TimerRegistration { } } - /** - * Returns if there's any event-time timer in the queue, that should be triggered because - * watermark advance. - */ - public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws Exception { - return TimerUtils.hasEventTimeTimerBeforeTimestamp( - internalEventTimeTimersQueue, - timestamp, - PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend)); - } - /** The flag for indicating the timer operation type. */ private enum TimerOperandType { REGISTER_EVENT_TIMER((byte) 0), diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java index 4c9abf97ca1..aefd1834a24 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java @@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.fnexecution.v1.FlinkFnApi; -import org.apache.flink.runtime.state.InternalPriorityQueue; -import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer; import org.apache.flink.streaming.api.utils.ProtoUtils; import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - -import java.lang.reflect.Field; /** Utilities for timer. */ @Internal @@ -48,28 +42,4 @@ public final class TimerUtils { return ProtoUtils.createRawTypeCoderInfoDescriptorProto( timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false); } - - @SuppressWarnings("unchecked") - public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> - getInternalEventTimeTimersQueue(InternalTimerService<?> internalTimerService) - throws Exception { - Field queueField = internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue"); - queueField.setAccessible(true); - return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>) - queueField.get(internalTimerService); - } - - public static boolean hasEventTimeTimerBeforeTimestamp( - InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue, - long timestamp, - boolean isBatchMode) - throws Exception { - if (isBatchMode) { - Preconditions.checkArgument(timestamp == Long.MAX_VALUE); - return timerQueue.size() == 0; - } - - TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek(); - return minTimer == null || minTimer.getTimestamp() > timestamp; - } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java index 7aec55138bd..8b36e353adf 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java @@ -207,8 +207,6 @@ class PythonStreamGroupWindowAggregateOperatorTest testHarness.processElement(newRecord(true, initialTime + 3, "c1", "c6", 2L, 10000L)); testHarness.processElement(newRecord(true, initialTime + 4, "c2", "c8", 3L, 0L)); testHarness.processWatermark(new Watermark(20000L)); - assertOutputEquals( - "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); testHarness.setProcessingTime(1000L); expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L)); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java index 8d44696d3c8..d1fcad02ee4 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java @@ -94,8 +94,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processWatermark(Long.MAX_VALUE); testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -149,6 +147,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -175,8 +175,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest // checkpoint trigger finishBundle testHarness.prepareSnapshotPreBarrier(0L); - expectedOutput.add(new Watermark(10000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -213,14 +211,14 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(0L), TimestampData.fromEpochMillis(10000L)))); + expectedOutput.add(new Watermark(10000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.processWatermark(20000L); testHarness.close(); - expectedOutput.add(new Watermark(20000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -238,6 +236,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -304,8 +304,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processWatermark(20000L); testHarness.close(); - expectedOutput.add(new Watermark(20000L)); - expectedOutput.add( new StreamRecord<>( newRow( @@ -323,6 +321,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -347,7 +347,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 4)); testHarness.processWatermark(new Watermark(20000L)); - expectedOutput.add(new Watermark(20000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -404,6 +403,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest TimestampData.fromEpochMillis(10000L), TimestampData.fromEpochMillis(20000L)))); + expectedOutput.add(new Watermark(20000L)); + assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); testHarness.close(); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java index 2452222586a..0f489dccffb 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java @@ -83,11 +83,11 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -113,7 +113,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); // checkpoint trigger finishBundle @@ -123,6 +122,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -185,7 +185,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -194,6 +193,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java index b27e9206103..0c200842910 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java @@ -79,11 +79,11 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest testHarness.close(); - expectedOutput.add(new Watermark(Long.MAX_VALUE)); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(Long.MAX_VALUE)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -109,7 +109,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); // checkpoint trigger finishBundle @@ -119,6 +118,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -181,7 +181,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest testHarness.processElement( new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3)); testHarness.processWatermark(new Watermark(10000L)); - expectedOutput.add(new Watermark(10000L)); assertOutputEquals( "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput()); @@ -190,6 +189,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L))); expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L))); + expectedOutput.add(new Watermark(10000L)); assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());