This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5f38a5702fc [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java 5f38a5702fc is described below commit 5f38a5702fc3629e29cae0d531912b1ae9b47e08 Author: Jacky Lau <liuyon...@gmail.com> AuthorDate: Tue Mar 12 04:27:09 2024 +0800 [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java --- .../logical/ReplaceIntersectWithSemiJoinRule.java | 98 ++++++++++++++++++++++ .../logical/ReplaceIntersectWithSemiJoinRule.scala | 65 -------------- 2 files changed, 98 insertions(+), 65 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java new file mode 100644 index 00000000000..d71f4bd4d5e --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; + +import static org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition; + +/** + * Planner rule that replaces distinct {@link Intersect} with a distinct {@link Aggregate} on a SEMI + * {@link Join}. + * + * <p>Only handle the case of input size 2. + */ +@Value.Enclosing +public class ReplaceIntersectWithSemiJoinRule + extends RelRule<ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig> { + + public static final ReplaceIntersectWithSemiJoinRule INSTANCE = + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT + .toRule(); + + private ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Intersect intersect = call.rel(0); + return !intersect.all && intersect.getInputs().size() == 2; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Intersect intersect = call.rel(0); + RelNode left = intersect.getInput(0); + RelNode right = intersect.getInput(1); + + RelBuilder relBuilder = call.builder(); + List<Integer> keys = Util.range(left.getRowType().getFieldCount()); + List<RexNode> conditions = generateEqualsCondition(relBuilder, left, right, keys); + + relBuilder.push(left); + relBuilder.push(right); + relBuilder + .join(JoinRelType.SEMI, conditions) + .aggregate( + relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray())); + RelNode rel = relBuilder.build(); + call.transformTo(rel); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface ReplaceIntersectWithSemiJoinRuleConfig extends RelRule.Config { + ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT = + ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig + .builder() + .build() + .withOperandSupplier(b0 -> b0.operand(Intersect.class).anyInputs()) + .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER) + .withDescription("ReplaceIntersectWithSemiJoinRule"); + + @Override + default ReplaceIntersectWithSemiJoinRule toRule() { + return new ReplaceIntersectWithSemiJoinRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala deleted file mode 100644 index 045e39b220b..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.core.{Aggregate, Intersect, Join, JoinRelType, RelFactories} - -import scala.collection.JavaConverters._ - -/** - * Planner rule that replaces distinct [[Intersect]] with a distinct [[Aggregate]] on a SEMI - * [[Join]]. - * - * Only handle the case of input size 2. - */ -class ReplaceIntersectWithSemiJoinRule - extends RelOptRule( - operand(classOf[Intersect], any), - RelFactories.LOGICAL_BUILDER, - "ReplaceIntersectWithSemiJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val intersect: Intersect = call.rel(0) - !intersect.all && intersect.getInputs.size() == 2 - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val intersect: Intersect = call.rel(0) - val left = intersect.getInput(0) - val right = intersect.getInput(1) - - val relBuilder = call.builder - val keys = 0 until left.getRowType.getFieldCount - val conditions = - generateEqualsCondition(relBuilder, left, right, keys.map(Integer.valueOf).toList.asJava) - - relBuilder.push(left) - relBuilder.push(right) - relBuilder.join(JoinRelType.SEMI, conditions).aggregate(relBuilder.groupKey(keys: _*)) - val rel = relBuilder.build() - call.transformTo(rel) - } -} - -object ReplaceIntersectWithSemiJoinRule { - val INSTANCE: RelOptRule = new ReplaceIntersectWithSemiJoinRule -}