[ 
https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28019:
--------------------------------
    Description: 


A reproduction case:
{code}
    @Test
    public void testRetractAnStaledRecordWithRowNumber() throws Exception {
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
        AbstractTopNFunction func =
                new RetractableTopNFunction(
                        ttlConfig,
                        InternalTypeInfo.ofFields(
                                VarCharType.STRING_TYPE, new BigIntType(), new 
IntType()),
                        comparableRecordComparator,
                        sortKeySelector,
                        RankType.ROW_NUMBER,
                        new ConstantRankRange(1, 2),
                        generatedEqualiser,
                        true,
                        true);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
createTestHarness(func);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(0);
        testHarness.processElement(insertRecord("a", 1L, 10));
        testHarness.setStateTtlProcessingTime(1001);
        testHarness.processElement(insertRecord("a", 2L, 11));
        testHarness.processElement(deleteRecord("a", 1L, 10));
        testHarness.close();

        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(insertRecord("a", 1L, 10, 1L));
        expectedOutput.add(insertRecord("a", 2L, 11, 1L));
        // the following delete record should not be sent because the left row 
is null which is
        // illegal.
        // -D{row1=null, row2=+I(1)};

        assertorWithRowNumber.assertOutputEquals(
                "output wrong.", expectedOutput, testHarness.getOutput());
    }
{code}

> Error occurs when retracting a stale record with state TTL enabled in
> RetractableTopNFunction
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28019
>                 URL: https://issues.apache.org/jira/browse/FLINK-28019
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: lincoln lee
>            Priority: Major
>             Fix For: 1.16.0
>
>
> A reproduction case:
> {code}
>     @Test
>     public void testRetractAnStaledRecordWithRowNumber() throws Exception {
>         StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
>         AbstractTopNFunction func =
>                 new RetractableTopNFunction(
>                         ttlConfig,
>                         InternalTypeInfo.ofFields(
>                                 VarCharType.STRING_TYPE, new BigIntType(), 
> new IntType()),
>                         comparableRecordComparator,
>                         sortKeySelector,
>                         RankType.ROW_NUMBER,
>                         new ConstantRankRange(1, 2),
>                         generatedEqualiser,
>                         true,
>                         true);
>         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
> createTestHarness(func);
>         testHarness.open();
>         testHarness.setStateTtlProcessingTime(0);
>         testHarness.processElement(insertRecord("a", 1L, 10));
>         testHarness.setStateTtlProcessingTime(1001);
>         testHarness.processElement(insertRecord("a", 2L, 11));
>         testHarness.processElement(deleteRecord("a", 1L, 10));
>         testHarness.close();
>         List<Object> expectedOutput = new ArrayList<>();
>         expectedOutput.add(insertRecord("a", 1L, 10, 1L));
>         expectedOutput.add(insertRecord("a", 2L, 11, 1L));
>         // the following delete record should not be sent because the left
>         // row is null which is
>         // illegal.
>         // -D{row1=null, row2=+I(1)};
>         assertorWithRowNumber.assertOutputEquals(
>                 "output wrong.", expectedOutput, testHarness.getOutput());
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to