[ 
https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814123#comment-17814123
 ] 

Vallari Rastogi commented on FLINK-34175:
-----------------------------------------

Hi [~xuyangzhong] 

Are you working on this? Can I take this up?

> When meeting WindowedSliceAssigner, slice window agg registers a wrong 
> timestamp timer 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-34175
>                 URL: https://issues.apache.org/jira/browse/FLINK-34175
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: xuyang
>            Priority: Major
>
> The following test added to SlicingWindowAggOperatorTest can reproduce this 
> problem.
> {code:java}
> private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
>         new RowType(
>                 Arrays.asList(
>                         new RowType.RowField("f0", new 
> VarCharType(Integer.MAX_VALUE)),
>                         new RowType.RowField("f1", new IntType()),
>                         new RowType.RowField("f2", new TimestampType()),
>                         new RowType.RowField("f3", new TimestampType()),
>                         new RowType.RowField(
>                                 "f4", new TimestampType(false, 
> TimestampKind.ROWTIME, 3))));
> protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
>         new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF); 
> @Test
> public void test() throws Exception {
>     final SliceAssigner innerAssigner =
>             SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
>     final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
>     final SlicingSumAndCountAggsFunction aggsFunction =
>             new SlicingSumAndCountAggsFunction(assigner);
>     SlicingWindowOperator<RowData, ?> operator =
>             (SlicingWindowOperator<RowData, ?>)
>                     WindowAggOperatorBuilder.builder()
>                             .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
>                             .shiftTimeZone(shiftTimeZone)
>                             .keySerializer(KEY_SER)
>                             .assigner(assigner)
>                             .aggregate(wrapGenerated(aggsFunction), ACC_SER)
>                             .build();
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(operator);
>     testHarness.setup(OUT_SERIALIZER);
>     testHarness.open();
>     // process elements
>     ConcurrentLinkedQueue<Object> expectedOutput = new 
> ConcurrentLinkedQueue<>();
>     // add elements out-of-order
>     testHarness.processElement(
>             insertRecord(
>                     "key2",
>                     1,
>                     fromEpochMillis(999L),
>                     fromEpochMillis(3999L),
>                     fromEpochMillis(3998L)));
>     testHarness.processWatermark(new Watermark(999));
>     expectedOutput.add(new Watermark(999));
>     ASSERTER.assertOutputEqualsSorted(
>             "Output was not correct.", expectedOutput, 
> testHarness.getOutput());
>     testHarness.close();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to