This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c2eac7ec85b [FLINK-34160][table] Migration of FlinkCalcMergeRule to java c2eac7ec85b is described below commit c2eac7ec85bef93fe2b61c028984e704c5a9d126 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Mon Feb 19 23:30:28 2024 +0100 [FLINK-34160][table] Migration of FlinkCalcMergeRule to java --- .../plan/rules/logical/FlinkCalcMergeRule.java | 117 +++++++++++++++++++++ .../plan/rules/logical/FlinkCalcMergeRule.scala | 83 --------------- .../PushFilterInCalcIntoTableSourceRuleTest.java | 2 +- 3 files changed, 118 insertions(+), 84 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java new file mode 100644 index 00000000000..f82a1bcf188 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexProgram;
+import org.immutables.value.Value;
+
+/**
+ * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.CalcMergeRule}.
+ *
+ * <p>Modification: - Condition in the merged program will be simplified if it exists. - If the two
+ * {@link Calc} can merge into one, each non-deterministic {@code RexNode} of bottom {@link Calc}
+ * should appear at most once in the project list and filter list of top {@link Calc}.
+ */
+
+/**
+ * Planner rule that merges a {@link Calc} onto a {@link Calc}.
+ *
+ * <p>The resulting {@link Calc} has the same project list as the upper {@link Calc}, but expressed
+ * in terms of the lower {@link Calc}'s inputs.
+ */
+@Value.Enclosing
+public class FlinkCalcMergeRule extends RelRule<FlinkCalcMergeRule.FlinkCalcMergeRuleConfig> {
+
+    public static final FlinkCalcMergeRule INSTANCE = FlinkCalcMergeRuleConfig.DEFAULT.toRule();
+    public static final FlinkCalcMergeRule STREAM_PHYSICAL_INSTANCE =
+            FlinkCalcMergeRuleConfig.STREAM_PHYSICAL.toRule();
+
+    protected FlinkCalcMergeRule(FlinkCalcMergeRuleConfig config) {
+        super(config);
+    }
+
+    public boolean matches(RelOptRuleCall call) {
+        Calc topCalc = call.rel(0);
+        Calc bottomCalc = call.rel(1);
+
+        // Don't merge a calc which contains windowed aggregates onto a
+        // calc. That would effectively be pushing a windowed aggregate down
+        // through a filter.
+        RexProgram topProgram = topCalc.getProgram();
+        if (RexOver.containsOver(topProgram)) {
+            return false;
+        }
+
+        return FlinkRelUtil.isMergeable(topCalc, bottomCalc);
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+        Calc topCalc = call.rel(0);
+        Calc bottomCalc = call.rel(1);
+
+        Calc newCalc = FlinkRelUtil.merge(topCalc, bottomCalc);
+        if (newCalc.getDigest().equals(bottomCalc.getDigest())) {
+            // Digests are compared by value: an equal digest means newCalc is
+            // equivalent to bottomCalc, so topCalc must be trivial.
+            // Take it out of the game.
+            call.getPlanner().prune(topCalc);
+        }
+        call.transformTo(newCalc);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface FlinkCalcMergeRuleConfig extends RelRule.Config {
+        FlinkCalcMergeRule.FlinkCalcMergeRuleConfig DEFAULT =
+                ImmutableFlinkCalcMergeRule.FlinkCalcMergeRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(Calc.class)
+                                                .inputs(b1 -> b1.operand(Calc.class).anyInputs()))
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("FlinkCalcMergeRule");
+
+        FlinkCalcMergeRule.FlinkCalcMergeRuleConfig STREAM_PHYSICAL =
+                ImmutableFlinkCalcMergeRule.FlinkCalcMergeRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(StreamPhysicalCalc.class)
+                                                .inputs(
+                                                        b1 ->
+                                                                b1.operand(StreamPhysicalCalc.class)
+                                                                        .anyInputs()))
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("FlinkCalcMergeRule");
+
+        @Override
+        default FlinkCalcMergeRule toRule() {
+            return new FlinkCalcMergeRule(this);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
deleted file mode 100644
index da5533b2a6e..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
+++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc -import org.apache.flink.table.planner.plan.utils.FlinkRelUtil - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.core.{Calc, RelFactories} -import org.apache.calcite.rex.{RexNode, RexOver} - -/** - * This rule is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]]. - * - * Modification: - * - Condition in the merged program will be simplified if it exists. - * - If the two [[Calc]] can merge into one, each non-deterministic [[RexNode]] of bottom [[Calc]] - * should appear at most once in the project list and filter list of top [[Calc]]. - */ - -/** - * Planner rule that merges a [[Calc]] onto a [[Calc]]. - * - * <p>The resulting [[Calc]] has the same project list as the upper [[Calc]], but expressed in terms - * of the lower [[Calc]]'s inputs. 
- */ -class FlinkCalcMergeRule[C <: Calc](calcClass: Class[C]) - extends RelOptRule( - operand(calcClass, operand(calcClass, any)), - RelFactories.LOGICAL_BUILDER, - "FlinkCalcMergeRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val topCalc: Calc = call.rel(0) - val bottomCalc: Calc = call.rel(1) - - // Don't merge a calc which contains windowed aggregates onto a - // calc. That would effectively be pushing a windowed aggregate down - // through a filter. - val topProgram = topCalc.getProgram - if (RexOver.containsOver(topProgram)) { - return false - } - - FlinkRelUtil.isMergeable(topCalc, bottomCalc) - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val topCalc: Calc = call.rel(0) - val bottomCalc: Calc = call.rel(1) - - val newCalc = FlinkRelUtil.merge(topCalc, bottomCalc) - if (newCalc.getDigest == bottomCalc.getDigest) { - // newCalc is equivalent to bottomCalc, - // which means that topCalc - // must be trivial. Take it out of the game. - call.getPlanner.prune(topCalc) - } - call.transformTo(newCalc) - } - -} - -object FlinkCalcMergeRule { - val INSTANCE = new FlinkCalcMergeRule(classOf[Calc]) - val STREAM_PHYSICAL_INSTANCE = new FlinkCalcMergeRule(classOf[StreamPhysicalCalc]) -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java index 1d929b751ae..3e3bbb76fa7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java @@ -52,7 +52,7 @@ class PushFilterInCalcIntoTableSourceRuleTest extends PushFilterIntoTableSourceS RuleSets.ofList( 
CoreRules.PROJECT_TO_CALC, CoreRules.FILTER_TO_CALC, - FlinkCalcMergeRule$.MODULE$.INSTANCE(), + FlinkCalcMergeRule.INSTANCE, FlinkLogicalCalc.CONVERTER(), FlinkLogicalTableSourceScan.CONVERTER(), FlinkLogicalWatermarkAssigner.CONVERTER()))