This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a0ef9eb46ad [FLINK-27733][python] Rework on_timer output behind watermark bug fix
a0ef9eb46ad is described below

commit a0ef9eb46ad3896d6d87595dbe364f69d583794c
Author: Juntao Hu <maybach...@gmail.com>
AuthorDate: Sun May 22 23:16:12 2022 +0800

    [FLINK-27733][python] Rework on_timer output behind watermark bug fix
    
    This closes #19788.
---
 .../python/AbstractPythonFunctionOperator.java     | 28 +++++++++++++-----
 .../python/PythonKeyedCoProcessOperator.java       | 33 ++++------------------
 .../python/PythonKeyedProcessOperator.java         | 33 ++++------------------
 .../operators/python/timer/TimerRegistration.java  | 16 -----------
 .../api/operators/python/timer/TimerUtils.java     | 30 --------------------
 ...thonStreamGroupWindowAggregateOperatorTest.java |  2 --
 ...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |  6 ++--
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |  6 ++--
 9 files changed, 49 insertions(+), 124 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index f229ea7023c..5324df04f8a 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -180,14 +181,18 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         // Approach 1) is the easiest and gives better latency, yet 2)
         // gives better throughput due to the bundle not getting cut on
         // every watermark. So we have implemented 2) below.
+
+        // advance the watermark and do not emit watermark to downstream 
operators
+        if (getTimeServiceManager().isPresent()) {
+            getTimeServiceManager().get().advanceWatermark(mark);
+        }
+
         if (mark.getTimestamp() == Long.MAX_VALUE) {
             invokeFinishBundle();
             processElementsOfCurrentKeyIfNeeded(null);
-            preEmitWatermark(mark);
+            advanceWatermark(mark);
             output.emitWatermark(mark);
         } else if (isBundleFinished()) {
-            // forward the watermark immediately if the bundle is already 
finished.
-            preEmitWatermark(mark);
             output.emitWatermark(mark);
         } else {
             // It is not safe to advance the output watermark yet, so add a 
hold on the current
@@ -195,8 +200,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
             bundleFinishedCallback =
                     () -> {
                         try {
+                            advanceWatermark(mark);
                             // at this point the bundle is finished, allow the 
watermark to pass
-                            preEmitWatermark(mark);
                             output.emitWatermark(mark);
                         } catch (Exception e) {
                             throw new RuntimeException(
@@ -263,10 +268,19 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
 
     protected abstract PythonEnvironmentManager 
createPythonEnvironmentManager();
 
-    /** Called before emitting watermark to downstream. */
-    protected void preEmitWatermark(Watermark mark) throws Exception {
+    /**
+     * Advances the watermark of all managed timer services, potentially 
firing event time timers.
+     * It also ensures that the fired timers are processed in the Python 
user-defined functions.
+     */
+    private void advanceWatermark(Watermark watermark) throws Exception {
         if (getTimeServiceManager().isPresent()) {
-            getTimeServiceManager().get().advanceWatermark(mark);
+            InternalTimeServiceManager<?> timeServiceManager = 
getTimeServiceManager().get();
+            timeServiceManager.advanceWatermark(watermark);
+
+            while (!isBundleFinished()) {
+                invokeFinishBundle();
+                timeServiceManager.advanceWatermark(watermark);
+            }
         }
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index f3f65de3759..f4c317feed7 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.TimeDomain;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
@@ -63,9 +61,6 @@ public class PythonKeyedCoProcessOperator<OUT>
     /** TimerService for current operator to register or fire timer. */
     private transient InternalTimerService<VoidNamespace> internalTimerService;
 
-    /** TimerRegistration for handling timer registering. */
-    private transient TimerRegistration timerRegistration;
-
     /** The TypeInformation of the key. */
     private transient TypeInformation<Row> keyTypeInfo;
 
@@ -107,13 +102,6 @@ public class PythonKeyedCoProcessOperator<OUT>
                         timerDataTypeInfo);
 
         timerHandler = new TimerHandler();
-        timerRegistration =
-                new TimerRegistration(
-                        getKeyedStateBackend(),
-                        internalTimerService,
-                        this,
-                        VoidNamespaceSerializer.INSTANCE,
-                        timerDataSerializer);
 
         super.open();
     }
@@ -141,7 +129,12 @@ public class PythonKeyedCoProcessOperator<OUT>
                 getOperatorStateBackend(),
                 keyTypeSerializer,
                 null,
-                timerRegistration,
+                new TimerRegistration(
+                        getKeyedStateBackend(),
+                        internalTimerService,
+                        this,
+                        VoidNamespaceSerializer.INSTANCE,
+                        timerDataSerializer),
                 getContainingTask().getEnvironment().getMemoryManager(),
                 getOperatorConfig()
                         .getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -214,20 +207,6 @@ public class PythonKeyedCoProcessOperator<OUT>
         emitResults();
     }
 
-    @SuppressWarnings("rawtypes")
-    @Override
-    protected void preEmitWatermark(Watermark mark) throws Exception {
-        if (!getTimeServiceManager().isPresent()) {
-            return;
-        }
-        InternalTimeServiceManager timeServiceManager = 
getTimeServiceManager().get();
-        long timestamp = mark.getTimestamp();
-        do {
-            timeServiceManager.advanceWatermark(mark);
-            invokeFinishBundle();
-        } while 
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
-    }
-
     /**
      * As the beam state gRPC service will access the KeyedStateBackend in 
parallel with this
      * operator, we must override this method to prevent changing the current 
key of the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index fdc870d75c7..3bcf1c382e4 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.TimeDomain;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
@@ -70,9 +68,6 @@ public class PythonKeyedProcessOperator<OUT>
     /** TimerService for current operator to register or fire timer. */
     private transient InternalTimerService internalTimerService;
 
-    /** TimerRegistration for handling timer registering. */
-    private transient TimerRegistration timerRegistration;
-
     /** The TypeInformation of the key. */
     private transient TypeInformation<Row> keyTypeInfo;
 
@@ -127,13 +122,6 @@ public class PythonKeyedProcessOperator<OUT>
                         timerDataTypeInfo);
 
         timerHandler = new TimerHandler();
-        timerRegistration =
-                new TimerRegistration(
-                        getKeyedStateBackend(),
-                        internalTimerService,
-                        this,
-                        namespaceSerializer,
-                        timerDataSerializer);
 
         super.open();
     }
@@ -171,7 +159,12 @@ public class PythonKeyedProcessOperator<OUT>
                 getOperatorStateBackend(),
                 keyTypeSerializer,
                 namespaceSerializer,
-                timerRegistration,
+                new TimerRegistration(
+                        getKeyedStateBackend(),
+                        internalTimerService,
+                        this,
+                        namespaceSerializer,
+                        timerDataSerializer),
                 getContainingTask().getEnvironment().getMemoryManager(),
                 getOperatorConfig()
                         .getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -233,20 +226,6 @@ public class PythonKeyedProcessOperator<OUT>
         emitResults();
     }
 
