This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 921b6081582 [FLINK-28019][table] fix error when retract a staled 
record if state ttl enabled in RetractableTopNFunction
921b6081582 is described below

commit 921b608158288bc807493e1c425f6d7ec6f47b18
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Mon Jun 20 10:12:37 2022 +0800

    [FLINK-28019][table] fix error when retract a staled record if state ttl 
enabled in RetractableTopNFunction
    
    This closes #19996
---
 .../operators/rank/RetractableTopNFunction.java    | 32 ++++++++++--------
 .../rank/RetractableTopNFunctionTest.java          | 38 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 14 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index da4d94d994b..b9d6b6dfdca 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -191,16 +191,7 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                     sortedMap.put(sortKey, count);
                 }
             } else {
-                if (sortedMap.isEmpty()) {
-                    if (lenient) {
-                        LOG.warn(STATE_CLEARED_WARN_MSG);
-                    } else {
-                        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
-                    }
-                } else {
-                    throw new RuntimeException(
-                            "Can not retract a non-existent record. This 
should never happen.");
-                }
+                stateStaledErrorHandle();
             }
 
             if (!stateRemoved) {
@@ -231,10 +222,19 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
 
     private void processStateStaled(Iterator<Map.Entry<RowData, Long>> 
sortedMapIterator)
             throws RuntimeException {
+        // Sync with dataState first
+        sortedMapIterator.remove();
+
+        stateStaledErrorHandle();
+    }
+
+    /**
+     * Handle state staled error by configured lenient option. If option is 
true, warning log only,
+     * otherwise a {@link RuntimeException} will be thrown.
+     */
+    private void stateStaledErrorHandle() {
         // Skip the data if it's state is cleared because of state ttl.
         if (lenient) {
-            // Sync with dataState
-            sortedMapIterator.remove();
             LOG.warn(STATE_CLEARED_WARN_MSG);
         } else {
             throw new RuntimeException(STATE_CLEARED_WARN_MSG);
@@ -395,8 +395,12 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
             }
         }
         if (isInRankEnd(currentRank)) {
-            // there is no enough elements in Top-N, emit DELETE message for 
the retract record.
-            collectDelete(out, prevRow, currentRank);
+            if (!findsSortKey && null == prevRow) {
+                stateStaledErrorHandle();
+            } else {
+                // there is no enough elements in Top-N, emit DELETE message 
for the retract record.
+                collectDelete(out, prevRow, currentRank);
+            }
         }
 
         return findsSortKey;
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 20efa889c18..d0c3c7d3d75 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -555,4 +557,40 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
         assertorWithRowNumber.assertOutputEquals(
                 "output wrong.", expectedOutput, testHarness.getOutput());
     }
+
+    @Test
+    public void testRetractAnStaledRecordWithRowNumber() throws Exception {
+        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
+        AbstractTopNFunction func =
+                new RetractableTopNFunction(
+                        ttlConfig,
+                        InternalTypeInfo.ofFields(
+                                VarCharType.STRING_TYPE, new BigIntType(), new 
IntType()),
+                        comparableRecordComparator,
+                        sortKeySelector,
+                        RankType.ROW_NUMBER,
+                        new ConstantRankRange(1, 2),
+                        generatedEqualiser,
+                        true,
+                        true);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
createTestHarness(func);
+        testHarness.open();
+        testHarness.setStateTtlProcessingTime(0);
+        testHarness.processElement(insertRecord("a", 1L, 10));
+        testHarness.setStateTtlProcessingTime(1001);
+        testHarness.processElement(insertRecord("a", 2L, 11));
+        testHarness.processElement(deleteRecord("a", 1L, 10));
+        testHarness.close();
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("a", 1L, 10, 1L));
+        expectedOutput.add(insertRecord("a", 2L, 11, 1L));
+        // the following delete record should not be sent because the left row 
is null which is
+        // illegal.
+        // -D{row1=null, row2=+I(1)};
+
+        assertorWithRowNumber.assertOutputEquals(
+                "output wrong.", expectedOutput, testHarness.getOutput());
+    }
 }

Reply via email to