This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8591e33217c8b5301850aa89e40c7478b5a83d75
Author: Jark Wu <j...@apache.org>
AuthorDate: Wed Dec 11 12:00:29 2019 +0800

    [hotfix][table-runtime-blink] Fix the watermark assigner operator should emit max watermark in close
---
 .../runtime/operators/wmassigners/WatermarkAssignerOperator.java    | 6 ++----
 .../operators/wmassigners/WatermarkAssignerOperatorFactory.java     | 2 +-
 ...rOperatorTest.java => RowTimeMiniBatchAssginerOperatorTest.java} | 2 +-
 .../operators/wmassigners/WatermarkAssignerOperatorTest.java        | 1 +
 4 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
index d924602..73d729a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
@@ -164,10 +164,8 @@ public class WatermarkAssignerOperator
 
        @Override
        public void close() throws Exception {
-               super.close();
-
-               // emit a final watermark
-               advanceWatermark();
+               // all records have been processed, emit a final watermark
+               processWatermark(Watermark.MAX_WATERMARK);
 
                functionsClosed = true;
                FunctionUtils.closeFunction(watermarkGenerator);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java
index 6b50cbe..c896377 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java
@@ -40,7 +40,7 @@ public class WatermarkAssignerOperatorFactory implements OneInputStreamOperatorF
 
        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;
 
-       private ChainingStrategy strategy = ChainingStrategy.HEAD;
+       private ChainingStrategy strategy = ChainingStrategy.ALWAYS;
 
        public WatermarkAssignerOperatorFactory(
                        int rowtimeFieldIndex,
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java
similarity index 98%
rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java
rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java
index a7abaad..6bda308 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests of {@link RowTimeMiniBatchAssginerOperator}.
  */
-public class RowTimeWatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
+public class RowTimeMiniBatchAssginerOperatorTest extends WatermarkAssignerOperatorTestBase {
 
        @Test
        public void testRowTimeWatermarkAssigner() throws Exception {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
index 28f1d41..f801cd5 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java
@@ -215,6 +215,7 @@ public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTest
                expected.add(new Watermark(10L));
 
                testHarness.close();
+               expected.add(Watermark.MAX_WATERMARK);
 
                // num_watermark + num_records
                assertEquals(expected.size() + 11, testHarness.getOutput().size());

Reply via email to