-    @SuppressWarnings("rawtypes")
-    @Override
-    protected void preEmitWatermark(Watermark mark) throws Exception {
-        if (!getTimeServiceManager().isPresent()) {
-            return;
-        }
-        InternalTimeServiceManager timeServiceManager = 
getTimeServiceManager().get();
-        long timestamp = mark.getTimestamp();
-        do {
-            timeServiceManager.advanceWatermark(mark);
-            invokeFinishBundle();
-        } while 
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
-    }
-
     /**
      * As the beam state gRPC service will access the KeyedStateBackend in 
parallel with this
      * operator, we must override this method to prevent changing the current 
key of the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
index 686025eadb1..1a7371cc723 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
@@ -22,13 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
 import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
 import org.apache.flink.types.Row;
 
@@ -41,7 +39,6 @@ public final class TimerRegistration {
 
     private final KeyedStateBackend<Row> keyedStateBackend;
     private final InternalTimerService internalTimerService;
-    private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> 
internalEventTimeTimersQueue;
     private final KeyContext keyContext;
     private final TypeSerializer namespaceSerializer;
     private final TypeSerializer<Row> timerDataSerializer;
@@ -57,8 +54,6 @@ public final class TimerRegistration {
             throws Exception {
         this.keyedStateBackend = keyedStateBackend;
         this.internalTimerService = internalTimerService;
-        this.internalEventTimeTimersQueue =
-                
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
         this.keyContext = keyContext;
         this.namespaceSerializer = namespaceSerializer;
         this.timerDataSerializer = timerDataSerializer;
@@ -111,17 +106,6 @@ public final class TimerRegistration {
         }
     }
 
-    /**
-     * Returns if there's any event-time timer in the queue, that should be 
triggered because
-     * watermark advance.
-     */
-    public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws 
Exception {
-        return TimerUtils.hasEventTimeTimerBeforeTimestamp(
-                internalEventTimeTimersQueue,
-                timestamp,
-                PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
-    }
-
     /** The flag for indicating the timer operation type. */
     private enum TimerOperandType {
         REGISTER_EVENT_TIMER((byte) 0),
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
index 4c9abf97ca1..aefd1834a24 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
@@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.lang.reflect.Field;
 
 /** Utilities for timer. */
 @Internal
@@ -48,28 +42,4 @@ public final class TimerUtils {
         return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
                 timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, 
false);
     }
-
-    @SuppressWarnings("unchecked")
-    public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
-            getInternalEventTimeTimersQueue(InternalTimerService<?> 
internalTimerService)
-                    throws Exception {
-        Field queueField = 
internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
-        queueField.setAccessible(true);
-        return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
-                queueField.get(internalTimerService);
-    }
-
-    public static boolean hasEventTimeTimerBeforeTimestamp(
-            InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
-            long timestamp,
-            boolean isBatchMode)
-            throws Exception {
-        if (isBatchMode) {
-            Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
-            return timerQueue.size() == 0;
-        }
-
-        TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
-        return minTimer == null || minTimer.getTimestamp() > timestamp;
-    }
 }
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
index 7aec55138bd..8b36e353adf 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
@@ -207,8 +207,6 @@ class PythonStreamGroupWindowAggregateOperatorTest
         testHarness.processElement(newRecord(true, initialTime + 3, "c1", 
"c6", 2L, 10000L));
         testHarness.processElement(newRecord(true, initialTime + 4, "c2", 
"c8", 3L, 0L));
         testHarness.processWatermark(new Watermark(20000L));
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.setProcessingTime(1000L);
         expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index 8d44696d3c8..d1fcad02ee4 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -94,8 +94,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processWatermark(Long.MAX_VALUE);
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -149,6 +147,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -175,8 +175,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         // checkpoint trigger finishBundle
         testHarness.prepareSnapshotPreBarrier(0L);
 
-        expectedOutput.add(new Watermark(10000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -213,14 +211,14 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(0L),
                                 TimestampData.fromEpochMillis(10000L))));
 
+        expectedOutput.add(new Watermark(10000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.processWatermark(20000L);
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(20000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -238,6 +236,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -304,8 +304,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processWatermark(20000L);
         testHarness.close();
 
-        expectedOutput.add(new Watermark(20000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -323,6 +321,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -347,7 +347,6 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
         testHarness.processWatermark(new Watermark(20000L));
-        expectedOutput.add(new Watermark(20000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -404,6 +403,8 @@ class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.close();
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 2452222586a..0f489dccffb 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -83,11 +83,11 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
@@ -113,7 +113,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
         // checkpoint trigger finishBundle
@@ -123,6 +122,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
@@ -185,7 +185,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -194,6 +193,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index b27e9206103..0c200842910 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -79,11 +79,11 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
@@ -109,7 +109,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
         // checkpoint trigger finishBundle
@@ -119,6 +118,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
@@ -181,7 +181,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -190,6 +189,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 

Reply via email to