This is an automated email from the ASF dual-hosted git repository.

jingzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d46548c  [FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS
d46548c is described below

commit d46548c5024c4c47d6e52168e1d92c68d3ce58f8
Author: lmagic233 <mirai.sen...@outlook.com>
AuthorDate: Tue Jan 11 23:24:48 2022 +0800

    [FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS
    
    This closes #18331.
---
 .../window/LocalSlicingWindowAggOperator.java      |   2 +
 .../operators/window/slicing/SliceAssigners.java   |   9 +-
 .../window/SlicingWindowAggOperatorTest.java       | 114 +++++++++++----------
 3 files changed, 66 insertions(+), 59 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index fbdfe48..1742767 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.table.runtime.operators.aggregate.window;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -80,6 +81,7 @@ public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowDat
             SliceAssigner sliceAssigner,
             WindowBuffer.LocalFactory windowBufferFactory,
             ZoneId shiftTimezone) {
+        chainingStrategy = ChainingStrategy.ALWAYS;
         this.keySelector = keySelector;
         this.sliceAssigner = sliceAssigner;
         this.windowInterval = sliceAssigner.getSliceEndInterval();
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
index 5b71b35..b568c48 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.table.runtime.operators.window.slicing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
 import org.apache.flink.util.IterableIterator;
 import org.apache.flink.util.MathUtils;
@@ -401,7 +402,7 @@ public final class SliceAssigners {
 
         @Override
         public long assignSliceEnd(RowData element, ClockService clock) {
-            return element.getLong(windowEndIndex);
+            return element.getTimestamp(windowEndIndex, 3).getMillisecond();
         }
 
         @Override
@@ -507,7 +508,7 @@ public final class SliceAssigners {
 
         @Override
         public long assignSliceEnd(RowData element, ClockService clock) {
-            return element.getLong(sliceEndIndex);
+            return element.getTimestamp(sliceEndIndex, 3).getMillisecond();
         }
 
         @Override
@@ -552,7 +553,9 @@ public final class SliceAssigners {
         public final long assignSliceEnd(RowData element, ClockService clock) {
             final long timestamp;
             if (rowtimeIndex >= 0) {
-                timestamp = toUtcTimestampMills(element.getLong(rowtimeIndex), 
shiftTimeZone);
+                // Precision for row timestamp is always 3
+                TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
+                timestamp = toUtcTimestampMills(rowTime.getMillisecond(), 
shiftTimeZone);
             } else {
                 // in processing time mode
                 timestamp = toUtcTimestampMills(clock.currentProcessingTime(), 
shiftTimeZone);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index 1989701..58c384a 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 
@@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.apache.flink.table.data.TimestampData.fromEpochMillis;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
 import static org.junit.Assert.assertEquals;
@@ -82,7 +84,7 @@ public class SlicingWindowAggOperatorTest {
                     Arrays.asList(
                             new RowType.RowField("f0", new 
VarCharType(Integer.MAX_VALUE)),
                             new RowType.RowField("f1", new IntType()),
-                            new RowType.RowField("f2", new BigIntType())));
+                            new RowType.RowField("f2", new TimestampType())));
 
     private static final RowDataSerializer INPUT_ROW_SER = new 
RowDataSerializer(INPUT_ROW_TYPE);
 
@@ -138,16 +140,16 @@ public class SlicingWindowAggOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 3999L));
-        testHarness.processElement(insertRecord("key2", 1, 3000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3000L)));
 
-        testHarness.processElement(insertRecord("key1", 1, 20L));
-        testHarness.processElement(insertRecord("key1", 1, 0L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(20L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(0L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(999L)));
 
-        testHarness.processElement(insertRecord("key2", 1, 1998L));
-        testHarness.processElement(insertRecord("key2", 1, 1999L));
-        testHarness.processElement(insertRecord("key2", 1, 1000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1998L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1000L)));
 
         testHarness.processWatermark(new Watermark(999));
         expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-2000L), 
localMills(1000L)));
@@ -189,7 +191,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late element for [1K, 4K), but should be accumulated into [2K, 5K), 
[3K, 6K)
-        testHarness.processElement(insertRecord("key2", 1, 3500L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3500L)));
 
         testHarness.processWatermark(new Watermark(4999));
         expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(2000L), 
localMills(5000L)));
@@ -198,7 +200,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late for all assigned windows, should be dropped
-        testHarness.processElement(insertRecord("key1", 1, 2999L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(2999L)));
 
         testHarness.processWatermark(new Watermark(5999));
         expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(3000L), 
localMills(6000L)));
@@ -246,7 +248,7 @@ public class SlicingWindowAggOperatorTest {
 
         // timestamp is ignored in processing time
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T00:00:00.003"));
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T01:00:00"));
 
@@ -261,8 +263,8 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T02:00:00"));
 
@@ -276,8 +278,8 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T03:00:00"));
 
@@ -299,9 +301,9 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T07:00:00"));
 
@@ -366,16 +368,16 @@ public class SlicingWindowAggOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 2999L));
-        testHarness.processElement(insertRecord("key2", 1, 3000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3000L)));
 
