This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 97a67277c1d [FLINK-34161][table] Migration of RewriteMinusAllRule to java 97a67277c1d is described below commit 97a67277c1d7878f320ab1c67589e05fdd8b153a Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Mon Apr 8 12:11:57 2024 +0200 [FLINK-34161][table] Migration of RewriteMinusAllRule to java --- .../plan/rules/logical/RewriteMinusAllRule.java | 148 +++++++++++++++++++++ .../plan/rules/logical/RewriteMinusAllRule.scala | 116 ---------------- 2 files changed, 148 insertions(+), 116 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java new file mode 100644 index 00000000000..2e3f0d93a8e --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.Util; +import org.immutables.value.Value; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.calcite.sql.type.SqlTypeName.BIGINT; +import static org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.GREATER_THAN; + +/** + * Replaces logical {@link Minus} operator using a combination of union all, aggregate and table + * function. + * + * <p>Original Query : {@code SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 } + * + * <pre>Rewritten Query: + * {@code SELECT c1 FROM ( SELECT c1, sum_val FROM ( SELECT c1, sum(vcol_marker) + * AS sum_val FROM ( SELECT c1, 1L as vcol_marker FROM ut1 UNION ALL SELECT c1, -1L as vcol_marker + * FROM ut2 ) AS union_all GROUP BY union_all.c1 ) WHERE sum_val > 0 ) + * LATERAL TABLE(replicate_row(sum_val, c1)) AS T(c1) } + * </pre> + * + * <p>Only handle the case of input size 2. + */ +@Value.Enclosing +public class RewriteMinusAllRule extends RelRule<RewriteMinusAllRule.RewriteMinusAllRuleConfig> { + public static final RewriteMinusAllRule INSTANCE = RewriteMinusAllRuleConfig.DEFAULT.toRule(); + + protected RewriteMinusAllRule(RewriteMinusAllRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Minus minus = call.rel(0); + return minus.all && minus.getInputs().size() == 2; + } + + @Override + public void onMatch(RelOptRuleCall call) { + Minus minus = call.rel(0); + RelNode left = minus.getInput(0); + RelNode right = minus.getInput(1); + + List<Integer> fields = Util.range(minus.getRowType().getFieldCount()); + + // 1. add vcol_marker to left rel node + RelBuilder leftBuilder = call.builder(); + RelNode leftWithAddedVirtualCols = + leftBuilder + .push(left) + .project( + Stream.concat( + leftBuilder.fields(fields).stream(), + Stream.of( + leftBuilder.alias( + leftBuilder.cast( + leftBuilder.literal(1L), + BIGINT), + "vcol_marker"))) + .collect(Collectors.toList())) + .build(); + + // 2. add vcol_marker to right rel node + RelBuilder rightBuilder = call.builder(); + RelNode rightWithAddedVirtualCols = + rightBuilder + .push(right) + .project( + Stream.concat( + rightBuilder.fields(fields).stream(), + Stream.of( + rightBuilder.alias( + leftBuilder.cast( + leftBuilder.literal(-1L), + BIGINT), + "vcol_marker"))) + .collect(Collectors.toList())) + .build(); + + // 3. add union all and aggregate + RelBuilder builder = call.builder(); + builder.push(leftWithAddedVirtualCols) + .push(rightWithAddedVirtualCols) + .union(true) + .aggregate( + builder.groupKey(builder.fields(fields)), + builder.sum(false, "sum_vcol_marker", builder.field("vcol_marker"))) + .filter( + builder.call( + GREATER_THAN, builder.field("sum_vcol_marker"), builder.literal(0))) + .project( + Stream.concat( + Stream.of(builder.field("sum_vcol_marker")), + builder.fields(fields).stream()) + .collect(Collectors.toList())); + + // 4. add table function to replicate rows + RelNode output = SetOpRewriteUtil.replicateRows(builder, minus.getRowType(), fields); + + call.transformTo(output); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface RewriteMinusAllRuleConfig extends RelRule.Config { + RewriteMinusAllRule.RewriteMinusAllRuleConfig DEFAULT = + ImmutableRewriteMinusAllRule.RewriteMinusAllRuleConfig.builder() + .operandSupplier(b0 -> b0.operand(Minus.class).anyInputs()) + .relBuilderFactory(RelFactories.LOGICAL_BUILDER) + .description("RewriteMinusAllRule") + .build(); + + @Override + default RewriteMinusAllRule toRule() { + return new RewriteMinusAllRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala deleted file mode 100644 index adc93815e40..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.GREATER_THAN -import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.replicateRows - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.core.{Minus, RelFactories} -import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT -import org.apache.calcite.util.Util - -import scala.collection.JavaConversions._ - -/** - * Replaces logical [[Minus]] operator using a combination of union all, aggregate and table - * function. - * - * Original Query : - * {{{ - * SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 - * }}} - * - * Rewritten Query: - * {{{ - * SELECT c1 - * FROM ( - * SELECT c1, sum_val - * FROM ( - * SELECT c1, sum(vcol_marker) AS sum_val - * FROM ( - * SELECT c1, 1L as vcol_marker FROM ut1 - * UNION ALL - * SELECT c1, -1L as vcol_marker FROM ut2 - * ) AS union_all - * GROUP BY union_all.c1 - * ) - * WHERE sum_val > 0 - * ) - * LATERAL TABLE(replicate_row(sum_val, c1)) AS T(c1) - * }}} - * - * Only handle the case of input size 2. - */ -class RewriteMinusAllRule - extends RelOptRule( - operand(classOf[Minus], any), - RelFactories.LOGICAL_BUILDER, - "RewriteMinusAllRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val minus: Minus = call.rel(0) - minus.all && minus.getInputs.size() == 2 - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val minus: Minus = call.rel(0) - val left = minus.getInput(0) - val right = minus.getInput(1) - - val fields = Util.range(minus.getRowType.getFieldCount) - - // 1. add vcol_marker to left rel node - val leftBuilder = call.builder - val leftWithAddedVirtualCols = leftBuilder - .push(left) - .project(leftBuilder.fields(fields) ++ - Seq(leftBuilder.alias(leftBuilder.cast(leftBuilder.literal(1L), BIGINT), "vcol_marker"))) - .build() - - // 2. add vcol_marker to right rel node - val rightBuilder = call.builder - val rightWithAddedVirtualCols = rightBuilder - .push(right) - .project(rightBuilder.fields(fields) ++ - Seq(rightBuilder.alias(leftBuilder.cast(leftBuilder.literal(-1L), BIGINT), "vcol_marker"))) - .build() - - // 3. add union all and aggregate - val builder = call.builder - builder - .push(leftWithAddedVirtualCols) - .push(rightWithAddedVirtualCols) - .union(true) - .aggregate( - builder.groupKey(builder.fields(fields)), - builder.sum(false, "sum_vcol_marker", builder.field("vcol_marker"))) - .filter(builder.call(GREATER_THAN, builder.field("sum_vcol_marker"), builder.literal(0))) - .project(Seq(builder.field("sum_vcol_marker")) ++ builder.fields(fields)) - - // 4. add table function to replicate rows - val output = replicateRows(builder, minus.getRowType, fields) - - call.transformTo(output) - } -} - -object RewriteMinusAllRule { - val INSTANCE: RelOptRule = new RewriteMinusAllRule -}