[ https://issues.apache.org/jira/browse/FLINK-34175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814123#comment-17814123 ]

Vallari Rastogi commented on FLINK-34175:
-----------------------------------------

Hi [~xuyangzhong] Are you working on this? Can I take this up?

> When meeting WindowedSliceAssigner, slice window agg registers an wrong timestamp timer
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-34175
>                 URL: https://issues.apache.org/jira/browse/FLINK-34175
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: xuyang
>            Priority: Major
>
> The following test added to SlicingWindowAggOperatorTest can re-produce this problem.
> {code:java}
> private static final RowType INPUT_ROW_TYPE_FROM_WINDOW_TVF =
>         new RowType(
>                 Arrays.asList(
>                         new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
>                         new RowType.RowField("f1", new IntType()),
>                         new RowType.RowField("f2", new TimestampType()),
>                         new RowType.RowField("f3", new TimestampType()),
>                         new RowType.RowField(
>                                 "f4", new TimestampType(false, TimestampKind.ROWTIME, 3))));
> protected static final RowDataSerializer INPUT_ROW_SER_FROM_WINDOW_TVF =
>         new RowDataSerializer(INPUT_ROW_TYPE_FROM_WINDOW_TVF);
> @Test
> public void test() throws Exception {
>     final SliceAssigner innerAssigner =
>             SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3));
>     final SliceAssigner assigner = SliceAssigners.windowed(3, innerAssigner);
>     final SlicingSumAndCountAggsFunction aggsFunction =
>             new SlicingSumAndCountAggsFunction(assigner);
>     SlicingWindowOperator<RowData, ?> operator =
>             (SlicingWindowOperator<RowData, ?>)
>                     WindowAggOperatorBuilder.builder()
>                             .inputSerializer(INPUT_ROW_SER_FROM_WINDOW_TVF)
>                             .shiftTimeZone(shiftTimeZone)
>                             .keySerializer(KEY_SER)
>                             .assigner(assigner)
>                             .aggregate(wrapGenerated(aggsFunction), ACC_SER)
>                             .build();
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(operator);
>     testHarness.setup(OUT_SERIALIZER);
>     testHarness.open();
>     // process elements
>     ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
>     // add elements out-of-order
>     testHarness.processElement(
>             insertRecord(
>                     "key2",
>                     1,
>                     fromEpochMillis(999L),
>                     fromEpochMillis(3999L),
>                     fromEpochMillis(3998L)));
>     testHarness.processWatermark(new Watermark(999));
>     expectedOutput.add(new Watermark(999));
>     ASSERTER.assertOutputEqualsSorted(
>             "Output was not correct.", expectedOutput, testHarness.getOutput());
>     testHarness.close();
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)