[ 
https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554931#comment-17554931
 ] 

Jingsong Lee edited comment on FLINK-28019 at 6/20/22 2:13 AM:
---------------------------------------------------------------

master: c4d4bb5c28d5319fe567b31464683e3f5f22ba67

release-1.14: 5dbb51c09e0d810eabbdc2f4c0f4045dee5be519

release-1.15: 921b608158288bc807493e1c425f6d7ec6f47b18


was (Author: lzljs3620320):
master: c4d4bb5c28d5319fe567b31464683e3f5f22ba67

> Error occurred when retract a staled record when enable state ttl in 
> RetractableTopNFunction
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28019
>                 URL: https://issues.apache.org/jira/browse/FLINK-28019
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
>
> We found an error occurred when retract a staled record when enable state ttl 
> in RetractableTopNFunction, a reproduce case:
> {code}
>     @Test
>     public void testRetractAnStaledRecordWithRowNumber() throws Exception {
>         StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
>         AbstractTopNFunction func =
>                 new RetractableTopNFunction(
>                         ttlConfig,
>                         InternalTypeInfo.ofFields(
>                                 VarCharType.STRING_TYPE, new BigIntType(), 
> new IntType()),
>                         comparableRecordComparator,
>                         sortKeySelector,
>                         RankType.ROW_NUMBER,
>                         new ConstantRankRange(1, 2),
>                         generatedEqualiser,
>                         true,
>                         true);
>         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
> createTestHarness(func);
>         testHarness.open();
>         testHarness.setStateTtlProcessingTime(0);
>         testHarness.processElement(insertRecord("a", 1L, 10));
>         testHarness.setStateTtlProcessingTime(1001);
>         testHarness.processElement(insertRecord("a", 2L, 11));
>         testHarness.processElement(deleteRecord("a", 1L, 10));
>         testHarness.close();
>         List<Object> expectedOutput = new ArrayList<>();
>         expectedOutput.add(insertRecord("a", 1L, 10, 1L));
>         expectedOutput.add(insertRecord("a", 2L, 11, 1L));
>         // the following delete record should not be sent because the left 
> row is null which is
>         // illegal.
>         // -D{row1=null, row2=+I(1)};
>         assertorWithRowNumber.assertOutputEquals(
>                 "output wrong.", expectedOutput, testHarness.getOutput());
>     }
> {code}
> the reason is the uncomplete path when deal with staled records.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to