spuru9 commented on code in PR #28530:
URL: https://github.com/apache/flink/pull/28530#discussion_r3476380286
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java:
##########
@@ -494,6 +506,77 @@ void testEventTimeCumulativeWindows() throws Exception {
testHarness.close();
}
+ @TestTemplate
+ void
testGlobalEventTimeCumulativeWindowsDoNotRefireExpiredWindowAfterRestore()
+ throws Exception {
+ final SliceAssigner assigner =
+ SliceAssigners.cumulative(
+ 3, shiftTimeZone, Duration.ofSeconds(3),
Duration.ofSeconds(1));
+ final SlicingSumAndCountAggsFunction globalAggsFunction =
+ new SlicingSumAndCountAggsFunction(assigner);
+ final SlicingSumAndCountAggsFunction stateAggsFunction =
+ new SlicingSumAndCountAggsFunction(assigner);
+ OneInputStreamOperator<RowData, RowData> operator =
+ buildGlobalWindowOperator(
+ assigner,
+ LOCAL_ACC_INPUT_ROW_SER,
+ new LocalAccumulatorRowsAggsFunction(assigner),
+ globalAggsFunction,
+ stateAggsFunction,
+ null);
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+ createTestHarness(operator);
+
+ testHarness.setup(OUT_SERIALIZER);
+ testHarness.open();
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.processElement(insertRecord("key1", 1L, 1L,
fromEpochMillis(20L)));
+ testHarness.processElement(insertRecord("key1", 1L, 1L,
fromEpochMillis(0L)));
+ testHarness.processElement(insertRecord("key1", 1L, 1L,
fromEpochMillis(999L)));
+
+ testHarness.processElement(insertRecord("key2", 1L, 1L,
fromEpochMillis(1998L)));
+ testHarness.processElement(insertRecord("key2", 1L, 1L,
fromEpochMillis(1999L)));
+ testHarness.processElement(insertRecord("key2", 1L, 1L,
fromEpochMillis(1000L)));
+
+ testHarness.processWatermark(new Watermark(999));
+ expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L),
localMills(1000L)));
+ expectedOutput.add(new Watermark(999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.processWatermark(new Watermark(1999));
+ expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L),
localMills(2000L)));
+ expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(0L),
localMills(2000L)));
+ expectedOutput.add(new Watermark(1999));
+ ASSERTER.assertOutputEqualsSorted(
+ "Output was not correct.", expectedOutput,
testHarness.getOutput());
+
+ testHarness.prepareSnapshotPreBarrier(0L);
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
Review Comment:
nit: (Optional) Unlike all other restore/close tests in this file this one
never asserts globalAggsFunction.closeCalled / stateAggsFunction.closeCalled
after close().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]