This is an automated email from the ASF dual-hosted git repository. jingzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d46548c [FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS d46548c is described below commit d46548c5024c4c47d6e52168e1d92c68d3ce58f8 Author: lmagic233 <mirai.sen...@outlook.com> AuthorDate: Tue Jan 11 23:24:48 2022 +0800 [FLINK-25614][table-runtime] Update LocalWindowAggregate chain strategy to ALWAYS This closes #18331. --- .../window/LocalSlicingWindowAggOperator.java | 2 + .../operators/window/slicing/SliceAssigners.java | 9 +- .../window/SlicingWindowAggOperatorTest.java | 114 +++++++++++---------- 3 files changed, 66 insertions(+), 59 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java index fbdfe48..1742767 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java @@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.operators.aggregate.window; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; @@ -80,6 +81,7 @@ public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowDat SliceAssigner sliceAssigner, WindowBuffer.LocalFactory windowBufferFactory, ZoneId shiftTimezone) { + chainingStrategy = ChainingStrategy.ALWAYS; this.keySelector = keySelector; this.sliceAssigner = sliceAssigner; this.windowInterval = sliceAssigner.getSliceEndInterval(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java index 5b71b35..b568c48 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.window.slicing; import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.util.IterableIterator; import org.apache.flink.util.MathUtils; @@ -401,7 +402,7 @@ public final class SliceAssigners { @Override public long assignSliceEnd(RowData element, ClockService clock) { - return element.getLong(windowEndIndex); + return element.getTimestamp(windowEndIndex, 3).getMillisecond(); } @Override @@ -507,7 +508,7 @@ public final class SliceAssigners { @Override public long assignSliceEnd(RowData element, ClockService clock) { - return element.getLong(sliceEndIndex); + return element.getTimestamp(sliceEndIndex, 3).getMillisecond(); } @Override @@ -552,7 +553,9 @@ public final class SliceAssigners { public final long assignSliceEnd(RowData element, ClockService clock) { final long timestamp; if (rowtimeIndex >= 0) { - timestamp = toUtcTimestampMills(element.getLong(rowtimeIndex), shiftTimeZone); + // Precision for row timestamp is always 3 + TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3); + timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone); } else { // in processing time mode timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index 1989701..58c384a 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -41,6 +41,7 @@ import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.utils.HandwrittenSelectorUtil; @@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.apache.flink.table.data.TimestampData.fromEpochMillis; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; import static org.junit.Assert.assertEquals; @@ -82,7 +84,7 @@ public class SlicingWindowAggOperatorTest { Arrays.asList( new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", new IntType()), - new RowType.RowField("f2", new BigIntType()))); + new RowType.RowField("f2", new TimestampType()))); private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE); @@ -138,16 +140,16 @@ public class SlicingWindowAggOperatorTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3000L))); - testHarness.processElement(insertRecord("key1", 1, 20L)); - testHarness.processElement(insertRecord("key1", 1, 0L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(20L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(0L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(999L))); - testHarness.processElement(insertRecord("key2", 1, 1998L)); - testHarness.processElement(insertRecord("key2", 1, 1999L)); - testHarness.processElement(insertRecord("key2", 1, 1000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1998L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1000L))); testHarness.processWatermark(new Watermark(999)); expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-2000L), localMills(1000L))); @@ -189,7 +191,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element for [1K, 4K), but should be accumulated into [2K, 5K), [3K, 6K) - testHarness.processElement(insertRecord("key2", 1, 3500L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3500L))); testHarness.processWatermark(new Watermark(4999)); expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(2000L), localMills(5000L))); @@ -198,7 +200,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late for all assigned windows, should be dropped - testHarness.processElement(insertRecord("key1", 1, 2999L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(2999L))); testHarness.processWatermark(new Watermark(5999)); expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(3000L), localMills(6000L))); @@ -246,7 +248,7 @@ public class SlicingWindowAggOperatorTest { // timestamp is ignored in processing time testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T00:00:00.003")); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T01:00:00")); @@ -261,8 +263,8 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T02:00:00")); @@ -276,8 +278,8 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T03:00:00")); @@ -299,9 +301,9 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T07:00:00")); @@ -366,16 +368,16 @@ public class SlicingWindowAggOperatorTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 2999L)); - testHarness.processElement(insertRecord("key2", 1, 3000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3000L))); - testHarness.processElement(insertRecord("key1", 1, 20L)); - testHarness.processElement(insertRecord("key1", 1, 0L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(20L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(0L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(999L))); - testHarness.processElement(insertRecord("key2", 1, 1998L)); - testHarness.processElement(insertRecord("key2", 1, 1999L)); - testHarness.processElement(insertRecord("key2", 1, 1000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1998L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1000L))); testHarness.processWatermark(new Watermark(999)); expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(1000L))); @@ -404,7 +406,7 @@ public class SlicingWindowAggOperatorTest { testHarness.open(); // the late event would not trigger window [0, 2000L) again even if the job restore from // savepoint - testHarness.processElement(insertRecord("key2", 1, 1000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1000L))); testHarness.processWatermark(new Watermark(1999)); expectedOutput.add(new Watermark(1999)); @@ -424,7 +426,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element for [3K, 4K), but should be accumulated into [3K, 5K) [3K, 6K) - testHarness.processElement(insertRecord("key1", 2, 3500L)); + testHarness.processElement(insertRecord("key1", 2, fromEpochMillis(3500L))); testHarness.processWatermark(new Watermark(4999)); expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L), localMills(5000L))); @@ -434,7 +436,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late for all assigned windows, should be dropped - testHarness.processElement(insertRecord("key1", 1, 2999L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(2999L))); testHarness.processWatermark(new Watermark(5999)); expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L), localMills(6000L))); @@ -483,7 +485,7 @@ public class SlicingWindowAggOperatorTest { // timestamp is ignored in processing time testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T00:00:00.003")); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T08:00:00")); @@ -498,8 +500,8 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T16:00:00")); @@ -513,8 +515,8 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-02T00:00:00")); @@ -536,9 +538,9 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(Long.MAX_VALUE))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-03T08:00:00")); @@ -616,16 +618,16 @@ public class SlicingWindowAggOperatorTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order - testHarness.processElement(insertRecord("key2", 1, 3999L)); - testHarness.processElement(insertRecord("key2", 1, 3000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(3000L))); - testHarness.processElement(insertRecord("key1", 1, 20L)); - testHarness.processElement(insertRecord("key1", 1, 0L)); - testHarness.processElement(insertRecord("key1", 1, 999L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(20L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(0L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(999L))); - testHarness.processElement(insertRecord("key2", 1, 1998L)); - testHarness.processElement(insertRecord("key2", 1, 1999L)); - testHarness.processElement(insertRecord("key2", 1, 1000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1998L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1000L))); testHarness.processWatermark(new Watermark(999)); expectedOutput.add(new Watermark(999)); @@ -663,7 +665,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element, should be dropped - testHarness.processElement(insertRecord("key1", 1, 2500L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(2500L))); testHarness.processWatermark(new Watermark(4999)); expectedOutput.add(new Watermark(4999)); @@ -671,7 +673,7 @@ public class SlicingWindowAggOperatorTest { "Output was not correct.", expectedOutput, testHarness.getOutput()); // late element, should be dropped - testHarness.processElement(insertRecord("key2", 1, 2999L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2999L))); testHarness.processWatermark(new Watermark(5999)); expectedOutput.add(insertRecord("key2", 2L, 2L, localMills(3000L), localMills(6000L))); @@ -725,12 +727,12 @@ public class SlicingWindowAggOperatorTest { testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T00:00:00.003")); // timestamp is ignored in processing time - testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE)); - testHarness.processElement(insertRecord("key2", 1, 7000L)); - testHarness.processElement(insertRecord("key2", 1, 7000L)); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(Long.MAX_VALUE))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(7000L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(7000L))); - testHarness.processElement(insertRecord("key1", 1, 7000L)); - testHarness.processElement(insertRecord("key1", 1, 7000L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(7000L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(7000L))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T05:00:00")); @@ -752,9 +754,9 @@ public class SlicingWindowAggOperatorTest { ASSERTER.assertOutputEqualsSorted( "Output was not correct.", expectedOutput, testHarness.getOutput()); - testHarness.processElement(insertRecord("key1", 1, 7000L)); - testHarness.processElement(insertRecord("key1", 1, 7000L)); - testHarness.processElement(insertRecord("key1", 1, 7000L)); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(7000L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(7000L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(7000L))); testHarness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T10:00:01"));