This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a12668bcfe [FLINK-34493][table] Migrate ReplaceMinusWithAntiJoinRule 
to java.
6a12668bcfe is described below

commit 6a12668bcfe651fa938517eb2da4d537ce6ce668
Author: liuyongvs <liuyon...@gmail.com>
AuthorDate: Fri Mar 1 16:08:52 2024 +0800

    [FLINK-34493][table] Migrate ReplaceMinusWithAntiJoinRule to java.
---
 .../logical/ReplaceMinusWithAntiJoinRule.java      | 95 ++++++++++++++++++++++
 .../logical/ReplaceMinusWithAntiJoinRule.scala     | 65 ---------------
 2 files changed, 95 insertions(+), 65 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java
new file mode 100644
index 00000000000..35c719e3846
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link 
org.apache.calcite.rel.core.Minus} (SQL keyword:
+ * EXCEPT) with a distinct {@link org.apache.calcite.rel.core.Aggregate} on an 
ANTI {@link
+ * org.apache.calcite.rel.core.Join}.
+ *
+ * <p>Only handle the case of input size 2.
+ */
+@Value.Enclosing
+public class ReplaceMinusWithAntiJoinRule
+        extends 
RelRule<ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig> {
+
+    public static final ReplaceMinusWithAntiJoinRule INSTANCE =
+            
ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.DEFAULT.toRule();
+
+    private ReplaceMinusWithAntiJoinRule(ReplaceMinusWithAntiJoinRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Minus minus = call.rel(0);
+        return !minus.all && minus.getInputs().size() == 2;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Minus minus = call.rel(0);
+        RelNode left = minus.getInput(0);
+        RelNode right = minus.getInput(1);
+
+        RelBuilder relBuilder = call.builder();
+        List<Integer> keys = Util.range(left.getRowType().getFieldCount());
+        List<RexNode> conditions = generateEqualsCondition(relBuilder, left, 
right, keys);
+
+        relBuilder.push(left);
+        relBuilder.push(right);
+        relBuilder
+                .join(JoinRelType.ANTI, conditions)
+                .aggregate(
+                        
relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray()));
+        RelNode rel = relBuilder.build();
+        call.transformTo(rel);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface ReplaceMinusWithAntiJoinRuleConfig extends RelRule.Config 
{
+        ReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig 
DEFAULT =
+                
ImmutableReplaceMinusWithAntiJoinRule.ReplaceMinusWithAntiJoinRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(b0 -> 
b0.operand(Minus.class).anyInputs())
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("ReplaceMinusWithAntiJoinRule");
+
+        @Override
+        default ReplaceMinusWithAntiJoinRule toRule() {
+            return new ReplaceMinusWithAntiJoinRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
deleted file mode 100644
index 0b080b86391..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.core._
-
-import scala.collection.JavaConverters._
-
-/**
- * Planner rule that replaces distinct [[Minus]] (SQL keyword: EXCEPT) with a 
distinct [[Aggregate]]
- * on an ANTI [[Join]].
- *
- * Only handle the case of input size 2.
- */
-class ReplaceMinusWithAntiJoinRule
-  extends RelOptRule(
-    operand(classOf[Minus], any),
-    RelFactories.LOGICAL_BUILDER,
-    "ReplaceMinusWithAntiJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val minus: Minus = call.rel(0)
-    !minus.all && minus.getInputs.size() == 2
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val minus: Minus = call.rel(0)
-    val left = minus.getInput(0)
-    val right = minus.getInput(1)
-
-    val relBuilder = call.builder
-    val keys = 0 until left.getRowType.getFieldCount
-    val conditions =
-      generateEqualsCondition(relBuilder, left, right, 
keys.map(Integer.valueOf).toList.asJava)
-
-    relBuilder.push(left)
-    relBuilder.push(right)
-    relBuilder.join(JoinRelType.ANTI, 
conditions).aggregate(relBuilder.groupKey(keys: _*))
-    val rel = relBuilder.build()
-    call.transformTo(rel)
-  }
-}
-
-object ReplaceMinusWithAntiJoinRule {
-  val INSTANCE: RelOptRule = new ReplaceMinusWithAntiJoinRule
-}

Reply via email to