-        testHarness.processElement(insertRecord("key1", 1, 20L));
-        testHarness.processElement(insertRecord("key1", 1, 0L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(20L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(0L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(999L)));
 
-        testHarness.processElement(insertRecord("key2", 1, 1998L));
-        testHarness.processElement(insertRecord("key2", 1, 1999L));
-        testHarness.processElement(insertRecord("key2", 1, 1000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1998L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1000L)));
 
         testHarness.processWatermark(new Watermark(999));
         expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), 
localMills(1000L)));
@@ -404,7 +406,7 @@ public class SlicingWindowAggOperatorTest {
         testHarness.open();
         // the late event would not trigger window [0, 2000L) again even if 
the job restore from
         // savepoint
-        testHarness.processElement(insertRecord("key2", 1, 1000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1000L)));
         testHarness.processWatermark(new Watermark(1999));
 
         expectedOutput.add(new Watermark(1999));
@@ -424,7 +426,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late element for [3K, 4K), but should be accumulated into [3K, 5K) 
[3K, 6K)
-        testHarness.processElement(insertRecord("key1", 2, 3500L));
+        testHarness.processElement(insertRecord("key1", 2, 
fromEpochMillis(3500L)));
 
         testHarness.processWatermark(new Watermark(4999));
         expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L), 
localMills(5000L)));
@@ -434,7 +436,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late for all assigned windows, should be dropped
-        testHarness.processElement(insertRecord("key1", 1, 2999L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(2999L)));
 
         testHarness.processWatermark(new Watermark(5999));
         expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L), 
localMills(6000L)));
@@ -483,7 +485,7 @@ public class SlicingWindowAggOperatorTest {
 
         // timestamp is ignored in processing time
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T00:00:00.003"));
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T08:00:00"));
 
@@ -498,8 +500,8 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T16:00:00"));
 
@@ -513,8 +515,8 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-02T00:00:00"));
 
@@ -536,9 +538,9 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(Long.MAX_VALUE)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-03T08:00:00"));
 
@@ -616,16 +618,16 @@ public class SlicingWindowAggOperatorTest {
         ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
         // add elements out-of-order
-        testHarness.processElement(insertRecord("key2", 1, 3999L));
-        testHarness.processElement(insertRecord("key2", 1, 3000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(3000L)));
 
-        testHarness.processElement(insertRecord("key1", 1, 20L));
-        testHarness.processElement(insertRecord("key1", 1, 0L));
-        testHarness.processElement(insertRecord("key1", 1, 999L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(20L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(0L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(999L)));
 
-        testHarness.processElement(insertRecord("key2", 1, 1998L));
-        testHarness.processElement(insertRecord("key2", 1, 1999L));
-        testHarness.processElement(insertRecord("key2", 1, 1000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1998L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1000L)));
 
         testHarness.processWatermark(new Watermark(999));
         expectedOutput.add(new Watermark(999));
@@ -663,7 +665,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late element, should be dropped
-        testHarness.processElement(insertRecord("key1", 1, 2500L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(2500L)));
 
         testHarness.processWatermark(new Watermark(4999));
         expectedOutput.add(new Watermark(4999));
@@ -671,7 +673,7 @@ public class SlicingWindowAggOperatorTest {
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         // late element, should be dropped
-        testHarness.processElement(insertRecord("key2", 1, 2999L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2999L)));
 
         testHarness.processWatermark(new Watermark(5999));
         expectedOutput.add(insertRecord("key2", 2L, 2L, localMills(3000L), 
localMills(6000L)));
@@ -725,12 +727,12 @@ public class SlicingWindowAggOperatorTest {
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T00:00:00.003"));
 
         // timestamp is ignored in processing time
-        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
-        testHarness.processElement(insertRecord("key2", 1, 7000L));
-        testHarness.processElement(insertRecord("key2", 1, 7000L));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(Long.MAX_VALUE)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(7000L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(7000L)));
 
-        testHarness.processElement(insertRecord("key1", 1, 7000L));
-        testHarness.processElement(insertRecord("key1", 1, 7000L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(7000L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(7000L)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T05:00:00"));
 
@@ -752,9 +754,9 @@ public class SlicingWindowAggOperatorTest {
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
-        testHarness.processElement(insertRecord("key1", 1, 7000L));
-        testHarness.processElement(insertRecord("key1", 1, 7000L));
-        testHarness.processElement(insertRecord("key1", 1, 7000L));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(7000L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(7000L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(7000L)));
 
         testHarness.setProcessingTime(epochMills(shiftTimeZone, 
"1970-01-01T10:00:01"));
 

Reply via email to