This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba49e50e14c6d78c11ee87afbb851da471d3db68
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Jan 9 09:52:41 2024 +0100

    Revert "[FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests"
    
    This reverts commit 0df5ab5a3318d21e8be3ab9237900664e3741013.
---
 .../stream/IncrementalAggregateJsonPlanTest.java   | 106 ++++
 .../IncrementalAggregateJsonPlanITCase.java        |  78 +++
 .../testIncrementalAggregate.out                   | 401 ++++++++++++++
 ...lAggregateWithSumCountDistinctAndRetraction.out | 585 +++++++++++++++++++++
 4 files changed, 1170 insertions(+)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..26dcc04f303
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+/** Test json serialization/deserialization for incremental aggregate. */
+class IncrementalAggregateJsonPlanTest extends TableTestBase {
+
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+        tEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a bigint,\n"
+                        + "  b int not null,\n"
+                        + "  c varchar,\n"
+                        + "  d bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+    }
+
+    @Test
+    void testIncrementalAggregate() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  c bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select a, "
+                        + "count(distinct c) as c "
+                        + "from MyTable group by a");
+    }
+
+    @Test
+    void testIncrementalAggregateWithSumCountDistinctAndRetraction() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  b bigint,\n"
+                        + "  sum_b int,\n"
+                        + "  cnt_distinct_b bigint,\n"
+                        + "  cnt1 bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink "
+                        + "select b, sum(b1), count(distinct b1), count(1) "
+                        + " from "
+                        + "   (select a, count(b) as b, max(b) as b1 from MyTable group by a)"
+                        + " group by b");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
new file mode 100644
index 00000000000..6f72a3930a8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/** Test for incremental aggregate json plan. */
+class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
+
+    @BeforeEach
+    @Override
+    protected void setup() throws Exception {
+        super.setup();
+        tableEnv.getConfig()
+                .set(
+                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+                        AggregatePhaseStrategy.TWO_PHASE.name())
+                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                        Duration.ofSeconds(10))
+                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
+    }
+
+    @Test
+    void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException {
+        createTestValuesSourceTable(
+                "MyTable",
+                JavaScalaConversionUtil.toJava(TestData.smallData3()),
+                "a int",
+                "b bigint",
+                "c varchar");
+        createTestNonInsertOnlyValuesSinkTable(
+                "MySink", "b bigint", "a bigint", "primary key (b) not enforced");
+        compileSqlAndExecutePlan(
+                        "insert into MySink select b, "
+                                + "count(distinct a) as a "
+                                + "from MyTable group by b")
+                .await();
+
+        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
+        assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
new file mode 100644
index 00000000000..46cc85e26f3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
@@ -0,0 +1,401 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "bounded" : "false",
+            "connector" : "values"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 10000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$MOD$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$HASH_CODE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "VARCHAR(2147483647)"
+        } ],
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1024,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>",
+    "description" : "Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0, 2 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "logicalType" : {
+              "type" : "STRUCTURED_TYPE",
+              "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+              "attributes" : [ {
+                "name" : "map",
+                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
+              } ]
+            },
+            "fields" : [ {
+              "name" : "map",
+              "keyClass" : {
+                "conversionClass" : "org.apache.flink.table.data.StringData"
+              }
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "logicalType" : {
+              "type" : "STRUCTURED_TYPE",
+              "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+              "attributes" : [ {
+                "name" : "map",
+                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
+              } ]
+            },
+            "fields" : [ {
+              "name" : "map",
+              "keyClass" : {
+                "conversionClass" : "org.apache.flink.table.data.StringData"
+              }
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a, $f2]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-incremental-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "partialAggGrouping" : [ 0, 1 ],
+    "finalAggGrouping" : [ 0 ],
+    "partialOriginalAggCalls" : [ {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "partialAggCallNeedRetractions" : [ false ],
+    "partialLocalAggInputRowType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>",
+    "partialAggNeedRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "incrementalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>",
+    "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "localAggInputRowType" : "ROW<`a` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
new file mode 100644
index 00000000000..7a48fa0143f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
@@ -0,0 +1,585 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "bounded" : "false",
+            "connector" : "values"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 10000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "b",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "b1",
+      "internalName" : "$MAX$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>",
+    "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS count1$0, MAX(b) AS max$1])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "b",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "b1",
+      "internalName" : "$MAX$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` BIGINT NOT NULL, `b1` INT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count1$0) AS b, MAX(max$1) AS b1])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$MOD$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$HASH_CODE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1024,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>",
+    "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0, 2 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ true, true, true ],
+    "needRetraction" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "sum$0",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$1",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count1$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "sum$0",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$1",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count1$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b, $f2]])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-incremental-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "partialAggGrouping" : [ 0, 1 ],
+    "finalAggGrouping" : [ 0 ],
+    "partialOriginalAggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "partialAggCallNeedRetractions" : [ true, true, true ],
+    "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT 
NULL, `$f2` INT NOT NULL>",
+    "partialAggNeedRetraction" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "incrementalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, 
`count$2` BIGINT, `count1$3` BIGINT>",
+    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], 
finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) 
AS count1$3])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, 
`count$2` BIGINT, `count1$3` BIGINT>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT NOT NULL"
+    }, {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 4 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ true, true, true ],
+    "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT NOT NULL, 
`$f2_0` INT NOT NULL, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : true,
+    "indexOfCountStar" : 2,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT 
NOT NULL, `$f3` BIGINT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[b], 
partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, 
$SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], 
indexOfCountStar=[2])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "sum_b",
+              "dataType" : "INT"
+            }, {
+              "name" : "cnt_distinct_b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "cnt1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT 
NOT NULL, `$f3` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, $f1, $f2, $f3])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}


Reply via email to