[ https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-28019: -------------------------------- Description: A test case that reproduces the issue: {code} @Test public void testRetractAnStaledRecordWithRowNumber() throws Exception { StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000); AbstractTopNFunction func = new RetractableTopNFunction( ttlConfig, InternalTypeInfo.ofFields( VarCharType.STRING_TYPE, new BigIntType(), new IntType()), comparableRecordComparator, sortKeySelector, RankType.ROW_NUMBER, new ConstantRankRange(1, 2), generatedEqualiser, true, true); OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func); testHarness.open(); testHarness.setStateTtlProcessingTime(0); testHarness.processElement(insertRecord("a", 1L, 10)); testHarness.setStateTtlProcessingTime(1001); testHarness.processElement(insertRecord("a", 2L, 11)); testHarness.processElement(deleteRecord("a", 1L, 10)); testHarness.close(); List<Object> expectedOutput = new ArrayList<>(); expectedOutput.add(insertRecord("a", 1L, 10, 1L)); expectedOutput.add(insertRecord("a", 2L, 11, 1L)); // the following delete record should not be sent because the left row is null which is // illegal. 
// -D{row1=null, row2=+I(1)}; assertorWithRowNumber.assertOutputEquals( "output wrong.", expectedOutput, testHarness.getOutput()); } {code} > Error occurred when retract a staled record when enable state ttl in > RetractableTopNFunction > -------------------------------------------------------------------------------------------- > > Key: FLINK-28019 > URL: https://issues.apache.org/jira/browse/FLINK-28019 > Project: Flink > Issue Type: Bug > Affects Versions: 1.15.0, 1.14.4 > Reporter: lincoln lee > Priority: Major > Fix For: 1.16.0 > > > A test case that reproduces the issue: > {code} > @Test > public void testRetractAnStaledRecordWithRowNumber() throws Exception { > StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000); > AbstractTopNFunction func = > new RetractableTopNFunction( > ttlConfig, > InternalTypeInfo.ofFields( > VarCharType.STRING_TYPE, new BigIntType(), > new IntType()), > comparableRecordComparator, > sortKeySelector, > RankType.ROW_NUMBER, > new ConstantRankRange(1, 2), > generatedEqualiser, > true, > true); > OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = > createTestHarness(func); > testHarness.open(); > testHarness.setStateTtlProcessingTime(0); > testHarness.processElement(insertRecord("a", 1L, 10)); > testHarness.setStateTtlProcessingTime(1001); > testHarness.processElement(insertRecord("a", 2L, 11)); > testHarness.processElement(deleteRecord("a", 1L, 10)); > testHarness.close(); > List<Object> expectedOutput = new ArrayList<>(); > expectedOutput.add(insertRecord("a", 1L, 10, 1L)); > expectedOutput.add(insertRecord("a", 2L, 11, 1L)); > // the following delete record should not be sent because the left row is null which is > // illegal. > // -D{row1=null, row2=+I(1)}; > assertorWithRowNumber.assertOutputEquals( > "output wrong.", expectedOutput, testHarness.getOutput()); > } > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)