This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f38a5702fc [FLINK-34494][table] Migrate 
ReplaceIntersectWithSemiJoinRule to java
5f38a5702fc is described below

commit 5f38a5702fc3629e29cae0d531912b1ae9b47e08
Author: Jacky Lau <liuyon...@gmail.com>
AuthorDate: Tue Mar 12 04:27:09 2024 +0800

    [FLINK-34494][table] Migrate ReplaceIntersectWithSemiJoinRule to java
---
 .../logical/ReplaceIntersectWithSemiJoinRule.java  | 98 ++++++++++++++++++++++
 .../logical/ReplaceIntersectWithSemiJoinRule.scala | 65 --------------
 2 files changed, 98 insertions(+), 65 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java
new file mode 100644
index 00000000000..d71f4bd4d5e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Util;
+import org.immutables.value.Value;
+
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition;
+
+/**
+ * Planner rule that replaces distinct {@link Intersect} with a distinct 
{@link Aggregate} on a SEMI
+ * {@link Join}.
+ *
+ * <p>Only handle the case of input size 2.
+ */
+@Value.Enclosing
+public class ReplaceIntersectWithSemiJoinRule
+        extends 
RelRule<ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig>
 {
+
+    public static final ReplaceIntersectWithSemiJoinRule INSTANCE =
+            
ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig.DEFAULT
+                    .toRule();
+
+    private 
ReplaceIntersectWithSemiJoinRule(ReplaceIntersectWithSemiJoinRuleConfig config) 
{
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Intersect intersect = call.rel(0);
+        return !intersect.all && intersect.getInputs().size() == 2;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Intersect intersect = call.rel(0);
+        RelNode left = intersect.getInput(0);
+        RelNode right = intersect.getInput(1);
+
+        RelBuilder relBuilder = call.builder();
+        List<Integer> keys = Util.range(left.getRowType().getFieldCount());
+        List<RexNode> conditions = generateEqualsCondition(relBuilder, left, 
right, keys);
+
+        relBuilder.push(left);
+        relBuilder.push(right);
+        relBuilder
+                .join(JoinRelType.SEMI, conditions)
+                .aggregate(
+                        
relBuilder.groupKey(keys.stream().mapToInt(Integer::intValue).toArray()));
+        RelNode rel = relBuilder.build();
+        call.transformTo(rel);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface ReplaceIntersectWithSemiJoinRuleConfig extends 
RelRule.Config {
+        
ReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig DEFAULT 
=
+                
ImmutableReplaceIntersectWithSemiJoinRule.ReplaceIntersectWithSemiJoinRuleConfig
+                        .builder()
+                        .build()
+                        .withOperandSupplier(b0 -> 
b0.operand(Intersect.class).anyInputs())
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("ReplaceIntersectWithSemiJoinRule");
+
+        @Override
+        default ReplaceIntersectWithSemiJoinRule toRule() {
+            return new ReplaceIntersectWithSemiJoinRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
deleted file mode 100644
index 045e39b220b..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import 
org.apache.flink.table.planner.plan.utils.SetOpRewriteUtil.generateEqualsCondition
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.core.{Aggregate, Intersect, Join, JoinRelType, 
RelFactories}
-
-import scala.collection.JavaConverters._
-
-/**
- * Planner rule that replaces distinct [[Intersect]] with a distinct 
[[Aggregate]] on a SEMI
- * [[Join]].
- *
- * Only handle the case of input size 2.
- */
-class ReplaceIntersectWithSemiJoinRule
-  extends RelOptRule(
-    operand(classOf[Intersect], any),
-    RelFactories.LOGICAL_BUILDER,
-    "ReplaceIntersectWithSemiJoinRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val intersect: Intersect = call.rel(0)
-    !intersect.all && intersect.getInputs.size() == 2
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val intersect: Intersect = call.rel(0)
-    val left = intersect.getInput(0)
-    val right = intersect.getInput(1)
-
-    val relBuilder = call.builder
-    val keys = 0 until left.getRowType.getFieldCount
-    val conditions =
-      generateEqualsCondition(relBuilder, left, right, 
keys.map(Integer.valueOf).toList.asJava)
-
-    relBuilder.push(left)
-    relBuilder.push(right)
-    relBuilder.join(JoinRelType.SEMI, 
conditions).aggregate(relBuilder.groupKey(keys: _*))
-    val rel = relBuilder.build()
-    call.transformTo(rel)
-  }
-}
-
-object ReplaceIntersectWithSemiJoinRule {
-  val INSTANCE: RelOptRule = new ReplaceIntersectWithSemiJoinRule
-}

Reply via email to