This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 9c7fc0d  [FLINK-15497][table-planner-blink] Reduce outputs when rank number is not required in Retractable TopN (#10823)
9c7fc0d is described below

commit 9c7fc0d4eb91fc486f8497ab07e55aa7e15222de
Author: Jing Zhang <beyond1...@126.com>
AuthorDate: Tue Jan 21 10:51:52 2020 +0800

    [FLINK-15497][table-planner-blink] Reduce outputs when rank number is not required in Retractable TopN (#10823)
---
 .../planner/runtime/stream/sql/RankITCase.scala    |  43 +++---
 .../operators/rank/RetractableTopNFunction.java    | 151 +++++++++++++++++++--
 .../operators/rank/UpdatableTopNFunction.java      |   2 +-
 .../rank/RetractableTopNFunctionTest.java          | 142 +++++++++++++++----
 4 files changed, 282 insertions(+), 56 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
index c2f0fb5..d46c640 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
@@ -894,7 +894,6 @@ class RankITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted)
   }
 
-  @Ignore("Enable after UnaryUpdatableTopN is supported")
   @Test
   def testTopNWithGroupByAvgWithoutRowNumber(): Unit = {
     val data = List(
@@ -937,25 +936,37 @@ class RankITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode
     tEnv.execute("test")
 
     val expected = List(
-      "(true,book,1,100.0)",
-      "(true,book,3,110.0)",
-      "(true,book,4,120.0)",
-      "(true,book,1,150.0)",
-      "(true,book,1,166.66666666666666)",
-      "(true,book,2,300.0)",
-      "(false,book,3,110.0)",
-      "(true,book,2,350.0)",
-      "(true,book,4,310.0)",
-      "(true,book,1,225.0)",
-      "(true,fruit,5,100.0)")
+      "(true,book,1,100)",
+      "(true,book,3,110)",
+      "(true,book,4,120)",
+      "(false,book,1,100)",
+      "(true,book,1,150)",
+      "(false,book,1,150)",
+      "(true,book,1,166)",
+      "(false,book,3,110)",
+      "(true,book,2,300)",
+      "(false,book,2,300)",
+      "(true,book,3,110)",
+      "(false,book,3,110)",
+      "(true,book,2,350)",
+      "(false,book,4,120)",
+      "(true,book,3,110)",
+      "(false,book,3,110)",
+      "(true,book,4,310)",
+      "(false,book,1,166)",
+      "(true,book,3,110)",
+      "(false,book,3,110)",
+      "(true,book,1,225)",
+      "(true,fruit,5,100)"
+    )
 
     assertEquals(expected, sink.getRawResults)
 
     val updatedExpected = List(
-      "book,1,225.0",
-      "book,2,350.0",
-      "book,4,310.0",
-      "fruit,5,100.0")
+      "book,1,225",
+      "book,2,350",
+      "book,4,310",
+      "fruit,5,100")
 
     assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted)
   }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index 5ed2c7b..6e593e9 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -135,8 +135,13 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                        }
 
                        // emit
