This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8591e33217c8b5301850aa89e40c7478b5a83d75 Author: Jark Wu <j...@apache.org> AuthorDate: Wed Dec 11 12:00:29 2019 +0800 [hotfix][table-runtime-blink] Fix the watermark assigner operator should emit max watermark in close --- .../runtime/operators/wmassigners/WatermarkAssignerOperator.java | 6 ++---- .../operators/wmassigners/WatermarkAssignerOperatorFactory.java | 2 +- ...rOperatorTest.java => RowTimeMiniBatchAssginerOperatorTest.java} | 2 +- .../operators/wmassigners/WatermarkAssignerOperatorTest.java | 1 + 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java index d924602..73d729a 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java @@ -164,10 +164,8 @@ public class WatermarkAssignerOperator @Override public void close() throws Exception { - super.close(); - - // emit a final watermark - advanceWatermark(); + // all records have been processed, emit a final watermark + processWatermark(Watermark.MAX_WATERMARK); functionsClosed = true; FunctionUtils.closeFunction(watermarkGenerator); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java index 6b50cbe..c896377 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorFactory.java @@ -40,7 +40,7 @@ public class WatermarkAssignerOperatorFactory implements OneInputStreamOperatorF private final GeneratedWatermarkGenerator generatedWatermarkGenerator; - private ChainingStrategy strategy = ChainingStrategy.HEAD; + private ChainingStrategy strategy = ChainingStrategy.ALWAYS; public WatermarkAssignerOperatorFactory( int rowtimeFieldIndex, diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java similarity index 98% rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java index a7abaad..6bda308 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeWatermarkAssignerOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/RowTimeMiniBatchAssginerOperatorTest.java @@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals; /** * Tests of {@link RowTimeMiniBatchAssginerOperator}. */ -public class RowTimeWatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTestBase { +public class RowTimeMiniBatchAssginerOperatorTest extends WatermarkAssignerOperatorTestBase { @Test public void testRowTimeWatermarkAssigner() throws Exception { diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java index 28f1d41..f801cd5 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperatorTest.java @@ -215,6 +215,7 @@ public class WatermarkAssignerOperatorTest extends WatermarkAssignerOperatorTest expected.add(new Watermark(10L)); testHarness.close(); + expected.add(Watermark.MAX_WATERMARK); // num_watermark + num_records assertEquals(expected.size() + 11, testHarness.getOutput().size());