This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 5dbb51c09e0 [FLINK-28019][table] fix error when retract a staled record if state ttl enabled in RetractableTopNFunction 5dbb51c09e0 is described below commit 5dbb51c09e0d810eabbdc2f4c0f4045dee5be519 Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Mon Jun 20 10:12:11 2022 +0800 [FLINK-28019][table] fix error when retract a staled record if state ttl enabled in RetractableTopNFunction This closes #19997 --- .../operators/rank/RetractableTopNFunction.java | 32 ++++++++++-------- .../rank/RetractableTopNFunctionTest.java | 38 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java index da4d94d994b..b9d6b6dfdca 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java @@ -191,16 +191,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction { sortedMap.put(sortKey, count); } } else { - if (sortedMap.isEmpty()) { - if (lenient) { - LOG.warn(STATE_CLEARED_WARN_MSG); - } else { - throw new RuntimeException(STATE_CLEARED_WARN_MSG); - } - } else { - throw new RuntimeException( - "Can not retract a non-existent record. This should never happen."); - } + stateStaledErrorHandle(); } if (!stateRemoved) { @@ -231,10 +222,19 @@ public class RetractableTopNFunction extends AbstractTopNFunction { private void processStateStaled(Iterator<Map.Entry<RowData, Long>> sortedMapIterator) throws RuntimeException { + // Sync with dataState first + sortedMapIterator.remove(); + + stateStaledErrorHandle(); + } + + /** + * Handle state staled error by configured lenient option. If option is true, warning log only, + * otherwise a {@link RuntimeException} will be thrown. + */ + private void stateStaledErrorHandle() { // Skip the data if it's state is cleared because of state ttl. if (lenient) { - // Sync with dataState - sortedMapIterator.remove(); LOG.warn(STATE_CLEARED_WARN_MSG); } else { throw new RuntimeException(STATE_CLEARED_WARN_MSG); @@ -395,8 +395,12 @@ public class RetractableTopNFunction extends AbstractTopNFunction { } } if (isInRankEnd(currentRank)) { - // there is no enough elements in Top-N, emit DELETE message for the retract record. - collectDelete(out, prevRow, currentRank); + if (!findsSortKey && null == prevRow) { + stateStaledErrorHandle(); + } else { + // there is no enough elements in Top-N, emit DELETE message for the retract record. + collectDelete(out, prevRow, currentRank); + } } return findsSortKey; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java index 20efa889c18..974213743cc 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java @@ -18,10 +18,12 @@ package org.apache.flink.table.runtime.operators.rank; +import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.StateConfigUtil; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.VarCharType; @@ -555,4 +557,40 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { assertorWithRowNumber.assertOutputEquals( "output wrong.", expectedOutput, testHarness.getOutput()); } + + @Test + public void testRetractAnStaledRecordWithRowNumber() throws Exception { + StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000); + AbstractTopNFunction func = + new RetractableTopNFunction( + ttlConfig, + InternalTypeInfo.ofFields( + new VarCharType(), new BigIntType(), new IntType()), + comparableRecordComparator, + sortKeySelector, + RankType.ROW_NUMBER, + new ConstantRankRange(1, 2), + generatedEqualiser, + true, + true); + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func); + testHarness.open(); + testHarness.setStateTtlProcessingTime(0); + testHarness.processElement(insertRecord("a", 1L, 10)); + testHarness.setStateTtlProcessingTime(1001); + testHarness.processElement(insertRecord("a", 2L, 11)); + testHarness.processElement(deleteRecord("a", 1L, 10)); + testHarness.close(); + + List<Object> expectedOutput = new ArrayList<>(); + expectedOutput.add(insertRecord("a", 1L, 10, 1L)); + expectedOutput.add(insertRecord("a", 2L, 11, 1L)); + // the following delete record should not be sent because the left row is null which is + // illegal. + // -D{row1=null, row2=+I(1)}; + + assertorWithRowNumber.assertOutputEquals( + "output wrong.", expectedOutput, testHarness.getOutput()); + } }