This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 9c7fc0d [FLINK-15497][table-planner-blink] Reduce outputs when rank number is not required in Retractable TopN (#10823) 9c7fc0d is described below commit 9c7fc0d4eb91fc486f8497ab07e55aa7e15222de Author: Jing Zhang <beyond1...@126.com> AuthorDate: Tue Jan 21 10:51:52 2020 +0800 [FLINK-15497][table-planner-blink] Reduce outputs when rank number is not required in Retractable TopN (#10823) --- .../planner/runtime/stream/sql/RankITCase.scala | 43 +++--- .../operators/rank/RetractableTopNFunction.java | 151 +++++++++++++++++++-- .../operators/rank/UpdatableTopNFunction.java | 2 +- .../rank/RetractableTopNFunctionTest.java | 142 +++++++++++++++---- 4 files changed, 282 insertions(+), 56 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala index c2f0fb5..d46c640 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala @@ -894,7 +894,6 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } - @Ignore("Enable after UnaryUpdatableTopN is supported") @Test def testTopNWithGroupByAvgWithoutRowNumber(): Unit = { val data = List( @@ -937,25 +936,37 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode tEnv.execute("test") val expected = List( - "(true,book,1,100.0)", - "(true,book,3,110.0)", - "(true,book,4,120.0)", - "(true,book,1,150.0)", - "(true,book,1,166.66666666666666)", - "(true,book,2,300.0)", - "(false,book,3,110.0)", - "(true,book,2,350.0)", - "(true,book,4,310.0)", - "(true,book,1,225.0)", - "(true,fruit,5,100.0)") + "(true,book,1,100)", + "(true,book,3,110)", + "(true,book,4,120)", + "(false,book,1,100)", + "(true,book,1,150)", + "(false,book,1,150)", + "(true,book,1,166)", + "(false,book,3,110)", + "(true,book,2,300)", + "(false,book,2,300)", + "(true,book,3,110)", + "(false,book,3,110)", + "(true,book,2,350)", + "(false,book,4,120)", + "(true,book,3,110)", + "(false,book,3,110)", + "(true,book,4,310)", + "(false,book,1,166)", + "(true,book,3,110)", + "(false,book,3,110)", + "(true,book,1,225)", + "(true,fruit,5,100)" + ) assertEquals(expected, sink.getRawResults) val updatedExpected = List( - "book,1,225.0", - "book,2,350.0", - "book,4,310.0", - "fruit,5,100.0") + "book,1,225", + "book,2,350", + "book,4,310", + "fruit,5,100") assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted) } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java index 5ed2c7b..6e593e9 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java @@ -135,8 +135,13 @@ public class RetractableTopNFunction extends AbstractTopNFunction { } // emit - emitRecordsWithRowNumber(sortedMap, sortKey, input, out); - + if (outputRankNumber || hasOffset()) { + // the without-number-algorithm can't handle topN with offset, + // so use the with-number-algorithm to handle offset + emitRecordsWithRowNumber(sortedMap, sortKey, input, out); + } else { + emitRecordsWithoutRowNumber(sortedMap, sortKey, input, out); + } // update data state List<BaseRow> inputs = dataState.get(sortKey); if (inputs == null) { @@ -147,7 +152,13 @@ public class RetractableTopNFunction extends AbstractTopNFunction { dataState.put(sortKey, inputs); } else { // emit updates first - retractRecordWithRowNumber(sortedMap, sortKey, input, out); + if (outputRankNumber || hasOffset()) { + // the without-number-algorithm can't handle topN with offset, + // so use the with-number-algorithm to handle offset + retractRecordWithRowNumber(sortedMap, sortKey, input, out); + } else { + retractRecordWithoutRowNumber(sortedMap, sortKey, input, out); + } // and then update sortedMap if (sortedMap.containsKey(sortKey)) { @@ -183,6 +194,95 @@ public class RetractableTopNFunction extends AbstractTopNFunction { // ------------- ROW_NUMBER------------------------------- + private void emitRecordsWithRowNumber( + SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) + throws Exception { + Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator(); + long curRank = 0L; + boolean findsSortKey = false; + while (iterator.hasNext() && isInRankEnd(curRank)) { + Map.Entry<BaseRow, Long> entry = iterator.next(); + BaseRow key = entry.getKey(); + if (!findsSortKey && key.equals(sortKey)) { + curRank += entry.getValue(); + collect(out, inputRow, curRank); + findsSortKey = true; + } else if (findsSortKey) { + List<BaseRow> inputs = dataState.get(key); + if (inputs == null) { + // Skip the data if it's state is cleared because of state ttl. + if (lenient) { + LOG.warn(STATE_CLEARED_WARN_MSG); + } else { + throw new RuntimeException(STATE_CLEARED_WARN_MSG); + } + } else { + int i = 0; + while (i < inputs.size() && isInRankEnd(curRank)) { + curRank += 1; + BaseRow prevRow = inputs.get(i); + retract(out, prevRow, curRank - 1); + collect(out, prevRow, curRank); + i++; + } + } + } else { + curRank += entry.getValue(); + } + } + } + + private void emitRecordsWithoutRowNumber( + SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) + throws Exception { + Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator(); + long curRank = 0L; + boolean findsSortKey = false; + BaseRow toCollect = null; + BaseRow toDelete = null; + while (iterator.hasNext() && isInRankEnd(curRank)) { + Map.Entry<BaseRow, Long> entry = iterator.next(); + BaseRow key = entry.getKey(); + if (!findsSortKey && key.equals(sortKey)) { + curRank += entry.getValue(); + if (isInRankRange(curRank)) { + toCollect = inputRow; + } + findsSortKey = true; + } else if (findsSortKey) { + List<BaseRow> inputs = dataState.get(key); + if (inputs == null) { + // Skip the data if it's state is cleared because of state ttl. + if (lenient) { + LOG.warn(STATE_CLEARED_WARN_MSG); + } else { + throw new RuntimeException(STATE_CLEARED_WARN_MSG); + } + } else { + long count = entry.getValue(); + // gets the rank of last record with same sortKey + long rankOfLastRecord = curRank + count; + // deletes the record if there is a record recently downgrades to Top-(N+1) + if (isInRankEnd(rankOfLastRecord)) { + curRank = rankOfLastRecord; + } else { + int index = Long.valueOf(rankEnd - curRank).intValue(); + toDelete = inputs.get(index); + break; + } + } + } else { + curRank += entry.getValue(); + } + } + if (toDelete != null) { + delete(out, toDelete); + } + if (toCollect != null) { + collect(out, inputRow); + } + } + private void retractRecordWithRowNumber( SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception { @@ -238,7 +338,7 @@ public class RetractableTopNFunction extends AbstractTopNFunction { } } - private void emitRecordsWithRowNumber( + private void retractRecordWithoutRowNumber( SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception { Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator(); @@ -248,10 +348,6 @@ public class RetractableTopNFunction extends AbstractTopNFunction { Map.Entry<BaseRow, Long> entry = iterator.next(); BaseRow key = entry.getKey(); if (!findsSortKey && key.equals(sortKey)) { - curRank += entry.getValue(); - collect(out, inputRow, curRank); - findsSortKey = true; - } else if (findsSortKey) { List<BaseRow> inputs = dataState.get(key); if (inputs == null) { // Skip the data if it's state is cleared because of state ttl. @@ -261,14 +357,41 @@ public class RetractableTopNFunction extends AbstractTopNFunction { throw new RuntimeException(STATE_CLEARED_WARN_MSG); } } else { - int i = 0; - while (i < inputs.size() && isInRankEnd(curRank)) { + Iterator<BaseRow> inputIter = inputs.iterator(); + while (inputIter.hasNext() && isInRankEnd(curRank)) { curRank += 1; - BaseRow prevRow = inputs.get(i); - retract(out, prevRow, curRank - 1); - collect(out, prevRow, curRank); - i++; + BaseRow prevRow = inputIter.next(); + if (!findsSortKey && equaliser.equalsWithoutHeader(prevRow, inputRow)) { + delete(out, prevRow, curRank); + curRank -= 1; + findsSortKey = true; + inputIter.remove(); + } else if (findsSortKey) { + if (curRank == rankEnd) { + collect(out, prevRow, curRank); + break; + } + } } + if (inputs.isEmpty()) { + dataState.remove(key); + } else { + dataState.put(key, inputs); + } + } + } else if (findsSortKey) { + long count = entry.getValue(); + // gets the rank of last record with same sortKey + long rankOfLastRecord = curRank + count; + // sends the record if there is a record recently upgrades to Top-N + if (rankOfLastRecord < rankEnd) { + curRank = rankOfLastRecord; + } else { + int index = Long.valueOf(rankEnd - curRank - 1).intValue(); + List<BaseRow> inputs = dataState.get(key); + BaseRow toAdd = inputs.get(index); + collect(out, toAdd); + break; } } else { curRank += entry.getValue(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java index 68a3c61..94a1379 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java @@ -55,7 +55,7 @@ import java.util.TreeMap; * However, the function only works in some special scenarios: * 1. sort field collation is ascending and its mono is decreasing, or sort field collation is descending and its mono * is increasing - * 2. input data has unique keys + * 2. input data has unique keys and unique key must contain partition key * 3. input stream could not contain delete record or retract record */ public class UpdatableTopNFunction extends AbstractTopNFunction implements CheckpointedFunction { diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java index f852ca1..26bfa65 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java @@ -111,10 +111,58 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { assertorWithRowNumber.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); } - // TODO RetractRankFunction could be sent less retraction message when does not need to retract row_number - @Override @Test - public void testConstantRankRangeWithoutOffset() throws Exception { + public void testConstantRankRangeWithoutOffsetWithRowNumber() throws Exception { + AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, + true); + OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); + testHarness.open(); + testHarness.processElement(record("book", 1L, 12)); + testHarness.processElement(record("book", 2L, 19)); + testHarness.processElement(record("book", 4L, 11)); + testHarness.processElement(record("fruit", 4L, 33)); + testHarness.processElement(record("fruit", 3L, 44)); + testHarness.processElement(record("fruit", 5L, 22)); + + List<Object> expectedOutput = new ArrayList<>(); + expectedOutput.add(record("book", 1L, 12, 1L)); + expectedOutput.add(record("book", 2L, 19, 2L)); + expectedOutput.add(retractRecord("book", 1L, 12, 1L)); + expectedOutput.add(record("book", 1L, 12, 2L)); + expectedOutput.add(retractRecord("book", 2L, 19, 2L)); + expectedOutput.add(record("book", 4L, 11, 1L)); + expectedOutput.add(record("fruit", 4L, 33, 1L)); + expectedOutput.add(record("fruit", 3L, 44, 2L)); + expectedOutput.add(retractRecord("fruit", 4L, 33, 1L)); + expectedOutput.add(retractRecord("fruit", 3L, 44, 2L)); + expectedOutput.add(record("fruit", 4L, 33, 2L)); + expectedOutput.add(record("fruit", 5L, 22, 1L)); + assertorWithRowNumber + .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, data could be recovered from state + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + expectedOutput.clear(); + + func = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, true); + testHarness = createTestHarness(func); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + testHarness.processElement(record("book", 1L, 10)); + + expectedOutput.add(retractRecord("book", 1L, 12, 2L)); + expectedOutput.add(retractRecord("book", 4L, 11, 1L)); + expectedOutput.add(record("book", 4L, 11, 2L)); + expectedOutput.add(record("book", 1L, 10, 1L)); + assertorWithRowNumber + .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testConstantRankRangeWithoutOffsetWithoutRowNumber() throws Exception { AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false); OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); @@ -129,18 +177,14 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { List<Object> expectedOutput = new ArrayList<>(); expectedOutput.add(record("book", 1L, 12)); expectedOutput.add(record("book", 2L, 19)); - expectedOutput.add(retractRecord("book", 1L, 12)); - expectedOutput.add(record("book", 1L, 12)); - expectedOutput.add(retractRecord("book", 2L, 19)); + expectedOutput.add(deleteRecord("book", 2L, 19)); expectedOutput.add(record("book", 4L, 11)); expectedOutput.add(record("fruit", 4L, 33)); expectedOutput.add(record("fruit", 3L, 44)); - expectedOutput.add(retractRecord("fruit", 4L, 33)); - expectedOutput.add(retractRecord("fruit", 3L, 44)); - expectedOutput.add(record("fruit", 4L, 33)); + expectedOutput.add(deleteRecord("fruit", 3L, 44)); expectedOutput.add(record("fruit", 5L, 22)); assertorWithoutRowNumber - .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); // do a snapshot, data could be recovered from state OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); @@ -154,18 +198,42 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { testHarness.open(); testHarness.processElement(record("book", 1L, 10)); - expectedOutput.add(retractRecord("book", 1L, 12)); - expectedOutput.add(retractRecord("book", 4L, 11)); - expectedOutput.add(record("book", 4L, 11)); + expectedOutput.add(deleteRecord("book", 1L, 12)); expectedOutput.add(record("book", 1L, 10)); assertorWithoutRowNumber - .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + + @Test + public void testVariableRankRangeWithRowNumber() throws Exception { + AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new VariableRankRange(1), true, true); + OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); + testHarness.open(); + testHarness.processElement(record("book", 2L, 12)); + testHarness.processElement(record("book", 2L, 19)); + testHarness.processElement(record("book", 2L, 11)); + testHarness.processElement(record("fruit", 1L, 33)); + testHarness.processElement(record("fruit", 1L, 44)); + testHarness.processElement(record("fruit", 1L, 22)); testHarness.close(); + + List<Object> expectedOutput = new ArrayList<>(); + expectedOutput.add(record("book", 2L, 12, 1L)); + expectedOutput.add(record("book", 2L, 19, 2L)); + expectedOutput.add(retractRecord("book", 2L, 19, 2L)); + expectedOutput.add(retractRecord("book", 2L, 12, 1L)); + expectedOutput.add(record("book", 2L, 12, 2L)); + expectedOutput.add(record("book", 2L, 11, 1L)); + expectedOutput.add(record("fruit", 1L, 33, 1L)); + expectedOutput.add(retractRecord("fruit", 1L, 33, 1L)); + expectedOutput.add(record("fruit", 1L, 22, 1L)); + assertorWithRowNumber + .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); } - // TODO RetractRankFunction could be sent less retraction message when does not need to retract row_number @Test - public void testVariableRankRange() throws Exception { + public void testVariableRankRangeWithoutRowNumber() throws Exception { AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new VariableRankRange(1), true, false); OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); testHarness.open(); @@ -180,20 +248,44 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { List<Object> expectedOutput = new ArrayList<>(); expectedOutput.add(record("book", 2L, 12)); expectedOutput.add(record("book", 2L, 19)); - expectedOutput.add(retractRecord("book", 2L, 19)); - expectedOutput.add(retractRecord("book", 2L, 12)); - expectedOutput.add(record("book", 2L, 12)); + expectedOutput.add(deleteRecord("book", 2L, 19)); expectedOutput.add(record("book", 2L, 11)); expectedOutput.add(record("fruit", 1L, 33)); - expectedOutput.add(retractRecord("fruit", 1L, 33)); + expectedOutput.add(deleteRecord("fruit", 1L, 33)); expectedOutput.add(record("fruit", 1L, 22)); assertorWithoutRowNumber + .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + } + + @Test + public void testDisableGenerateRetractionWithRowNumber() throws Exception { + AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, + true); + OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); + testHarness.open(); + testHarness.processElement(record("book", 1L, 12)); + testHarness.processElement(record("book", 2L, 19)); + testHarness.processElement(record("book", 4L, 11)); + testHarness.processElement(record("fruit", 4L, 33)); + testHarness.processElement(record("fruit", 3L, 44)); + testHarness.processElement(record("fruit", 5L, 22)); + testHarness.close(); + + List<Object> expectedOutput = new ArrayList<>(); + expectedOutput.add(record("book", 1L, 12, 1L)); + expectedOutput.add(record("book", 2L, 19, 2L)); + expectedOutput.add(record("book", 1L, 12, 2L)); + expectedOutput.add(record("book", 4L, 11, 1L)); + expectedOutput.add(record("fruit", 4L, 33, 1L)); + expectedOutput.add(record("fruit", 3L, 44, 2L)); + expectedOutput.add(record("fruit", 4L, 33, 2L)); + expectedOutput.add(record("fruit", 5L, 22, 1L)); + assertorWithRowNumber .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); } - // TODO @Test - public void testDisableGenerateRetraction() throws Exception { + public void testDisableGenerateRetractionWithoutRowNumber() throws Exception { AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, false); OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness = createTestHarness(func); @@ -209,14 +301,14 @@ public class RetractableTopNFunctionTest extends TopNFunctionTestBase { List<Object> expectedOutput = new ArrayList<>(); expectedOutput.add(record("book", 1L, 12)); expectedOutput.add(record("book", 2L, 19)); - expectedOutput.add(record("book", 1L, 12)); + expectedOutput.add(deleteRecord("book", 2L, 19)); expectedOutput.add(record("book", 4L, 11)); expectedOutput.add(record("fruit", 4L, 33)); expectedOutput.add(record("fruit", 3L, 44)); - expectedOutput.add(record("fruit", 4L, 33)); + expectedOutput.add(deleteRecord("fruit", 3L, 44)); expectedOutput.add(record("fruit", 5L, 22)); assertorWithoutRowNumber - .assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); + .assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); } @Test