This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5844092408d21023a738077d0922cc75f1e634d7 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Fri Jan 19 09:11:28 2024 -0800 [FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests - These are covered by restore tests --- .../stream/IncrementalAggregateJsonPlanTest.java | 106 ---- .../IncrementalAggregateJsonPlanITCase.java | 78 --- .../testIncrementalAggregate.out | 401 -------------- ...lAggregateWithSumCountDistinctAndRetraction.out | 585 --------------------- 4 files changed, 1170 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java deleted file mode 100644 index 26dcc04f303..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule; -import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.time.Duration; - -/** Test json serialization/deserialization for incremental aggregate. */ -class IncrementalAggregateJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - tEnv.getConfig() - .set( - OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, - AggregatePhaseStrategy.TWO_PHASE.name()) - .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .set( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(10)) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) - .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true); - - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int not null,\n" - + " c varchar,\n" - + " d bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - } - - @Test - void testIncrementalAggregate() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a bigint,\n" - + " c bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select a, " - + "count(distinct c) as c " - + "from MyTable group by a"); - } - - @Test - void testIncrementalAggregateWithSumCountDistinctAndRetraction() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b bigint,\n" - + " sum_b int,\n" - + " cnt_distinct_b bigint,\n" - + " cnt1 bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink " - + "select b, sum(b1), count(distinct b1), count(1) " - + " from " - + " (select a, count(b) as b, max(b) as b1 from MyTable group by a)" - + " group by b"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java deleted file mode 100644 index 6f72a3930a8..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.AggregatePhaseStrategy; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutionException; - -/** Test for incremental aggregate json plan. */ -class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase { - - @BeforeEach - @Override - protected void setup() throws Exception { - super.setup(); - tableEnv.getConfig() - .set( - OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, - AggregatePhaseStrategy.TWO_PHASE.name()) - .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .set( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(10)) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) - .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true); - } - - @Test - void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.smallData3()), - "a int", - "b bigint", - "c varchar"); - createTestNonInsertOnlyValuesSinkTable( - "MySink", "b bigint", "a bigint", "primary key (b) not enforced"); - compileSqlAndExecutePlan( - "insert into MySink select b, " - + "count(distinct a) as a " - + "from MyTable group by b") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out deleted file mode 100644 index 46cc85e26f3..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out +++ /dev/null @@ -1,401 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 2 ] ], - "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "CALL", - "internalName" : "$MOD$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$HASH_CODE$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - } ], - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : 1024, - "type" : "INT NOT NULL" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>", - "description" : "Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])" - }, { - "id" : 4, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0, 2 ], - "aggCalls" : [ { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "BIGINT" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "logicalType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" - } ] - }, - "fields" : [ { - "name" : "map", - "keyClass" : { - "conversionClass" : "org.apache.flink.table.data.StringData" - } - } ] - } - } - } ] - }, - "description" : "LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "BIGINT" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "logicalType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" - } ] - }, - "fields" : [ { - "name" : "map", - "keyClass" : { - "conversionClass" : "org.apache.flink.table.data.StringData" - } - } ] - } - } - } ] - }, - "description" : "Exchange(distribution=[hash[a, $f2]])" - }, { - "id" : 6, - "type" : "stream-exec-incremental-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "partialAggGrouping" : [ 0, 1 ], - "finalAggGrouping" : [ 0 ], - "partialOriginalAggCalls" : [ { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "partialAggCallNeedRetractions" : [ false ], - "partialLocalAggInputRowType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>", - "partialAggNeedRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "incrementalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>", - "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])" - }, { - "id" : 7, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>", - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 8, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 2 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false ], - "localAggInputRowType" : "ROW<`a` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1])" - }, { - "id" : 9, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "c", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out deleted file mode 100644 index 7a48fa0143f..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out +++ /dev/null @@ -1,585 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "b", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "b1", - "internalName" : "$MAX$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false, false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>", - "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS count1$0, MAX(b) AS max$1])" - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>", - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 5, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "b", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "b1", - "internalName" : "$MAX$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false, false ], - "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` BIGINT NOT NULL, `b1` INT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count1$0) AS b, MAX(max$1) AS b1])" - }, { - "id" : 6, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT NOT NULL" - }, { - "kind" : "CALL", - "internalName" : "$MOD$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$HASH_CODE$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT NOT NULL" - } ], - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 1024, - "type" : "INT NOT NULL" - } ], - "type" : "INT NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>", - "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])" - }, { - "id" : 7, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0, 2 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ true, true, true ], - "needRetraction" : true, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT NOT NULL" - }, { - "name" : "$f2", - "fieldType" : "INT NOT NULL" - }, { - "name" : "sum$0", - "fieldType" : "INT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "count$2", - "fieldType" : "BIGINT" - }, { - "name" : "count1$3", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])" - }, { - "id" : 8, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT NOT NULL" - }, { - "name" : "$f2", - "fieldType" : "INT NOT NULL" - }, { - "name" : "sum$0", - "fieldType" : "INT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "count$2", - "fieldType" : "BIGINT" - }, { - "name" : "count1$3", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "Exchange(distribution=[hash[b, $f2]])" - }, { - "id" : 9, - "type" : "stream-exec-incremental-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "partialAggGrouping" : [ 0, 1 ], - "finalAggGrouping" : [ 0 ], - "partialOriginalAggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "partialAggCallNeedRetractions" : [ true, true, true ], - "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>", - "partialAggNeedRetraction" : true, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "incrementalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", - "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])" - }, { - "id" : 10, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 11, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 2 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 4 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ true, true, true ], - "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT NOT NULL, `$f2_0` INT NOT NULL, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : true, - "indexOfCountStar" : 2, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])" - }, { - "id" : 12, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "sum_b", - "dataType" : "INT" - }, { - "name" : "cnt_distinct_b", - "dataType" : "BIGINT" - }, { - "name" : "cnt1", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, $f1, $f2, $f3])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 9, - "target" : 10, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 10, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 11, - "target" : 12, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}