-                       emitRecordsWithRowNumber(sortedMap, sortKey, input, 
out);
-
+                       if (outputRankNumber || hasOffset()) {
+                               // the without-number-algorithm can't handle 
topN with offset,
+                               // so use the with-number-algorithm to handle 
offset
+                               emitRecordsWithRowNumber(sortedMap, sortKey, 
input, out);
+                       } else {
+                               emitRecordsWithoutRowNumber(sortedMap, sortKey, 
input, out);
+                       }
                        // update data state
                        List<BaseRow> inputs = dataState.get(sortKey);
                        if (inputs == null) {
@@ -147,7 +152,13 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                        dataState.put(sortKey, inputs);
                } else {
                        // emit updates first
-                       retractRecordWithRowNumber(sortedMap, sortKey, input, 
out);
+                       if (outputRankNumber || hasOffset()) {
+                               // the without-number-algorithm can't handle 
topN with offset,
+                               // so use the with-number-algorithm to handle 
offset
+                               retractRecordWithRowNumber(sortedMap, sortKey, 
input, out);
+                       } else {
+                               retractRecordWithoutRowNumber(sortedMap, 
sortKey, input, out);
+                       }
 
                        // and then update sortedMap
                        if (sortedMap.containsKey(sortKey)) {
@@ -183,6 +194,95 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
 
        // ------------- ROW_NUMBER-------------------------------
 
+       private void emitRecordsWithRowNumber(
+                       SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, 
BaseRow inputRow, Collector<BaseRow> out)
+                       throws Exception {
+               Iterator<Map.Entry<BaseRow, Long>> iterator = 
sortedMap.entrySet().iterator();
+               long curRank = 0L;
+               boolean findsSortKey = false;
+               while (iterator.hasNext() && isInRankEnd(curRank)) {
+                       Map.Entry<BaseRow, Long> entry = iterator.next();
+                       BaseRow key = entry.getKey();
+                       if (!findsSortKey && key.equals(sortKey)) {
+                               curRank += entry.getValue();
+                               collect(out, inputRow, curRank);
+                               findsSortKey = true;
+                       } else if (findsSortKey) {
+                               List<BaseRow> inputs = dataState.get(key);
+                               if (inputs == null) {
+                                       // Skip the data if its state is cleared because of state ttl.
+                                       if (lenient) {
+                                               
LOG.warn(STATE_CLEARED_WARN_MSG);
+                                       } else {
+                                               throw new 
RuntimeException(STATE_CLEARED_WARN_MSG);
+                                       }
+                               } else {
+                                       int i = 0;
+                                       while (i < inputs.size() && 
isInRankEnd(curRank)) {
+                                               curRank += 1;
+                                               BaseRow prevRow = inputs.get(i);
+                                               retract(out, prevRow, curRank - 
1);
+                                               collect(out, prevRow, curRank);
+                                               i++;
+                                       }
+                               }
+                       } else {
+                               curRank += entry.getValue();
+                       }
+               }
+       }
+
+       private void emitRecordsWithoutRowNumber(
+                       SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, 
BaseRow inputRow, Collector<BaseRow> out)
+                       throws Exception {
+               Iterator<Map.Entry<BaseRow, Long>> iterator = 
sortedMap.entrySet().iterator();
+               long curRank = 0L;
+               boolean findsSortKey = false;
+               BaseRow toCollect = null;
+               BaseRow toDelete = null;
+               while (iterator.hasNext() && isInRankEnd(curRank)) {
+                       Map.Entry<BaseRow, Long> entry = iterator.next();
+                       BaseRow key = entry.getKey();
+                       if (!findsSortKey && key.equals(sortKey)) {
+                               curRank += entry.getValue();
+                               if (isInRankRange(curRank)) {
+                                       toCollect = inputRow;
+                               }
+                               findsSortKey = true;
+                       } else if (findsSortKey) {
+                               List<BaseRow> inputs = dataState.get(key);
+                               if (inputs == null) {
+                                       // Skip the data if its state is cleared because of state ttl.
+                                       if (lenient) {
+                                               
LOG.warn(STATE_CLEARED_WARN_MSG);
+                                       } else {
+                                               throw new 
RuntimeException(STATE_CLEARED_WARN_MSG);
+                                       }
+                               } else {
+                                       long count = entry.getValue();
+                                       // gets the rank of last record with 
same sortKey
+                                       long rankOfLastRecord = curRank + count;
+                                       // delete the record that has just been pushed out of the Top-N (it now ranks N+1)
+                                       if (isInRankEnd(rankOfLastRecord)) {
+                                               curRank = rankOfLastRecord;
+                                       } else {
+                                               int index = 
Long.valueOf(rankEnd - curRank).intValue();
+                                               toDelete = inputs.get(index);
+                                               break;
+                                       }
+                               }
+                       } else {
+                               curRank += entry.getValue();
+                       }
+               }
+               if (toDelete != null) {
+                       delete(out, toDelete);
+               }
+               if (toCollect != null) {
+                       collect(out, inputRow);
+               }
+       }
+
        private void retractRecordWithRowNumber(
                        SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, 
BaseRow inputRow, Collector<BaseRow> out)
                        throws Exception {
@@ -238,7 +338,7 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                }
        }
 
-       private void emitRecordsWithRowNumber(
+       private void retractRecordWithoutRowNumber(
                        SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, 
BaseRow inputRow, Collector<BaseRow> out)
                        throws Exception {
                Iterator<Map.Entry<BaseRow, Long>> iterator = 
sortedMap.entrySet().iterator();
@@ -248,10 +348,6 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                        Map.Entry<BaseRow, Long> entry = iterator.next();
                        BaseRow key = entry.getKey();
                        if (!findsSortKey && key.equals(sortKey)) {
-                               curRank += entry.getValue();
-                               collect(out, inputRow, curRank);
-                               findsSortKey = true;
-                       } else if (findsSortKey) {
                                List<BaseRow> inputs = dataState.get(key);
                                if (inputs == null) {
                                        // Skip the data if it's state is 
cleared because of state ttl.
@@ -261,14 +357,41 @@ public class RetractableTopNFunction extends 
AbstractTopNFunction {
                                                throw new 
RuntimeException(STATE_CLEARED_WARN_MSG);
                                        }
                                } else {
-                                       int i = 0;
-                                       while (i < inputs.size() && 
isInRankEnd(curRank)) {
+                                       Iterator<BaseRow> inputIter = 
inputs.iterator();
+                                       while (inputIter.hasNext() && 
isInRankEnd(curRank)) {
                                                curRank += 1;
-                                               BaseRow prevRow = inputs.get(i);
-                                               retract(out, prevRow, curRank - 
1);
-                                               collect(out, prevRow, curRank);
-                                               i++;
+                                               BaseRow prevRow = 
inputIter.next();
+                                               if (!findsSortKey && 
equaliser.equalsWithoutHeader(prevRow, inputRow)) {
+                                                       delete(out, prevRow, 
curRank);
+                                                       curRank -= 1;
+                                                       findsSortKey = true;
+                                                       inputIter.remove();
+                                               } else if (findsSortKey) {
+                                                       if (curRank == rankEnd) 
{
+                                                               collect(out, 
prevRow, curRank);
+                                                               break;
+                                                       }
+                                               }
                                        }
+                                       if (inputs.isEmpty()) {
+                                               dataState.remove(key);
+                                       } else {
+                                               dataState.put(key, inputs);
+                                       }
+                               }
+                       } else if (findsSortKey) {
+                               long count = entry.getValue();
+                               // gets the rank of last record with same 
sortKey
+                               long rankOfLastRecord = curRank + count;
+                               // emit the record that has just moved up into the Top-N
+                               if (rankOfLastRecord < rankEnd) {
+                                       curRank = rankOfLastRecord;
+                               } else {
+                                       int index = Long.valueOf(rankEnd - 
curRank - 1).intValue();
+                                       List<BaseRow> inputs = 
dataState.get(key);
+                                       BaseRow toAdd = inputs.get(index);
+                                       collect(out, toAdd);
+                                       break;
                                }
                        } else {
                                curRank += entry.getValue();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
index 68a3c61..94a1379 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
@@ -55,7 +55,7 @@ import java.util.TreeMap;
  * However, the function only works in some special scenarios:
  * 1. sort field collation is ascending and its mono is decreasing, or sort 
field collation is descending and its mono
  * is increasing
- * 2. input data has unique keys
+ * 2. input data has unique keys, and the unique key must contain the partition key
  * 3. input stream could not contain delete record or retract record
  */
 public class UpdatableTopNFunction extends AbstractTopNFunction implements 
CheckpointedFunction {
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index f852ca1..26bfa65 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -111,10 +111,58 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
                assertorWithRowNumber.assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
        }
 
-       // TODO RetractRankFunction could be sent less retraction message when 
does not need to retract row_number
-       @Override
        @Test
-       public void testConstantRankRangeWithoutOffset() throws Exception {
+       public void testConstantRankRangeWithoutOffsetWithRowNumber() throws 
Exception {
+               AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new ConstantRankRange(1, 2), true,
+                               true);
+               OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
+               testHarness.open();
+               testHarness.processElement(record("book", 1L, 12));
+               testHarness.processElement(record("book", 2L, 19));
+               testHarness.processElement(record("book", 4L, 11));
+               testHarness.processElement(record("fruit", 4L, 33));
+               testHarness.processElement(record("fruit", 3L, 44));
+               testHarness.processElement(record("fruit", 5L, 22));
+
+               List<Object> expectedOutput = new ArrayList<>();
+               expectedOutput.add(record("book", 1L, 12, 1L));
+               expectedOutput.add(record("book", 2L, 19, 2L));
+               expectedOutput.add(retractRecord("book", 1L, 12, 1L));
+               expectedOutput.add(record("book", 1L, 12, 2L));
+               expectedOutput.add(retractRecord("book", 2L, 19, 2L));
+               expectedOutput.add(record("book", 4L, 11, 1L));
+               expectedOutput.add(record("fruit", 4L, 33, 1L));
+               expectedOutput.add(record("fruit", 3L, 44, 2L));
+               expectedOutput.add(retractRecord("fruit", 4L, 33, 1L));
+               expectedOutput.add(retractRecord("fruit", 3L, 44, 2L));
+               expectedOutput.add(record("fruit", 4L, 33, 2L));
+               expectedOutput.add(record("fruit", 5L, 22, 1L));
+               assertorWithRowNumber
+                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
+
+               // do a snapshot, data could be recovered from state
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+               testHarness.close();
+               expectedOutput.clear();
+
+               func = createFunction(RankType.ROW_NUMBER, new 
ConstantRankRange(1, 2), true, true);
+               testHarness = createTestHarness(func);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+               testHarness.processElement(record("book", 1L, 10));
+
+               expectedOutput.add(retractRecord("book", 1L, 12, 2L));
+               expectedOutput.add(retractRecord("book", 4L, 11, 1L));
+               expectedOutput.add(record("book", 4L, 11, 2L));
+               expectedOutput.add(record("book", 1L, 10, 1L));
+               assertorWithRowNumber
+                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
+               testHarness.close();
+       }
+
+       @Test
+       public void testConstantRankRangeWithoutOffsetWithoutRowNumber() throws 
Exception {
                AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new ConstantRankRange(1, 2), true,
                                false);
                OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
@@ -129,18 +177,14 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
                List<Object> expectedOutput = new ArrayList<>();
                expectedOutput.add(record("book", 1L, 12));
                expectedOutput.add(record("book", 2L, 19));
-               expectedOutput.add(retractRecord("book", 1L, 12));
-               expectedOutput.add(record("book", 1L, 12));
-               expectedOutput.add(retractRecord("book", 2L, 19));
+               expectedOutput.add(deleteRecord("book", 2L, 19));
                expectedOutput.add(record("book", 4L, 11));
                expectedOutput.add(record("fruit", 4L, 33));
                expectedOutput.add(record("fruit", 3L, 44));
-               expectedOutput.add(retractRecord("fruit", 4L, 33));
-               expectedOutput.add(retractRecord("fruit", 3L, 44));
-               expectedOutput.add(record("fruit", 4L, 33));
+               expectedOutput.add(deleteRecord("fruit", 3L, 44));
                expectedOutput.add(record("fruit", 5L, 22));
                assertorWithoutRowNumber
-                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
+                               .assertOutputEquals("output wrong.", 
expectedOutput, testHarness.getOutput());
 
                // do a snapshot, data could be recovered from state
                OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
@@ -154,18 +198,42 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
                testHarness.open();
                testHarness.processElement(record("book", 1L, 10));
 
-               expectedOutput.add(retractRecord("book", 1L, 12));
-               expectedOutput.add(retractRecord("book", 4L, 11));
-               expectedOutput.add(record("book", 4L, 11));
+               expectedOutput.add(deleteRecord("book", 1L, 12));
                expectedOutput.add(record("book", 1L, 10));
                assertorWithoutRowNumber
-                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
+                               .assertOutputEquals("output wrong.", 
expectedOutput, testHarness.getOutput());
+               testHarness.close();
+       }
+
+       @Test
+       public void testVariableRankRangeWithRowNumber() throws Exception {
+               AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new VariableRankRange(1), true, true);
+               OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
+               testHarness.open();
+               testHarness.processElement(record("book", 2L, 12));
+               testHarness.processElement(record("book", 2L, 19));
+               testHarness.processElement(record("book", 2L, 11));
+               testHarness.processElement(record("fruit", 1L, 33));
+               testHarness.processElement(record("fruit", 1L, 44));
+               testHarness.processElement(record("fruit", 1L, 22));
                testHarness.close();
+
+               List<Object> expectedOutput = new ArrayList<>();
+               expectedOutput.add(record("book", 2L, 12, 1L));
+               expectedOutput.add(record("book", 2L, 19, 2L));
+               expectedOutput.add(retractRecord("book", 2L, 19, 2L));
+               expectedOutput.add(retractRecord("book", 2L, 12, 1L));
+               expectedOutput.add(record("book", 2L, 12, 2L));
+               expectedOutput.add(record("book", 2L, 11, 1L));
+               expectedOutput.add(record("fruit", 1L, 33, 1L));
+               expectedOutput.add(retractRecord("fruit", 1L, 33, 1L));
+               expectedOutput.add(record("fruit", 1L, 22, 1L));
+               assertorWithRowNumber
+                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
        }
 
-       // TODO RetractRankFunction could be sent less retraction message when 
does not need to retract row_number
        @Test
-       public void testVariableRankRange() throws Exception {
+       public void testVariableRankRangeWithoutRowNumber() throws Exception {
                AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new VariableRankRange(1), true, false);
                OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
                testHarness.open();
@@ -180,20 +248,44 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
                List<Object> expectedOutput = new ArrayList<>();
                expectedOutput.add(record("book", 2L, 12));
                expectedOutput.add(record("book", 2L, 19));
-               expectedOutput.add(retractRecord("book", 2L, 19));
-               expectedOutput.add(retractRecord("book", 2L, 12));
-               expectedOutput.add(record("book", 2L, 12));
+               expectedOutput.add(deleteRecord("book", 2L, 19));
                expectedOutput.add(record("book", 2L, 11));
                expectedOutput.add(record("fruit", 1L, 33));
-               expectedOutput.add(retractRecord("fruit", 1L, 33));
+               expectedOutput.add(deleteRecord("fruit", 1L, 33));
                expectedOutput.add(record("fruit", 1L, 22));
                assertorWithoutRowNumber
+                               .assertOutputEquals("output wrong.", 
expectedOutput, testHarness.getOutput());
+       }
+
+       @Test
+       public void testDisableGenerateRetractionWithRowNumber() throws 
Exception {
+               AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new ConstantRankRange(1, 2), false,
+                               true);
+               OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
+               testHarness.open();
+               testHarness.processElement(record("book", 1L, 12));
+               testHarness.processElement(record("book", 2L, 19));
+               testHarness.processElement(record("book", 4L, 11));
+               testHarness.processElement(record("fruit", 4L, 33));
+               testHarness.processElement(record("fruit", 3L, 44));
+               testHarness.processElement(record("fruit", 5L, 22));
+               testHarness.close();
+
+               List<Object> expectedOutput = new ArrayList<>();
+               expectedOutput.add(record("book", 1L, 12, 1L));
+               expectedOutput.add(record("book", 2L, 19, 2L));
+               expectedOutput.add(record("book", 1L, 12, 2L));
+               expectedOutput.add(record("book", 4L, 11, 1L));
+               expectedOutput.add(record("fruit", 4L, 33, 1L));
+               expectedOutput.add(record("fruit", 3L, 44, 2L));
+               expectedOutput.add(record("fruit", 4L, 33, 2L));
+               expectedOutput.add(record("fruit", 5L, 22, 1L));
+               assertorWithRowNumber
                                .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
        }
 
-       // TODO
        @Test
-       public void testDisableGenerateRetraction() throws Exception {
+       public void testDisableGenerateRetractionWithoutRowNumber() throws 
Exception {
                AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, 
new ConstantRankRange(1, 2), false,
                                false);
                OneInputStreamOperatorTestHarness<BaseRow, BaseRow> testHarness 
= createTestHarness(func);
@@ -209,14 +301,14 @@ public class RetractableTopNFunctionTest extends 
TopNFunctionTestBase {
                List<Object> expectedOutput = new ArrayList<>();
                expectedOutput.add(record("book", 1L, 12));
                expectedOutput.add(record("book", 2L, 19));
-               expectedOutput.add(record("book", 1L, 12));
+               expectedOutput.add(deleteRecord("book", 2L, 19));
                expectedOutput.add(record("book", 4L, 11));
                expectedOutput.add(record("fruit", 4L, 33));
                expectedOutput.add(record("fruit", 3L, 44));
-               expectedOutput.add(record("fruit", 4L, 33));
+               expectedOutput.add(deleteRecord("fruit", 3L, 44));
                expectedOutput.add(record("fruit", 5L, 22));
                assertorWithoutRowNumber
-                               .assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
+                               .assertOutputEquals("output wrong.", 
expectedOutput, testHarness.getOutput());
        }
 
        @Test

Reply via email to