This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 11890261201 [FLINK-38884][table] Migrate IncrementalAggregateRule to java
11890261201 is described below
commit 118902612015cc60071d0428007ace336b132605
Author: Roman <[email protected]>
AuthorDate: Sat Jan 10 20:21:34 2026 +0100
[FLINK-38884][table] Migrate IncrementalAggregateRule to java
---
.../physical/stream/IncrementalAggregateRule.java | 217 +++++++++++++++++++++
.../physical/stream/IncrementalAggregateRule.scala | 154 ---------------
2 files changed, 217 insertions(+), 154 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.java
new file mode 100644
index 00000000000..fc4f3a1ab3b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.PartialFinalType;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.immutables.value.Value;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
+/**
+ * Rule that matches final {@link StreamPhysicalGlobalGroupAggregate} on {@link
+ * StreamPhysicalExchange} on final {@link StreamPhysicalLocalGroupAggregate} on partial {@link
+ * StreamPhysicalGlobalGroupAggregate}, and combines the final {@link
+ * StreamPhysicalLocalGroupAggregate} and the partial {@link StreamPhysicalGlobalGroupAggregate}
+ * into a {@link StreamPhysicalIncrementalGroupAggregate}.
+ */
+@Value.Enclosing
+public class IncrementalAggregateRule
+ extends
RelRule<IncrementalAggregateRule.IncrementalAggregateRuleConfig> {
+
+ public static final IncrementalAggregateRule INSTANCE =
+
IncrementalAggregateRule.IncrementalAggregateRuleConfig.DEFAULT.toRule();
+
+    /**
+     * Creates the rule from the given configuration. See {@link
+     * IncrementalAggregateRuleConfig#toRule()}, which is the factory that invokes this constructor.
+     *
+     * @param config rule configuration (operand tree and description)
+     */
+    protected IncrementalAggregateRule(IncrementalAggregateRuleConfig config) {
+        super(config);
+    }
+
+    /**
+     * Decides whether the matched operand chain may be rewritten.
+     *
+     * <p>The rule only applies when the bottom global aggregate is {@link
+     * PartialFinalType#PARTIAL}, both the local aggregate above it and the top global aggregate
+     * are {@link PartialFinalType#FINAL}, and the incremental aggregate optimizer option is
+     * switched on.
+     *
+     * @param call rule call; operand 0 is the top global aggregate, operand 2 the local
+     *     aggregate and operand 3 the bottom global aggregate
+     * @return {@code true} if all of the conditions above hold
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final StreamPhysicalGlobalGroupAggregate topGlobalAgg = call.rel(0);
+        final StreamPhysicalLocalGroupAggregate middleLocalAgg = call.rel(2);
+        final StreamPhysicalGlobalGroupAggregate bottomGlobalAgg = call.rel(3);
+
+        // the optimizer switch is read before any of the type checks are evaluated
+        final TableConfig config = unwrapTableConfig(call);
+        final boolean enabled =
+                config.get(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED);
+
+        if (bottomGlobalAgg.partialFinalType() != PartialFinalType.PARTIAL) {
+            return false;
+        }
+        if (middleLocalAgg.partialFinalType() != PartialFinalType.FINAL) {
+            return false;
+        }
+        if (topGlobalAgg.partialFinalType() != PartialFinalType.FINAL) {
+            return false;
+        }
+        return enabled;
+    }
+
+    /**
+     * Rewrites the matched chain {@code final global agg <- exchange <- final local agg <-
+     * partial global agg} into {@code final global agg <- exchange <- incremental agg}.
+     *
+     * <p>The final local aggregate and the partial global aggregate are merged into one {@link
+     * StreamPhysicalIncrementalGroupAggregate}. When the partial aggregate already had a count
+     * star inserted, the final global aggregate is copied onto the new exchange unchanged;
+     * otherwise it is rebuilt with the partial aggregate's retraction settings and count-star
+     * index.
+     *
+     * @param relOptRuleCall rule call whose operands 0 to 3 are the final global aggregate, the
+     *     exchange, the final local aggregate and the partial global aggregate
+     * @throws IllegalStateException if the incremental aggregate's output row type differs from
+     *     the accumulator row type the final global aggregate consumes
+     */
+    @Override
+    public void onMatch(RelOptRuleCall relOptRuleCall) {
+        StreamPhysicalGlobalGroupAggregate finalGlobalAgg = relOptRuleCall.rel(0);
+        StreamPhysicalExchange exchange = relOptRuleCall.rel(1);
+        StreamPhysicalLocalGroupAggregate finalLocalAgg = relOptRuleCall.rel(2);
+        StreamPhysicalGlobalGroupAggregate partialGlobalAgg = relOptRuleCall.rel(3);
+        RelDataType partialLocalAggInputRowType = partialGlobalAgg.localAggInputRowType();
+
+        AggregateCall[] partialOriginalAggCalls =
+                JavaScalaConversionUtil.toJava(partialGlobalAgg.aggCalls())
+                        .toArray(new AggregateCall[0]);
+
+        AggregateCall[] partialRealAggCalls =
+                partialGlobalAgg.localAggInfoList().getActualAggregateCalls();
+
+        AggregateCall[] finalRealAggCalls =
+                finalGlobalAgg.globalAggInfoList().getActualAggregateCalls();
+
+        StreamPhysicalIncrementalGroupAggregate incrAgg =
+                new StreamPhysicalIncrementalGroupAggregate(
+                        partialGlobalAgg.getCluster(),
+                        // extends final local agg traits (ACC trait)
+                        finalLocalAgg.getTraitSet(),
+                        partialGlobalAgg.getInput(),
+                        partialGlobalAgg.grouping(),
+                        partialRealAggCalls,
+                        finalLocalAgg.grouping(),
+                        finalRealAggCalls,
+                        partialOriginalAggCalls,
+                        partialGlobalAgg.aggCallNeedRetractions(),
+                        partialGlobalAgg.needRetraction(),
+                        partialLocalAggInputRowType,
+                        partialGlobalAgg.getRowType(),
+                        partialGlobalAgg.hints());
+        RelDataType incrAggOutputRowType = incrAgg.getRowType();
+
+        // the exchange keeps its traits and distribution, only its input changes
+        StreamPhysicalExchange newExchange =
+                exchange.copy(exchange.getTraitSet(), incrAgg, exchange.distribution);
+
+        boolean partialAggCountStarInserted =
+                partialGlobalAgg.globalAggInfoList().countStarInserted();
+
+        RelNode globalAgg;
+        if (partialAggCountStarInserted) {
+            // the incremental agg replaces the final local agg, so it must emit the same row type
+            checkAccRowTypesMatch(incrAggOutputRowType, finalLocalAgg.getRowType());
+
+            globalAgg =
+                    finalGlobalAgg.copy(
+                            finalGlobalAgg.getTraitSet(), Collections.singletonList(newExchange));
+        } else {
+            // adapt the needRetract of final global agg to be same as that of partial agg
+            AggregateInfoList localAggInfoList =
+                    AggregateUtil.transformToStreamAggregateInfoList(
+                            unwrapTypeFactory(finalGlobalAgg),
+                            // the final agg input is partial agg
+                            FlinkTypeFactory.toLogicalRowType(partialGlobalAgg.getRowType()),
+                            JavaScalaConversionUtil.toScala(Arrays.asList(finalRealAggCalls)),
+                            // use partial global agg's aggCallNeedRetractions
+                            partialGlobalAgg.aggCallNeedRetractions(),
+                            partialGlobalAgg.needRetraction(),
+                            partialGlobalAgg.globalAggInfoList().indexOfCountStar(),
+                            // isStateBackendDataViews: the local agg does not work on state
+                            false,
+                            // needDistinctInfo
+                            true);
+
+            // check whether the global agg required input row type equals the incr agg output
+            // row type
+            RelDataType globalAggInputAccType =
+                    AggregateUtil.inferLocalAggRowType(
+                            localAggInfoList,
+                            incrAggOutputRowType,
+                            finalGlobalAgg.grouping(),
+                            (FlinkTypeFactory) finalGlobalAgg.getCluster().getTypeFactory());
+
+            checkAccRowTypesMatch(incrAggOutputRowType, globalAggInputAccType);
+
+            globalAgg =
+                    new StreamPhysicalGlobalGroupAggregate(
+                            finalGlobalAgg.getCluster(),
+                            finalGlobalAgg.getTraitSet(),
+                            newExchange,
+                            finalGlobalAgg.getRowType(),
+                            finalGlobalAgg.grouping(),
+                            JavaScalaConversionUtil.toScala(Arrays.asList(finalRealAggCalls)),
+                            partialGlobalAgg.aggCallNeedRetractions(),
+                            finalGlobalAgg.localAggInputRowType(),
+                            partialGlobalAgg.needRetraction(),
+                            finalGlobalAgg.partialFinalType(),
+                            partialGlobalAgg.globalAggInfoList().indexOfCountStar(),
+                            finalGlobalAgg.hints());
+        }
+
+        relOptRuleCall.transformTo(globalAgg);
+    }
+
+    /**
+     * Verifies that the incremental aggregate emits the row type that the final global aggregate
+     * expects as accumulator input, and reports both types when it does not.
+     */
+    private static void checkAccRowTypesMatch(
+            RelDataType incrAggOutputRowType, RelDataType globalAggInputAccType) {
+        Preconditions.checkState(
+                RelOptUtil.areRowTypesEqual(incrAggOutputRowType, globalAggInputAccType, false),
+                "Output row type of the incremental aggregate (%s) must equal the input"
+                        + " accumulator row type of the final global aggregate (%s).",
+                incrAggOutputRowType,
+                globalAggInputAccType);
+    }
+
+    /**
+     * Configuration for {@link IncrementalAggregateRule}; {@link #DEFAULT} declares the operand
+     * tree the rule is registered for.
+     */
+    @Value.Immutable(singleton = false)
+    public interface IncrementalAggregateRuleConfig extends RelRule.Config {
+        // operand tree, top-down: global agg -> exchange -> local agg -> global agg
+        // (which aggregate is partial and which is final is verified in matches())
+        IncrementalAggregateRuleConfig DEFAULT =
+                ImmutableIncrementalAggregateRule.IncrementalAggregateRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                topGlobalAgg ->
+                                        topGlobalAgg
+                                                .operand(StreamPhysicalGlobalGroupAggregate.class)
+                                                .oneInput(
+                                                        keyBy ->
+                                                                keyBy.operand(
+                                                                                StreamPhysicalExchange
+                                                                                        .class)
+                                                                        .oneInput(
+                                                                                localAgg ->
+                                                                                        localAgg.operand(
+                                                                                                        StreamPhysicalLocalGroupAggregate
+                                                                                                                .class)
+                                                                                                .oneInput(
+                                                                                                        bottomGlobalAgg ->
+                                                                                                                bottomGlobalAgg
+                                                                                                                        .operand(
+                                                                                                                                StreamPhysicalGlobalGroupAggregate
+                                                                                                                                        .class)
+                                                                                                                        .anyInputs()))))
+                        .withDescription("IncrementalAggregateRule")
+                        .as(IncrementalAggregateRuleConfig.class);
+
+        /** Instantiates the rule backed by this configuration. */
+        @Override
+        default IncrementalAggregateRule toRule() {
+            return new IncrementalAggregateRule(this);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
deleted file mode 100644
index 0b788427b65..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.config.OptimizerConfigOptions
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.PartialFinalType
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange, StreamPhysicalGlobalGroupAggregate, StreamPhysicalIncrementalGroupAggregate, StreamPhysicalLocalGroupAggregate}
-import org.apache.flink.table.planner.plan.utils.AggregateUtil
-import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapTableConfig, unwrapTypeFactory}
-import org.apache.flink.util.Preconditions
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-
-import java.util.Collections
-
-/**
- * Rule that matches final [[StreamPhysicalGlobalGroupAggregate]] on [[StreamPhysicalExchange]] on
- * final [[StreamPhysicalLocalGroupAggregate]] on partial [[StreamPhysicalGlobalGroupAggregate]],
- * and combines the final [[StreamPhysicalLocalGroupAggregate]] and the partial
- * [[StreamPhysicalGlobalGroupAggregate]] into a [[StreamPhysicalIncrementalGroupAggregate]].
- */
-class IncrementalAggregateRule
- extends RelOptRule(
- operand(
- classOf[StreamPhysicalGlobalGroupAggregate], // final global agg
- operand(
- classOf[StreamPhysicalExchange], // key by
- operand(
- classOf[StreamPhysicalLocalGroupAggregate], // final local agg
- operand(classOf[StreamPhysicalGlobalGroupAggregate], any())
- )
- )
- ), // partial global agg
- "IncrementalAggregateRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val finalGlobalAgg: StreamPhysicalGlobalGroupAggregate = call.rel(0)
- val finalLocalAgg: StreamPhysicalLocalGroupAggregate = call.rel(2)
- val partialGlobalAgg: StreamPhysicalGlobalGroupAggregate = call.rel(3)
-
- val tableConfig = unwrapTableConfig(call)
-
- // whether incremental aggregate is enabled
- val incrementalAggEnabled =
-
tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED)
-
- partialGlobalAgg.partialFinalType == PartialFinalType.PARTIAL &&
- finalLocalAgg.partialFinalType == PartialFinalType.FINAL &&
- finalGlobalAgg.partialFinalType == PartialFinalType.FINAL &&
- incrementalAggEnabled
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val finalGlobalAgg: StreamPhysicalGlobalGroupAggregate = call.rel(0)
- val exchange: StreamPhysicalExchange = call.rel(1)
- val finalLocalAgg: StreamPhysicalLocalGroupAggregate = call.rel(2)
- val partialGlobalAgg: StreamPhysicalGlobalGroupAggregate = call.rel(3)
- val partialLocalAggInputRowType = partialGlobalAgg.localAggInputRowType
-
- val partialOriginalAggCalls = partialGlobalAgg.aggCalls.toArray
- val partialRealAggCalls =
partialGlobalAgg.localAggInfoList.getActualAggregateCalls
- val finalRealAggCalls =
finalGlobalAgg.globalAggInfoList.getActualAggregateCalls
-
- val incrAgg = new StreamPhysicalIncrementalGroupAggregate(
- partialGlobalAgg.getCluster,
- finalLocalAgg.getTraitSet, // extends final local agg traits (ACC trait)
- partialGlobalAgg.getInput,
- partialGlobalAgg.grouping,
- partialRealAggCalls,
- finalLocalAgg.grouping,
- finalRealAggCalls,
- partialOriginalAggCalls,
- partialGlobalAgg.aggCallNeedRetractions,
- partialGlobalAgg.needRetraction,
- partialLocalAggInputRowType,
- partialGlobalAgg.getRowType,
- partialGlobalAgg.hints
- )
- val incrAggOutputRowType = incrAgg.getRowType
-
- val newExchange = exchange.copy(exchange.getTraitSet, incrAgg,
exchange.distribution)
-
- val partialAggCountStarInserted =
partialGlobalAgg.globalAggInfoList.countStarInserted
-
- val globalAgg = if (partialAggCountStarInserted) {
- val globalAggInputAccType = finalLocalAgg.getRowType
- Preconditions.checkState(
- RelOptUtil.areRowTypesEqual(incrAggOutputRowType,
globalAggInputAccType, false))
- finalGlobalAgg.copy(finalGlobalAgg.getTraitSet,
Collections.singletonList(newExchange))
- } else {
- // adapt the needRetract of final global agg to be same as that of partial agg
- val localAggInfoList = AggregateUtil.transformToStreamAggregateInfoList(
- unwrapTypeFactory(finalGlobalAgg),
- // the final agg input is partial agg
- FlinkTypeFactory.toLogicalRowType(partialGlobalAgg.getRowType),
- finalRealAggCalls,
- // use partial global agg's aggCallNeedRetractions
- partialGlobalAgg.aggCallNeedRetractions,
- partialGlobalAgg.needRetraction,
- partialGlobalAgg.globalAggInfoList.indexOfCountStar,
- // the local agg is not works on state
- isStateBackendDataViews = false,
- needDistinctInfo = true
- )
-
- // check whether the global agg required input row type equals the incr agg output row type
- val globalAggInputAccType = AggregateUtil.inferLocalAggRowType(
- localAggInfoList,
- incrAgg.getRowType,
- finalGlobalAgg.grouping,
-
finalGlobalAgg.getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
-
- Preconditions.checkState(
- RelOptUtil.areRowTypesEqual(incrAggOutputRowType,
globalAggInputAccType, false))
-
- new StreamPhysicalGlobalGroupAggregate(
- finalGlobalAgg.getCluster,
- finalGlobalAgg.getTraitSet,
- newExchange,
- finalGlobalAgg.getRowType,
- finalGlobalAgg.grouping,
- finalRealAggCalls,
- partialGlobalAgg.aggCallNeedRetractions,
- finalGlobalAgg.localAggInputRowType,
- partialGlobalAgg.needRetraction,
- finalGlobalAgg.partialFinalType,
- partialGlobalAgg.globalAggInfoList.indexOfCountStar,
- finalGlobalAgg.hints)
- }
-
- call.transformTo(globalAgg)
- }
-}
-
-object IncrementalAggregateRule {
- val INSTANCE = new IncrementalAggregateRule
-}