This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ba49e50e14c6d78c11ee87afbb851da471d3db68 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Jan 9 09:52:41 2024 +0100 Revert "[FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests" This reverts commit 0df5ab5a3318d21e8be3ab9237900664e3741013. --- .../stream/IncrementalAggregateJsonPlanTest.java | 106 ++++ .../IncrementalAggregateJsonPlanITCase.java | 78 +++ .../testIncrementalAggregate.out | 401 ++++++++++++++ ...lAggregateWithSumCountDistinctAndRetraction.out | 585 +++++++++++++++++++++ 4 files changed, 1170 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java new file mode 100644 index 00000000000..26dcc04f303 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +/** Test json serialization/deserialization for incremental aggregate. */ +class IncrementalAggregateJsonPlanTest extends TableTestBase { + + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + @BeforeEach + void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, + AggregatePhaseStrategy.TWO_PHASE.name()) + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(10)) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true); + + String srcTableDdl = + "CREATE TABLE MyTable (\n" + + " a bigint,\n" + + " b int not null,\n" + + " c varchar,\n" + + " d bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"; + tEnv.executeSql(srcTableDdl); + } + + @Test + void testIncrementalAggregate() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " c bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink select a, " + + "count(distinct c) as c " + + "from MyTable group by a"); + } + + @Test + void testIncrementalAggregateWithSumCountDistinctAndRetraction() { + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " b bigint,\n" + + " sum_b int,\n" + + " cnt_distinct_b bigint,\n" + + " cnt1 bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + util.verifyJsonPlan( + "insert into MySink " + + "select b, sum(b1), count(distinct b1), count(1) " + + " from " + + " (select a, count(b) as b, max(b) as b1 from MyTable group by a)" + + " group by b"); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java new file mode 100644 index 00000000000..6f72a3930a8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.jsonplan; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.JsonPlanTestBase; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** Test for incremental aggregate json plan. */ +class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase { + + @BeforeEach + @Override + protected void setup() throws Exception { + super.setup(); + tableEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, + AggregatePhaseStrategy.TWO_PHASE.name()) + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(10)) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true); + } + + @Test + void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException { + createTestValuesSourceTable( + "MyTable", + JavaScalaConversionUtil.toJava(TestData.smallData3()), + "a int", + "b bigint", + "c varchar"); + createTestNonInsertOnlyValuesSinkTable( + "MySink", "b bigint", "a bigint", "primary key (b) not enforced"); + compileSqlAndExecutePlan( + "insert into MySink select b, " + + "count(distinct a) as a " + + "from MyTable group by b") + .await(); + + List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); + assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result); + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out new file mode 100644 index 00000000000..46cc85e26f3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out @@ -0,0 +1,401 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "bounded" : "false", + "connector" : "values" + } + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 2 ] ], + "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 10000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" + }, { + "id" : 3, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$MOD$1", + "operands" : [ { + "kind" : "CALL", + "internalName" : "$HASH_CODE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 1024, + "type" : "INT NOT NULL" + } ], + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>", + "description" : "Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])" + }, { + "id" : 4, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0, 2 ], + "aggCalls" : [ { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "BIGINT" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "logicalType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" + } ] + }, + "fields" : [ { + "name" : "map", + "keyClass" : { + "conversionClass" : "org.apache.flink.table.data.StringData" + } + } ] + } + } + } ] + }, + "description" : "LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])" + }, { + "id" : 5, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "BIGINT" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "logicalType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" + } ] + }, + "fields" : [ { + "name" : "map", + "keyClass" : { + "conversionClass" : "org.apache.flink.table.data.StringData" + } + } ] + } + } + } ] + }, + "description" : "Exchange(distribution=[hash[a, $f2]])" + }, { + "id" : 6, + "type" : "stream-exec-incremental-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "partialAggGrouping" : [ 0, 1 ], + "finalAggGrouping" : [ 0 ], + "partialOriginalAggCalls" : [ { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "partialAggCallNeedRetractions" : [ false ], + "partialLocalAggInputRowType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>", + "partialAggNeedRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "incrementalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>", + "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])" + }, { + "id" : 7, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 8, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "localAggInputRowType" : "ROW<`a` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>", + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1])" + }, { + "id" : 9, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT" + } + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out new file mode 100644 index 00000000000..7a48fa0143f --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out @@ -0,0 +1,585 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "bounded" : "false", + "connector" : "values" + } + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 10000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" + }, { + "id" : 3, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "b", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "b1", + "internalName" : "$MAX$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false, false ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>", + "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS count1$0, MAX(b) AS max$1])" + }, { + "id" : 4, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 5, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "b", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "b1", + "internalName" : "$MAX$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false, false ], + "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` BIGINT NOT NULL, `b1` INT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count1$0) AS b, MAX(max$1) AS b1])" + }, { + "id" : 6, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT NOT NULL" + }, { + "kind" : "CALL", + "internalName" : "$MOD$1", + "operands" : [ { + "kind" : "CALL", + "internalName" : "$HASH_CODE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT NOT NULL" + } ], + "type" : "INT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 1024, + "type" : "INT NOT NULL" + } ], + "type" : "INT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>", + "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])" + }, { + "id" : 7, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0, 2 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ true, true, true ], + "needRetraction" : true, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "$f2", + "fieldType" : "INT NOT NULL" + }, { + "name" : "sum$0", + "fieldType" : "INT" + }, { + "name" : "count$1", + "fieldType" : "BIGINT" + }, { + "name" : "count$2", + "fieldType" : "BIGINT" + }, { + "name" : "count1$3", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])" + }, { + "id" : 8, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "$f2", + "fieldType" : "INT NOT NULL" + }, { + "name" : "sum$0", + "fieldType" : "INT" + }, { + "name" : "count$1", + "fieldType" : "BIGINT" + }, { + "name" : "count$2", + "fieldType" : "BIGINT" + }, { + "name" : "count1$3", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "Exchange(distribution=[hash[b, $f2]])" + }, { + "id" : 9, + "type" : "stream-exec-incremental-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "partialAggGrouping" : [ 0, 1 ], + "finalAggGrouping" : [ 0 ], + "partialOriginalAggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "partialAggCallNeedRetractions" : [ true, true, true ], + "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>", + "partialAggNeedRetraction" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "incrementalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", + "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])" + }, { + "id" : 10, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", + "description" : "Exchange(distribution=[hash[b]])" + }, { + "id" : 11, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "INT NOT NULL" + }, { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 4 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ true, true, true ], + "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT NOT NULL, `$f2_0` INT NOT NULL, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>", + "generateUpdateBefore" : true, + "needRetraction" : true, + "indexOfCountStar" : 2, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])" + }, { + "id" : 12, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "sum_b", + "dataType" : "INT" + }, { + "name" : "cnt_distinct_b", + "dataType" : "BIGINT" + }, { + "name" : "cnt1", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "sink-insert-only" : "false", + "table-sink-class" : "DEFAULT" + } + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, $f1, $f2, $f3])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +}