[FLINK-7776] [table] Prevent emission of identical update records in group aggregation.
This closes #4785.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4047be49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4047be49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4047be49

Branch: refs/heads/master
Commit: 4047be49e10cacc5e4ce932a0b8433afffa82a58
Parents: 1ea7f49
Author: Xpray <[email protected]>
Authored: Mon Oct 9 18:19:01 2017 +0800
Committer: Fabian Hueske <[email protected]>
Committed: Tue Oct 10 23:09:07 2017 +0200

----------------------------------------------------------------------
 .../aggregate/GroupAggProcessFunction.scala | 10 ++++----
 .../runtime/stream/table/AggregateITCase.scala | 25 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index df59460..91c379f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -129,17 +129,19 @@ class GroupAggProcessFunction(
     state.update(accumulators)
     cntState.update(inputCnt)
 
-    // if this was not the first row and we have to emit retractions
-    if (generateRetraction && !firstRow) {
+    // if this was not the first row
+    if (!firstRow) {
       if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
         // newRow is the same as before and state cleaning is not enabled.
-        // We do not emit retraction and acc message.
+        // We emit nothing
         // If state cleaning is enabled, we have to emit messages to prevent too early
         // state eviction of downstream operators.
         return
       } else {
         // retract previous result
-        out.collect(prevRow)
+        if (generateRetraction) {
+          out.collect(prevRow)
+        }
       }
     }
     // emit the new result

http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index eb3d37f..e67c784 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -193,4 +193,29 @@ class AggregateITCase extends StreamingWithStateTestBase {
     // verify agg close is called
     assert(JavaUserDefinedAggFunctions.isCloseCalled)
   }
+
+  @Test
+  def testRemoveDuplicateRecordsWithUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "A"))
+    data.+=((2, 2L, "B"))
+    data.+=((3, 2L, "B"))
+    data.+=((4, 3L, "C"))
+    data.+=((5, 3L, "C"))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('c)
+      .select('c, 'b.max)
+
+    t.writeToSink(new TestUpsertSink(Array("c"), false))
+    env.execute()
+
+    val expected = List("(true,A,1)", "(true,B,2)", "(true,C,3)")
+    assertEquals(expected.sorted, RowCollector.getAndClearValues.map(_.toString).sorted)
+  }
 }
