[ https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554931#comment-17554931 ]
Jingsong Lee edited comment on FLINK-28019 at 6/20/22 2:13 AM:
---------------------------------------------------------------

master: c4d4bb5c28d5319fe567b31464683e3f5f22ba67
release-1.14: 5dbb51c09e0d810eabbdc2f4c0f4045dee5be519
release-1.15: 921b608158288bc807493e1c425f6d7ec6f47b18


was (Author: lzljs3620320):
master: c4d4bb5c28d5319fe567b31464683e3f5f22ba67

> Error occurred when retract a staled record when enable state ttl in RetractableTopNFunction
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28019
>                 URL: https://issues.apache.org/jira/browse/FLINK-28019
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
>
> RetractableTopNFunction fails when state TTL is enabled and a retraction
> arrives for a stale record, i.e. a record whose state has already expired.
> A test case that reproduces the error:
> {code}
> @Test
> public void testRetractAnStaledRecordWithRowNumber() throws Exception {
>     StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
>     AbstractTopNFunction func =
>             new RetractableTopNFunction(
>                     ttlConfig,
>                     InternalTypeInfo.ofFields(
>                             VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
>                     comparableRecordComparator,
>                     sortKeySelector,
>                     RankType.ROW_NUMBER,
>                     new ConstantRankRange(1, 2),
>                     generatedEqualiser,
>                     true,
>                     true);
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(func);
>     testHarness.open();
>     testHarness.setStateTtlProcessingTime(0);
>     testHarness.processElement(insertRecord("a", 1L, 10));
>     // Advance past the 1000 ms TTL so the first record's state expires.
>     testHarness.setStateTtlProcessingTime(1001);
>     testHarness.processElement(insertRecord("a", 2L, 11));
>     // Retract the first record, which is now stale.
>     testHarness.processElement(deleteRecord("a", 1L, 10));
>     testHarness.close();
>     List<Object> expectedOutput = new ArrayList<>();
>     expectedOutput.add(insertRecord("a", 1L, 10, 1L));
>     expectedOutput.add(insertRecord("a", 2L, 11, 1L));
>     // The following delete record should not be sent because the left
>     // row is null, which is illegal.
>     // -D{row1=null, row2=+I(1)};
>     assertorWithRowNumber.assertOutputEquals(
>             "output wrong.", expectedOutput, testHarness.getOutput());
> }
> {code}
> The cause is an incomplete code path for handling stale records.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
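
The stale-record scenario from the reproduce case can be illustrated outside Flink with a small, self-contained sketch. This is not the code from the fix commits and does not use any Flink API. The class `StaleRetractSketch`, its methods, the string output format, and the expiry rule (an entry counts as expired once `now - writeTime >= ttl`) are all assumptions made for illustration. It only shows the general idea of skipping the retraction output when the state for the retracted record is already gone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a TTL-backed top-N operator; not Flink code.
final class StaleRetractSketch {
    private final long ttlMillis;
    private long now;
    // Sort key -> time of last write; plays the role of TTL-enabled state.
    private final Map<Long, Long> writeTime = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    StaleRetractSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Mimics advancing the processing time used for TTL checks.
    void setTime(long now) {
        this.now = now;
    }

    void insert(long sortKey) {
        writeTime.put(sortKey, now);
        output.add("+I(" + sortKey + ")");
    }

    void retract(long sortKey) {
        Long written = writeTime.remove(sortKey);
        // Assumed expiry rule: missing entry, or age has reached the TTL.
        boolean stale = written == null || now - written >= ttlMillis;
        if (stale) {
            // The previous row is no longer available, so emit nothing
            // instead of a delete that would carry a null row.
            return;
        }
        output.add("-D(" + sortKey + ")");
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StaleRetractSketch sketch = new StaleRetractSketch(1_000);
        sketch.setTime(0);
        sketch.insert(10);
        sketch.setTime(1001); // first record is now older than the TTL
        sketch.insert(11);
        sketch.retract(10);   // stale: skipped, no delete emitted
        System.out.println(sketch.output());
    }
}
```

With the same timestamps as the test above (TTL 1000, insert at 0, insert and retract at 1001), the sketch emits only the two inserts, which mirrors the expected output in the reproduce case.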