This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 63099905454 [FLINK-34597][table] Migrate `SimplifyFilterConditionRule` 
to java
63099905454 is described below

commit 63099905454f95aefb0a017003d5ea798afc80ea
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Jan 10 11:28:57 2026 +0100

    [FLINK-34597][table] Migrate `SimplifyFilterConditionRule` to java
---
 .../planner/operations/DeletePushDownUtils.java    |   6 +-
 .../rules/logical/SimplifyFilterConditionRule.java | 139 +++++++++++++++++++++
 .../logical/SimplifyFilterConditionRule.scala      |  98 ---------------
 3 files changed, 142 insertions(+), 101 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
index a7323b3fa69..a7639848122 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -138,7 +138,7 @@ public class DeletePushDownUtils {
         // we try to reduce and simplify the filter
         ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = 
ReduceExpressionsRuleProxy.INSTANCE;
         SimplifyFilterConditionRule simplifyFilterConditionRule =
-                SimplifyFilterConditionRule.INSTANCE();
+                SimplifyFilterConditionRule.INSTANCE;
         // max iteration num for reducing and simplifying filter,
         // we use 5 as the max iteration num which is same with the iteration 
num in Flink's plan
         // optimizing.
@@ -161,9 +161,9 @@ public class DeletePushDownUtils {
             // create a new filter
             filter = filter.copy(filter.getTraitSet(), filter.getInput(), 
newCondition);
             // then apply the rule to simplify filter
-            Option<Filter> changedFilter =
+            Optional<Filter> changedFilter =
                     simplifyFilterConditionRule.simplify(filter, new boolean[] 
{false});
-            if (changedFilter.isDefined()) {
+            if (changedFilter.isPresent()) {
                 filter = changedFilter.get();
                 changed = true;
             }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java
new file mode 100644
index 00000000000..4807ec020ca
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.Optional;
+
+/**
+ * Planner rule that apply various simplifying transformations on filter 
condition.
+ *
+ * <p>If `simplifySubQuery` is true, this rule will also simplify the filter 
condition in {@link
+ * RexSubQuery}.
+ */
[email protected]
+public class SimplifyFilterConditionRule
+        extends 
RelRule<SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig> {
+
+    public static final SimplifyFilterConditionRule INSTANCE =
+            
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT.toRule();
+    public static final SimplifyFilterConditionRule EXTENDED =
+            
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT
+                    
.withDescription("SimplifyFilterConditionRule:simplifySubQuery")
+                    
.as(SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.class)
+                    .withIsSimplifySubQuery(true)
+                    .toRule();
+
+    protected SimplifyFilterConditionRule(SimplifyFilterConditionRuleConfig 
config) {
+        super(config);
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+        Filter filter = call.rel(0);
+        boolean[] changed = new boolean[] {false};
+        Optional<Filter> newFilter = simplify(filter, changed);
+        if (newFilter.isPresent()) {
+            call.transformTo(newFilter.get());
+            call.getPlanner().prune(filter);
+        }
+    }
+
+    public Optional<Filter> simplify(Filter filter, boolean[] changed) {
+        RexNode condition =
+                config.isSimplifySubQuery()
+                        ? 
simplifyFilterConditionInSubQuery(filter.getCondition(), changed)
+                        : filter.getCondition();
+
+        RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+        RexNode simplifiedCondition =
+                FlinkRexUtil.simplify(
+                        rexBuilder, condition, 
filter.getCluster().getPlanner().getExecutor());
+        RexNode newCondition = RexUtil.pullFactors(rexBuilder, 
simplifiedCondition);
+
+        if (!changed[0] && !condition.equals(newCondition)) {
+            changed[0] = true;
+        }
+
+        // just replace modified RexNode
+        return changed[0]
+                ? Optional.of(filter.copy(filter.getTraitSet(), 
filter.getInput(), newCondition))
+                : Optional.empty();
+    }
+
+    private RexNode simplifyFilterConditionInSubQuery(RexNode condition, 
boolean[] changed) {
+        return condition.accept(
+                new RexShuttle() {
+                    public RexNode visitSubQuery(RexSubQuery subQuery) {
+                        RelNode newRel =
+                                subQuery.rel.accept(
+                                        new RelShuttleImpl() {
+                                            public RelNode visit(LogicalFilter 
filter) {
+                                                return simplify(filter, 
changed).orElse(filter);
+                                            }
+                                        });
+                        if (changed[0]) {
+                            return subQuery.clone(newRel);
+                        } else {
+                            return subQuery;
+                        }
+                    }
+                });
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    @Value.Style(
+            get = {"is*", "get*"},
+            init = "with*",
+            defaults = @Value.Immutable(copy = false))
+    public interface SimplifyFilterConditionRuleConfig extends RelRule.Config {
+        SimplifyFilterConditionRuleConfig DEFAULT =
+                
ImmutableSimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.builder()
+                        .description("SimplifyFilterConditionRule")
+                        .build()
+                        .withOperandSupplier(b0 -> 
b0.operand(Filter.class).anyInputs());
+
+        @Value.Default
+        default boolean isSimplifySubQuery() {
+            return false;
+        }
+
+        /** Sets {@link #isSimplifySubQuery()}. */
+        SimplifyFilterConditionRuleConfig withIsSimplifySubQuery(boolean 
simplifySubQuery);
+
+        @Override
+        default SimplifyFilterConditionRule toRule() {
+            return new SimplifyFilterConditionRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
deleted file mode 100644
index 79df7194245..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
-import org.apache.calcite.rel.core.Filter
-import org.apache.calcite.rel.logical.LogicalFilter
-import org.apache.calcite.rex._
-
-/**
- * Planner rule that apply various simplifying transformations on filter 
condition.
- *
- * if `simplifySubQuery` is true, this rule will also simplify the filter 
condition in
- * [[RexSubQuery]].
- */
-class SimplifyFilterConditionRule(simplifySubQuery: Boolean, description: 
String)
-  extends RelOptRule(operand(classOf[Filter], any()), description) {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val filter: Filter = call.rel(0)
-    val changed = Array(false)
-    val newFilter = simplify(filter, changed)
-    newFilter match {
-      case Some(f) =>
-        call.transformTo(f)
-        call.getPlanner.prune(filter)
-      case _ => // do nothing
-    }
-  }
-
-  def simplify(filter: Filter, changed: Array[Boolean]): Option[Filter] = {
-    val condition = if (simplifySubQuery) {
-      simplifyFilterConditionInSubQuery(filter.getCondition, changed)
-    } else {
-      filter.getCondition
-    }
-
-    val rexBuilder = filter.getCluster.getRexBuilder
-    val simplifiedCondition =
-      FlinkRexUtil.simplify(rexBuilder, condition, 
filter.getCluster.getPlanner.getExecutor)
-    val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition)
-
-    if (!changed.head && !condition.equals(newCondition)) {
-      changed(0) = true
-    }
-
-    // just replace modified RexNode
-    if (changed.head) {
-      Some(filter.copy(filter.getTraitSet, filter.getInput, newCondition))
-    } else {
-      None
-    }
-  }
-
-  def simplifyFilterConditionInSubQuery(condition: RexNode, changed: 
Array[Boolean]): RexNode = {
-    condition.accept(new RexShuttle() {
-      override def visitSubQuery(subQuery: RexSubQuery): RexNode = {
-        val newRel = subQuery.rel.accept(new RelShuttleImpl() {
-          override def visit(filter: LogicalFilter): RelNode = {
-            simplify(filter, changed).getOrElse(filter)
-          }
-        })
-        if (changed.head) {
-          subQuery.clone(newRel)
-        } else {
-          subQuery
-        }
-      }
-    })
-  }
-
-}
-
-object SimplifyFilterConditionRule {
-  val INSTANCE = new SimplifyFilterConditionRule(false, 
"SimplifyFilterConditionRule")
-
-  val EXTENDED =
-    new SimplifyFilterConditionRule(true, 
"SimplifyFilterConditionRule:simplifySubQuery")
-}

Reply via email to