This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f81d6e1568b [FLINK-34895][table] Migrate FlinkRewriteSubQueryRule to
java
f81d6e1568b is described below
commit f81d6e1568b7ecc66d2512ebb8b442ebbaf65289
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Mar 17 19:12:04 2024 +0100
[FLINK-34895][table] Migrate FlinkRewriteSubQueryRule to java
This closes #24543.
---
.../rules/logical/FlinkRewriteSubQueryRule.java | 196 +++++++++++++++++++++
.../rules/logical/FlinkRewriteSubQueryRule.scala | 173 ------------------
2 files changed, 196 insertions(+), 173 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.java
new file mode 100644
index 00000000000..aea953b8fb0
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rules.SubQueryRemoveRule;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.immutables.value.Value;
+
+import java.util.Optional;
+
+/**
+ * Planner rule that rewrites scalar query in filter like: {@code select *
from T1 where (select
+ * count(*) from T2) > 0} to {@code select * from T1 where exists (select *
from T2)}, which could
+ * be converted to SEMI join by {@link FlinkSubQueryRemoveRule}.
+ *
+ * <p>Without this rule, the original query will be rewritten to a filter on a
join on an aggregate
+ * by {@link SubQueryRemoveRule}. the full logical plan is
+ *
+ * <pre>{@code
+ * LogicalProject(a=[$0], b=[$1], c=[$2])
+ * +- LogicalJoin(condition=[$3], joinType=[semi])
+ * :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ * +- LogicalProject($f0=[IS NOT NULL($0)])
+ * +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ * +- LogicalProject(i=[true])
+ * +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e,
f)]]])
+ * }</pre>
+ */
[email protected]
+public class FlinkRewriteSubQueryRule
+ extends
RelRule<FlinkRewriteSubQueryRule.FlinkRewriteSubQueryRuleConfig> {
+
+ public static final FlinkRewriteSubQueryRule FILTER =
+ FlinkRewriteSubQueryRuleConfig.DEFAULT.toRule();
+
+ protected FlinkRewriteSubQueryRule(
+ FlinkRewriteSubQueryRule.FlinkRewriteSubQueryRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ RexNode condition = filter.getCondition();
+ RexNode newCondition = rewriteScalarQuery(condition);
+ if (condition.equals(newCondition)) {
+ return;
+ }
+
+ Filter newFilter = filter.copy(filter.getTraitSet(),
filter.getInput(), newCondition);
+ call.transformTo(newFilter);
+ }
+
+ // scalar query like: `(select count(*) from T) > 0` can be converted to
`exists(select * from
+ // T)`
+ private RexNode rewriteScalarQuery(RexNode condition) {
+ return condition.accept(
+ new RexShuttle() {
+ public RexNode visitCall(RexCall call) {
+ Optional<RexSubQuery> subQuery =
getSupportedScalarQuery(call);
+ if (subQuery.isPresent()) {
+ return
RexSubQuery.exists(subQuery.get().rel.getInput(0));
+ }
+ return super.visitCall(call);
+ }
+ });
+ }
+
+ private boolean isScalarQuery(RexNode n) {
+ return n.isA(SqlKind.SCALAR_QUERY);
+ }
+
+ // check the RexNode is a RexLiteral which's value is between 0 and 1
+ private boolean isBetween0And1(RexNode n, boolean include0, boolean
include1) {
+ if (n instanceof RexLiteral) {
+ RexLiteral l = (RexLiteral) n;
+ if (l.getTypeName().getFamily() == SqlTypeFamily.NUMERIC &&
l.getValue() != null) {
+ double v = Double.parseDouble(l.getValue().toString());
+ return (0.0 < v && v < 1.0) || (include0 && v == 0.0) ||
(include1 && v == 1.0);
+ }
+ }
+ return false;
+ }
+
+ // check the RelNode is a Aggregate which has only count aggregate call
with empty args
+ private boolean isCountStarAggWithoutGroupBy(RelNode n) {
+ if (n instanceof Aggregate) {
+ Aggregate agg = (Aggregate) n;
+ if (agg.getGroupCount() == 0 && agg.getAggCallList().size() == 1) {
+ AggregateCall aggCall = agg.getAggCallList().get(0);
+ return !aggCall.isDistinct()
+ && aggCall.filterArg < 0
+ && aggCall.getArgList().isEmpty()
+ && aggCall.getAggregation() instanceof
SqlCountAggFunction;
+ }
+ }
+ return false;
+ }
+
+ Optional<RexSubQuery> getSupportedScalarQuery(RexCall call) {
+ switch (call.getKind()) {
+ // (select count(*) from T) > X (X is between 0 (inclusive) and 1
(exclusive))
+ case GREATER_THAN:
+ if (isScalarQuery(call.operands.get(0))) {
+ RexSubQuery subQuery = (RexSubQuery) call.operands.get(0);
+ if (isCountStarAggWithoutGroupBy(subQuery.rel)
+ && isBetween0And1(call.operands.get(1), true,
false)) {
+ return Optional.of(subQuery);
+ }
+ }
+ break;
+ // (select count(*) from T) >= X (X is between 0 (exclusive) and 1
(inclusive))
+ case GREATER_THAN_OR_EQUAL:
+ if (isScalarQuery(call.operands.get(0))) {
+ RexSubQuery subQuery = (RexSubQuery) call.operands.get(0);
+ if (isCountStarAggWithoutGroupBy(subQuery.rel)
+ && isBetween0And1(call.operands.get(1), false,
true)) {
+ return Optional.of(subQuery);
+ }
+ }
+ break;
+ // X < (select count(*) from T) (X is between 0 (inclusive) and 1
(exclusive))
+ case LESS_THAN:
+ if (isScalarQuery(call.operands.get(1))) {
+ RexSubQuery subQuery = (RexSubQuery) call.operands.get(1);
+ if (isCountStarAggWithoutGroupBy(subQuery.rel)
+ && isBetween0And1(call.operands.get(0), true,
false)) {
+ return Optional.of(subQuery);
+ }
+ }
+ break;
+ // X <= (select count(*) from T) (X is between 0 (exclusive) and 1
(inclusive))
+ case LESS_THAN_OR_EQUAL:
+ if (isScalarQuery(call.operands.get(1))) {
+ RexSubQuery subQuery = (RexSubQuery) call.operands.get(1);
+ if (isCountStarAggWithoutGroupBy(subQuery.rel)
+ && isBetween0And1(call.operands.get(0), false,
true)) {
+ return Optional.of(subQuery);
+ }
+ }
+ break;
+ }
+ return Optional.empty();
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface FlinkRewriteSubQueryRuleConfig extends RelRule.Config {
+ FlinkRewriteSubQueryRule.FlinkRewriteSubQueryRuleConfig DEFAULT =
+
ImmutableFlinkRewriteSubQueryRule.FlinkRewriteSubQueryRuleConfig.builder()
+ .operandSupplier(
+ b0 ->
+ b0.operand(Filter.class)
+
.predicate(RexUtil.SubQueryFinder.FILTER_PREDICATE)
+ .anyInputs())
+ .relBuilderFactory(RelFactories.LOGICAL_BUILDER)
+ .description("FlinkRewriteSubQueryRule:Filter")
+ .build();
+
+ @Override
+ default FlinkRewriteSubQueryRule toRule() {
+ return new FlinkRewriteSubQueryRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala
deleted file mode 100644
index 49388c70ee5..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRewriteSubQueryRule.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
-import org.apache.calcite.plan.RelOptRule.{any, operandJ}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories}
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeFamily
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.sql.fun.SqlCountAggFunction
-import org.apache.calcite.tools.RelBuilderFactory
-
-import scala.collection.JavaConversions._
-
-/**
- * Planner rule that rewrites scalar query in filter like: `select * from T1
where (select count(*)
- * from T2) > 0` to `select * from T1 where exists (select * from T2)`, which
could be converted to
- * SEMI join by [[FlinkSubQueryRemoveRule]].
- *
- * Without this rule, the original query will be rewritten to a filter on a
join on an aggregate by
- * [[org.apache.calcite.rel.rules.SubQueryRemoveRule]]. the full logical plan
is
- * {{{
- * LogicalProject(a=[$0], b=[$1], c=[$2])
- * +- LogicalJoin(condition=[$3], joinType=[semi])
- * :- LogicalTableScan(table=[[x]])
- * +- LogicalProject($f0=[IS NOT NULL($0)])
- * +- LogicalAggregate(group=[{}], m=[MIN($0)])
- * +- LogicalProject(i=[true])
- * +- LogicalTableScan(table=[[y]])
- * }}}
- */
-class FlinkRewriteSubQueryRule(
- operand: RelOptRuleOperand,
- relBuilderFactory: RelBuilderFactory,
- description: String)
- extends RelOptRule(operand, relBuilderFactory, description) {
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val filter: Filter = call.rel(0)
- val condition = filter.getCondition
- val newCondition = rewriteScalarQuery(condition)
- if (condition.equals(newCondition)) {
- return
- }
-
- val newFilter = filter.copy(filter.getTraitSet, filter.getInput,
newCondition)
- call.transformTo(newFilter)
- }
-
- // scalar query like: `(select count(*) from T) > 0` can be converted to
`exists(select * from T)`
- def rewriteScalarQuery(condition: RexNode): RexNode = {
- condition.accept(new RexShuttle() {
- override def visitCall(call: RexCall): RexNode = {
- val subQuery = getSupportedScalarQuery(call)
- subQuery match {
- case Some(sq) =>
- val aggInput = sq.rel.getInput(0)
- RexSubQuery.exists(aggInput)
- case _ => super.visitCall(call)
- }
- }
- })
- }
-
- private def isScalarQuery(n: RexNode): Boolean = n.isA(SqlKind.SCALAR_QUERY)
-
- private def getSupportedScalarQuery(call: RexCall): Option[RexSubQuery] = {
- // check the RexNode is a RexLiteral which's value is between 0 and 1
- def isBetween0And1(n: RexNode, include0: Boolean, include1: Boolean):
Boolean = {
- n match {
- case l: RexLiteral =>
- l.getTypeName.getFamily match {
- case SqlTypeFamily.NUMERIC if l.getValue != null =>
- val v = l.getValue.toString.toDouble
- (0.0 < v && v < 1.0) || (include0 && v == 0.0) || (include1 && v
== 1.0)
- case _ => false
- }
- case _ => false
- }
- }
-
- // check the RelNode is a Aggregate which has only count aggregate call
with empty args
- def isCountStarAggWithoutGroupBy(n: RelNode): Boolean = {
- n match {
- case agg: Aggregate =>
- if (agg.getGroupCount == 0 && agg.getAggCallList.size() == 1) {
- val aggCall = agg.getAggCallList.head
- !aggCall.isDistinct &&
- aggCall.filterArg < 0 &&
- aggCall.getArgList.isEmpty &&
- aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]
- } else {
- false
- }
- case _ => false
- }
- }
-
- call.getKind match {
- // (select count(*) from T) > X (X is between 0 (inclusive) and 1
(exclusive))
- case SqlKind.GREATER_THAN if isScalarQuery(call.operands.head) =>
- val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
- if (
- isCountStarAggWithoutGroupBy(subQuery.rel) &&
- isBetween0And1(call.operands.last, include0 = true, include1 = false)
- ) {
- Some(subQuery)
- } else {
- None
- }
- // (select count(*) from T) >= X (X is between 0 (exclusive) and 1
(inclusive))
- case SqlKind.GREATER_THAN_OR_EQUAL if isScalarQuery(call.operands.head)
=>
- val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
- if (
- isCountStarAggWithoutGroupBy(subQuery.rel) &&
- isBetween0And1(call.operands.last, include0 = false, include1 = true)
- ) {
- Some(subQuery)
- } else {
- None
- }
- // X < (select count(*) from T) (X is between 0 (inclusive) and 1
(exclusive))
- case SqlKind.LESS_THAN if isScalarQuery(call.operands.last) =>
- val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
- if (
- isCountStarAggWithoutGroupBy(subQuery.rel) &&
- isBetween0And1(call.operands.head, include0 = true, include1 = false)
- ) {
- Some(subQuery)
- } else {
- None
- }
- // X <= (select count(*) from T) (X is between 0 (exclusive) and 1
(inclusive))
- case SqlKind.LESS_THAN_OR_EQUAL if isScalarQuery(call.operands.last) =>
- val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
- if (
- isCountStarAggWithoutGroupBy(subQuery.rel) &&
- isBetween0And1(call.operands.head, include0 = false, include1 = true)
- ) {
- Some(subQuery)
- } else {
- None
- }
- case _ => None
- }
- }
-}
-
-object FlinkRewriteSubQueryRule {
-
- val FILTER = new FlinkRewriteSubQueryRule(
- operandJ(classOf[Filter], null, RexUtil.SubQueryFinder.FILTER_PREDICATE,
any),
- RelFactories.LOGICAL_BUILDER,
- "FlinkRewriteSubQueryRule:Filter")
-
-}