This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 63099905454 [FLINK-34597][table] Migrate `SimplifyFilterConditionRule` to java
63099905454 is described below
commit 63099905454f95aefb0a017003d5ea798afc80ea
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Jan 10 11:28:57 2026 +0100
[FLINK-34597][table] Migrate `SimplifyFilterConditionRule` to java
---
.../planner/operations/DeletePushDownUtils.java | 6 +-
.../rules/logical/SimplifyFilterConditionRule.java | 139 +++++++++++++++++++++
.../logical/SimplifyFilterConditionRule.scala | 98 ---------------
3 files changed, 142 insertions(+), 101 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
index a7323b3fa69..a7639848122 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -138,7 +138,7 @@ public class DeletePushDownUtils {
// we try to reduce and simplify the filter
ReduceExpressionsRuleProxy reduceExpressionsRuleProxy =
ReduceExpressionsRuleProxy.INSTANCE;
SimplifyFilterConditionRule simplifyFilterConditionRule =
- SimplifyFilterConditionRule.INSTANCE();
+ SimplifyFilterConditionRule.INSTANCE;
// max iteration num for reducing and simplifying filter,
// we use 5 as the max iteration num which is same with the iteration num in Flink's plan
// optimizing.
@@ -161,9 +161,9 @@ public class DeletePushDownUtils {
// create a new filter
filter = filter.copy(filter.getTraitSet(), filter.getInput(),
newCondition);
// then apply the rule to simplify filter
- Option<Filter> changedFilter =
+ Optional<Filter> changedFilter =
simplifyFilterConditionRule.simplify(filter, new boolean[]
{false});
- if (changedFilter.isDefined()) {
+ if (changedFilter.isPresent()) {
filter = changedFilter.get();
changed = true;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java
new file mode 100644
index 00000000000..4807ec020ca
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.immutables.value.Value;
+
+import java.util.Optional;
+
+/**
+ * Planner rule that apply various simplifying transformations on filter condition.
+ *
+ * <p>If `simplifySubQuery` is true, this rule will also simplify the filter condition in {@link
+ * RexSubQuery}.
+ */
+@Value.Enclosing
+public class SimplifyFilterConditionRule
+ extends
RelRule<SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig> {
+
+ public static final SimplifyFilterConditionRule INSTANCE =
+
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT.toRule();
+ public static final SimplifyFilterConditionRule EXTENDED =
+
SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.DEFAULT
+
.withDescription("SimplifyFilterConditionRule:simplifySubQuery")
+
.as(SimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.class)
+ .withIsSimplifySubQuery(true)
+ .toRule();
+
+ protected SimplifyFilterConditionRule(SimplifyFilterConditionRuleConfig
config) {
+ super(config);
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ boolean[] changed = new boolean[] {false};
+ Optional<Filter> newFilter = simplify(filter, changed);
+ if (newFilter.isPresent()) {
+ call.transformTo(newFilter.get());
+ call.getPlanner().prune(filter);
+ }
+ }
+
+ public Optional<Filter> simplify(Filter filter, boolean[] changed) {
+ RexNode condition =
+ config.isSimplifySubQuery()
+ ?
simplifyFilterConditionInSubQuery(filter.getCondition(), changed)
+ : filter.getCondition();
+
+ RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+ RexNode simplifiedCondition =
+ FlinkRexUtil.simplify(
+ rexBuilder, condition,
filter.getCluster().getPlanner().getExecutor());
+ RexNode newCondition = RexUtil.pullFactors(rexBuilder,
simplifiedCondition);
+
+ if (!changed[0] && !condition.equals(newCondition)) {
+ changed[0] = true;
+ }
+
+ // just replace modified RexNode
+ return changed[0]
+ ? Optional.of(filter.copy(filter.getTraitSet(),
filter.getInput(), newCondition))
+ : Optional.empty();
+ }
+
+ private RexNode simplifyFilterConditionInSubQuery(RexNode condition,
boolean[] changed) {
+ return condition.accept(
+ new RexShuttle() {
+ public RexNode visitSubQuery(RexSubQuery subQuery) {
+ RelNode newRel =
+ subQuery.rel.accept(
+ new RelShuttleImpl() {
+ public RelNode visit(LogicalFilter
filter) {
+ return simplify(filter,
changed).orElse(filter);
+ }
+ });
+ if (changed[0]) {
+ return subQuery.clone(newRel);
+ } else {
+ return subQuery;
+ }
+ }
+ });
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ @Value.Style(
+ get = {"is*", "get*"},
+ init = "with*",
+ defaults = @Value.Immutable(copy = false))
+ public interface SimplifyFilterConditionRuleConfig extends RelRule.Config {
+ SimplifyFilterConditionRuleConfig DEFAULT =
+
ImmutableSimplifyFilterConditionRule.SimplifyFilterConditionRuleConfig.builder()
+ .description("SimplifyFilterConditionRule")
+ .build()
+ .withOperandSupplier(b0 ->
b0.operand(Filter.class).anyInputs());
+
+ @Value.Default
+ default boolean isSimplifySubQuery() {
+ return false;
+ }
+
+ /** Sets {@link #isSimplifySubQuery()}. */
+ SimplifyFilterConditionRuleConfig withIsSimplifySubQuery(boolean
simplifySubQuery);
+
+ @Override
+ default SimplifyFilterConditionRule toRule() {
+ return new SimplifyFilterConditionRule(this);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
deleted file mode 100644
index 79df7194245..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
-import org.apache.calcite.rel.core.Filter
-import org.apache.calcite.rel.logical.LogicalFilter
-import org.apache.calcite.rex._
-
-/**
- * Planner rule that apply various simplifying transformations on filter condition.
- *
- * if `simplifySubQuery` is true, this rule will also simplify the filter condition in
- * [[RexSubQuery]].
- */
-class SimplifyFilterConditionRule(simplifySubQuery: Boolean, description:
String)
- extends RelOptRule(operand(classOf[Filter], any()), description) {
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val filter: Filter = call.rel(0)
- val changed = Array(false)
- val newFilter = simplify(filter, changed)
- newFilter match {
- case Some(f) =>
- call.transformTo(f)
- call.getPlanner.prune(filter)
- case _ => // do nothing
- }
- }
-
- def simplify(filter: Filter, changed: Array[Boolean]): Option[Filter] = {
- val condition = if (simplifySubQuery) {
- simplifyFilterConditionInSubQuery(filter.getCondition, changed)
- } else {
- filter.getCondition
- }
-
- val rexBuilder = filter.getCluster.getRexBuilder
- val simplifiedCondition =
- FlinkRexUtil.simplify(rexBuilder, condition,
filter.getCluster.getPlanner.getExecutor)
- val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition)
-
- if (!changed.head && !condition.equals(newCondition)) {
- changed(0) = true
- }
-
- // just replace modified RexNode
- if (changed.head) {
- Some(filter.copy(filter.getTraitSet, filter.getInput, newCondition))
- } else {
- None
- }
- }
-
- def simplifyFilterConditionInSubQuery(condition: RexNode, changed:
Array[Boolean]): RexNode = {
- condition.accept(new RexShuttle() {
- override def visitSubQuery(subQuery: RexSubQuery): RexNode = {
- val newRel = subQuery.rel.accept(new RelShuttleImpl() {
- override def visit(filter: LogicalFilter): RelNode = {
- simplify(filter, changed).getOrElse(filter)
- }
- })
- if (changed.head) {
- subQuery.clone(newRel)
- } else {
- subQuery
- }
- }
- })
- }
-
-}
-
-object SimplifyFilterConditionRule {
- val INSTANCE = new SimplifyFilterConditionRule(false,
"SimplifyFilterConditionRule")
-
- val EXTENDED =
- new SimplifyFilterConditionRule(true,
"SimplifyFilterConditionRule:simplifySubQuery")
-}