This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a67277c1d [FLINK-34161][table] Migration of RewriteMinusAllRule to java
97a67277c1d is described below

commit 97a67277c1d7878f320ab1c67589e05fdd8b153a
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Mon Apr 8 12:11:57 2024 +0200

    [FLINK-34161][table] Migration of RewriteMinusAllRule to java
---
 .../plan/rules/logical/RewriteMinusAllRule.java    | 148 +++++++++++++++++++++
 .../plan/rules/logical/RewriteMinusAllRule.scala   | 116 ----------------
 2 files changed, 148 insertions(+), 116 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java
new file mode 100644
index 00000000000..2e3f0d93a8e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.calcite.sql.type.SqlTypeName.BIGINT;
+import static org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.GREATER_THAN;
+
+/**
+ * Replaces logical {@link Minus} operator using a combination of union all, aggregate and table
+ * function.
+ *
+ * <p>Original Query : {@code SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 }
+ *
+ * <pre>Rewritten Query:
+ * {@code
+ * SELECT c1
+ * FROM (
+ *   SELECT c1, sum_vcol_marker
+ *   FROM (
+ *     SELECT c1, sum(vcol_marker) AS sum_vcol_marker
+ *     FROM (
+ *       SELECT c1, 1L as vcol_marker FROM ut1
+ *       UNION ALL
+ *       SELECT c1, -1L as vcol_marker FROM ut2
+ *     ) AS union_all
+ *     GROUP BY union_all.c1
+ *   )
+ *   WHERE sum_vcol_marker > 0
+ * )
+ * LATERAL TABLE(replicate_row(sum_vcol_marker, c1)) AS T(c1)
+ * }
+ * </pre>
+ *
+ * <p>Only handle the case of input size 2.
+ */
+@Value.Enclosing
+public class RewriteMinusAllRule extends RelRule<RewriteMinusAllRule.RewriteMinusAllRuleConfig> {
+    public static final RewriteMinusAllRule INSTANCE = RewriteMinusAllRuleConfig.DEFAULT.toRule();
+
+    /** Name of the synthetic column holding +1 (left input) or -1 (right input). */
+    private static final String VCOL_MARKER = "vcol_marker";
+
+    /** Name of the aggregated column holding the per-group sum of {@link #VCOL_MARKER}. */
+    private static final String SUM_VCOL_MARKER = "sum_vcol_marker";
+
+    protected RewriteMinusAllRule(RewriteMinusAllRuleConfig config) {
+        super(config);
+    }
+
+    /**
+     * Accepts only {@code EXCEPT ALL} nodes with exactly two inputs.
+     *
+     * @param call the rule call whose first operand is the {@link Minus} node
+     * @return {@code true} if the node has {@code all} set and has two inputs
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Minus minus = call.rel(0);
+        return minus.all && minus.getInputs().size() == 2;
+    }
+
+    /**
+     * Rewrites the matched {@link Minus} into marker projections, a union all, a grouped sum, a
+     * filter on a positive sum, and a row-replicating table function, then registers the result
+     * via {@link RelOptRuleCall#transformTo(RelNode)}.
+     *
+     * @param call the rule call whose first operand is the {@link Minus} node
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Minus minus = call.rel(0);
+        RelNode left = minus.getInput(0);
+        RelNode right = minus.getInput(1);
+
+        List<Integer> fields = Util.range(minus.getRowType().getFieldCount());
+
+        // 1. add vcol_marker (+1) to left rel node
+        RelNode leftWithAddedVirtualCols = addMarkerColumn(call.builder(), left, fields, 1L);
+
+        // 2. add vcol_marker (-1) to right rel node
+        RelNode rightWithAddedVirtualCols = addMarkerColumn(call.builder(), right, fields, -1L);
+
+        // 3. add union all and aggregate; keep only groups whose marker sum is positive and
+        //    project the sum in front of the original fields
+        RelBuilder builder = call.builder();
+        builder.push(leftWithAddedVirtualCols)
+                .push(rightWithAddedVirtualCols)
+                .union(true)
+                .aggregate(
+                        builder.groupKey(builder.fields(fields)),
+                        builder.sum(false, SUM_VCOL_MARKER, builder.field(VCOL_MARKER)))
+                .filter(
+                        builder.call(
+                                GREATER_THAN, builder.field(SUM_VCOL_MARKER), builder.literal(0)))
+                .project(
+                        Stream.concat(
+                                        Stream.of(builder.field(SUM_VCOL_MARKER)),
+                                        builder.fields(fields).stream())
+                                .collect(Collectors.toList()));
+
+        // 4. add table function to replicate rows
+        RelNode output = SetOpRewriteUtil.replicateRows(builder, minus.getRowType(), fields);
+
+        call.transformTo(output);
+    }
+
+    /**
+     * Projects the given fields of {@code input} followed by a constant BIGINT column named
+     * {@code vcol_marker}.
+     *
+     * <p>Every expression is created with the same {@code builder} the input is pushed on, so the
+     * left and right projections cannot accidentally mix builders.
+     *
+     * @param builder a fresh builder used for this projection only
+     * @param input the rel node to project
+     * @param fields ordinals of the input fields to keep, in output order
+     * @param marker the constant value of the added column
+     * @return the projection built on top of {@code input}
+     */
+    private static RelNode addMarkerColumn(
+            RelBuilder builder, RelNode input, List<Integer> fields, long marker) {
+        // Push first: builder.fields(...) resolves against the node on top of the stack.
+        builder.push(input);
+        return builder.project(
+                        Stream.concat(
+                                        builder.fields(fields).stream(),
+                                        Stream.of(
+                                                builder.alias(
+                                                        builder.cast(
+                                                                builder.literal(marker), BIGINT),
+                                                        VCOL_MARKER)))
+                                .collect(Collectors.toList()))
+                .build();
+    }
+
+    /**
+     * Rule configuration.
+     *
+     * <p>{@link #DEFAULT} matches any {@link Minus} operand with any inputs and uses {@link
+     * RelFactories#LOGICAL_BUILDER}.
+     */
+    @Value.Immutable(singleton = false)
+    public interface RewriteMinusAllRuleConfig extends RelRule.Config {
+        RewriteMinusAllRule.RewriteMinusAllRuleConfig DEFAULT =
+                ImmutableRewriteMinusAllRule.RewriteMinusAllRuleConfig.builder()
+                        .operandSupplier(b0 -> b0.operand(Minus.class).anyInputs())
+                        .relBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .description("RewriteMinusAllRule")
+                        .build();
+
+        /** Creates the rule instance backed by this configuration. */
+        @Override
+        default RewriteMinusAllRule toRule() {
+            return new RewriteMinusAllRule(this);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala
deleted file mode 100644
index adc93815e40..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRule.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.GREATER_THAN
-import org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.replicateRows
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.core.{Minus, RelFactories}
-import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
-import org.apache.calcite.util.Util
-
-import scala.collection.JavaConversions._
-
-/**
- * Replaces logical [[Minus]] operator using a combination of union all, aggregate and table
- * function.
- *
- * Original Query :
- * {{{
- *    SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
- * }}}
- *
- * Rewritten Query:
- * {{{
- *   SELECT c1
- *   FROM (
- *     SELECT c1, sum_val
- *     FROM (
- *       SELECT c1, sum(vcol_marker) AS sum_val
- *       FROM (
- *         SELECT c1, 1L as vcol_marker FROM ut1
- *         UNION ALL
- *         SELECT c1, -1L as vcol_marker FROM ut2
- *       ) AS union_all
- *       GROUP BY union_all.c1
- *     )
- *     WHERE sum_val > 0
- *   )
- *   LATERAL TABLE(replicate_row(sum_val, c1)) AS T(c1)
- * }}}
- *
- * Only handle the case of input size 2.
- */
-class RewriteMinusAllRule
-  extends RelOptRule(
-    operand(classOf[Minus], any),
-    RelFactories.LOGICAL_BUILDER,
-    "RewriteMinusAllRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val minus: Minus = call.rel(0)
-    minus.all && minus.getInputs.size() == 2
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val minus: Minus = call.rel(0)
-    val left = minus.getInput(0)
-    val right = minus.getInput(1)
-
-    val fields = Util.range(minus.getRowType.getFieldCount)
-
-    // 1. add vcol_marker to left rel node
-    val leftBuilder = call.builder
-    val leftWithAddedVirtualCols = leftBuilder
-      .push(left)
-      .project(leftBuilder.fields(fields) ++
-        Seq(leftBuilder.alias(leftBuilder.cast(leftBuilder.literal(1L), BIGINT), "vcol_marker")))
-      .build()
-
-    // 2. add vcol_marker to right rel node
-    val rightBuilder = call.builder
-    val rightWithAddedVirtualCols = rightBuilder
-      .push(right)
-      .project(rightBuilder.fields(fields) ++
-        Seq(rightBuilder.alias(leftBuilder.cast(leftBuilder.literal(-1L), BIGINT), "vcol_marker")))
-      .build()
-
-    // 3. add union all and aggregate
-    val builder = call.builder
-    builder
-      .push(leftWithAddedVirtualCols)
-      .push(rightWithAddedVirtualCols)
-      .union(true)
-      .aggregate(
-        builder.groupKey(builder.fields(fields)),
-        builder.sum(false, "sum_vcol_marker", builder.field("vcol_marker")))
-      .filter(builder.call(GREATER_THAN, builder.field("sum_vcol_marker"), builder.literal(0)))
-      .project(Seq(builder.field("sum_vcol_marker")) ++ builder.fields(fields))
-
-    // 4. add table function to replicate rows
-    val output = replicateRows(builder, minus.getRowType, fields)
-
-    call.transformTo(output)
-  }
-}
-
-object RewriteMinusAllRule {
-  val INSTANCE: RelOptRule = new RewriteMinusAllRule
-}

Reply via email to