This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1d08b8bf6a6 [FLINK-33395][table-planner] Fix the join hint doesn't work when appears in subquery 1d08b8bf6a6 is described below commit 1d08b8bf6a67c6a1874e8003868d5735248a7e45 Author: Xuyang <xyzhong...@163.com> AuthorDate: Wed Nov 15 10:24:40 2023 +0800 [FLINK-33395][table-planner] Fix the join hint doesn't work when appears in subquery This closes #23620 --- .../apache/calcite/sql2rel/RelDecorrelator.java | 23 +-- .../apache/calcite/sql2rel/SqlToRelConverter.java | 27 ++- .../planner/hint/CapitalizeJoinHintsShuttle.java | 67 +++++++ .../ClearJoinHintsOnUnmatchedNodesShuttle.java | 82 ++++++++ ...earJoinHintsWithInvalidPropagationShuttle.java} | 18 +- .../flink/table/planner/hint/FlinkHints.java | 102 +++++----- .../table/planner/hint/JoinHintsRelShuttle.java | 65 ++++++ .../flink/table/planner/hint/JoinStrategy.java | 25 ++- ...oinHintResolver.java => JoinHintsResolver.java} | 17 +- .../optimize/CommonSubGraphBasedOptimizer.scala | 2 +- .../rules/logical/FlinkSubQueryRemoveRule.scala | 5 +- .../table/planner/plan/utils/FlinkRelOptUtil.scala | 223 ++++++++++++++++++++- ...inHintsWithCapitalizeJoinHintsShuttleTest.java} | 9 +- ...oinHintsWithInvalidPropagationShuttleTest.java} | 8 +- ...intsWithInvalidPropagationShuttleTestBase.java} | 6 +- ...oinHintsWithInvalidPropagationShuttleTest.java} | 9 +- .../planner/plan/hints/batch/JoinHintTestBase.java | 11 + .../optimize/ClearQueryBlockAliasResolverTest.java | 2 +- .../plan/optimize/JoinHintResolverTest.java | 4 +- ...oinHintsWithCapitalizeJoinHintsShuttleTest.xml} | 0 ...JoinHintsWithInvalidPropagationShuttleTest.xml} | 0 ...JoinHintsWithInvalidPropagationShuttleTest.xml} | 0 .../plan/hints/batch/BroadcastJoinHintTest.xml | 91 +++++++-- .../plan/hints/batch/NestLoopJoinHintTest.xml | 107 +++++++--- .../plan/hints/batch/ShuffleHashJoinHintTest.xml | 61 +++++- .../plan/hints/batch/ShuffleMergeJoinHintTest.xml | 51 +++++ .../optimize/ClearQueryBlockAliasResolverTest.xml | 20 ++ .../planner/plan/optimize/JoinHintResolverTest.xml | 20 ++ 28 files changed, 871 insertions(+), 184 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java index d750a561236..10c2764a807 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java @@ -16,9 +16,6 @@ */ package org.apache.calcite.sql2rel; -import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle; -import org.apache.flink.table.planner.hint.FlinkHints; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -128,9 +125,9 @@ import static org.apache.calcite.linq4j.Nullness.castNonNull; * Copied to fix calcite issues. FLINK modifications are at lines * * <ol> - * <li>Was changed within FLINK-29280, FLINK-28682: Line 222 ~ 232 - * <li>Should be removed after fix of FLINK-29540: Line 298 ~ 304 - * <li>Should be removed after fix of FLINK-29540: Line 316 ~ 322 + * <li>Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223 + * <li>Should be removed after fix of FLINK-29540: Line 289 ~ 295 + * <li>Should be removed after fix of FLINK-29540: Line 307 ~ 313 * </ol> */ public class RelDecorrelator implements ReflectiveVisitor { @@ -216,18 +213,12 @@ public class RelDecorrelator implements ReflectiveVisitor { newRootRel = decorrelator.decorrelate(newRootRel); } - // Re-propagate the hints. - newRootRel = RelOptUtil.propagateRelHints(newRootRel, true); - // ----- FLINK MODIFICATION BEGIN ----- + // REASON: hints are already parsed and validated before optimizing, so should not + // re-propagate again here - // replace all join hints with upper case - newRootRel = FlinkHints.capitalizeJoinHints(newRootRel); - - // clear join hints which are propagated into wrong query block - // The hint QueryBlockAlias will be added when building a RelNode tree before. It is used to - // distinguish the query block in the SQL. - newRootRel = newRootRel.accept(new ClearJoinHintWithInvalidPropagationShuttle()); + // Re-propagate the hints. + // newRootRel = RelOptUtil.propagateRelHints(newRootRel, true); // ----- FLINK MODIFICATION END ----- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 71cafa91308..ad267335c7f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -18,10 +18,11 @@ package org.apache.calcite.sql2rel; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle; import org.apache.flink.table.planner.calcite.TimestampSchemaVersion; +import org.apache.flink.table.planner.hint.ClearJoinHintsWithInvalidPropagationShuttle; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogSnapshotReader; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; import com.google.common.base.Preconditions; @@ -235,12 +236,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>FLINK modifications are at lines * * <ol> - * <li>Added in FLINK-29081, FLINK-28682: Lines 644 ~ 654 - * <li>Added in FLINK-28682: Lines 2277 ~ 2294 - * <li>Added in FLINK-28682: Lines 2331 ~ 2359 - * <li>Added in FLINK-20873: Lines 5484 ~ 5493 - * <li>Added in FLINK-32474: Lines 2841 ~ 2853 - * <li>Added in FLINK-32474: Lines 2953 ~ 2987 + * <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 654 ~ 671 + * <li>Added in FLINK-28682: Lines 2294 ~ 2311 + * <li>Added in FLINK-28682: Lines 2348 ~ 2376 + * <li>Added in FLINK-20873: Lines 5489 ~ 5498 + * <li>Added in FLINK-32474: Lines 2846 ~ 2858 + * <li>Added in FLINK-32474: Lines 2958 ~ 2992 * </ol> */ @SuppressWarnings("UnstableApiUsage") @@ -650,10 +651,11 @@ public class SqlToRelConverter { result = result.accept(new NestedJsonFunctionRelRewriter()); } - // propagate the hints. - result = RelOptUtil.propagateRelHints(result, false); - // ----- FLINK MODIFICATION BEGIN ----- + // propagate the hints. + // The method FlinkRelOptUtil#propagateRelHints not only finds and propagates hints + // throughout the entire rel tree but also within subqueries. + result = FlinkRelOptUtil.propagateRelHints(result, false); // replace all join hints with upper case result = FlinkHints.capitalizeJoinHints(result); @@ -661,7 +663,10 @@ public class SqlToRelConverter { // clear join hints which are propagated into wrong query block // The hint QueryBlockAlias will be added when building a RelNode tree before. It is used to // distinguish the query block in the SQL. - result = result.accept(new ClearJoinHintWithInvalidPropagationShuttle()); + result = result.accept(new ClearJoinHintsWithInvalidPropagationShuttle()); + + // clear the hints on some nodes where these hints should not be attached + result = FlinkHints.clearJoinHintsOnUnmatchedNodes(result); // ----- FLINK MODIFICATION END ----- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java new file mode 100644 index 00000000000..b243fdb746a --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.hint; + +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.hint.Hintable; +import org.apache.calcite.rel.hint.RelHint; + +import java.util.List; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** A shuttle to capitalize all join hints on corresponding nodes. */ +public class CapitalizeJoinHintsShuttle extends JoinHintsRelShuttle { + + @Override + protected RelNode visitBiRel(BiRel biRel) { + Hintable hBiRel = (Hintable) biRel; + AtomicBoolean changed = new AtomicBoolean(false); + List<RelHint> hintsWithCapitalJoinHints = + hBiRel.getHints().stream() + .map( + hint -> { + String capitalHintName = hint.hintName.toUpperCase(Locale.ROOT); + if (JoinStrategy.isJoinStrategy(capitalHintName)) { + changed.set(true); + if (JoinStrategy.isLookupHint(hint.hintName)) { + return RelHint.builder(capitalHintName) + .hintOptions(hint.kvOptions) + .inheritPath(hint.inheritPath) + .build(); + } + return RelHint.builder(capitalHintName) + .hintOptions(hint.listOptions) + .inheritPath(hint.inheritPath) + .build(); + } else { + return hint; + } + }) + .collect(Collectors.toList()); + + if (changed.get()) { + return super.visit(hBiRel.withHints(hintsWithCapitalJoinHints)); + } else { + return super.visit(biRel); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java new file mode 100644 index 00000000000..925fa257ac8 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.hint; + +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; + +import org.apache.calcite.rel.RelHomogeneousShuttle; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.hint.HintStrategyTable; +import org.apache.calcite.rel.hint.Hintable; +import org.apache.calcite.rel.hint.RelHint; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.planner.hint.FlinkHints.resolveSubQuery; + +/** + * Clear the invalid join hints in the unmatched nodes. For example, a join hint may be attached in + * the Project node at first. After accepting this shuttle, the join hint in the Project node will + * be cleared. + * + * <p>See more at {@link FlinkHintStrategies}. + * + * <p>Tips, hints about view and alias will not be cleared. + */ +public class ClearJoinHintsOnUnmatchedNodesShuttle extends RelHomogeneousShuttle { + private final HintStrategyTable hintStrategyTable; + + public ClearJoinHintsOnUnmatchedNodesShuttle(HintStrategyTable hintStrategyTable) { + this.hintStrategyTable = hintStrategyTable; + } + + @Override + public RelNode visit(RelNode other) { + if (FlinkRelOptUtil.containsSubQuery(other)) { + other = resolveSubQuery(other, relNode -> relNode.accept(this)); + } + + if (other instanceof Hintable) { + List<RelHint> originHints = ((Hintable) other).getHints(); + // 1. classify the hints and separate out the join hints + List<RelHint> joinHints = + originHints.stream() + .filter(h -> JoinStrategy.isJoinStrategy(h.hintName)) + .collect(Collectors.toList()); + + List<RelHint> remainHints = new ArrayList<>(originHints); + remainHints.removeAll(joinHints); + + // 2. use hintStrategyTable#apply to determine whether the join hint can be attached + // to the current node + // If it cannot be attached, it means that the join hint on the current node needs to + // be removed. + List<RelHint> hintsCanApply = hintStrategyTable.apply(joinHints, other); + if (hintsCanApply.size() != joinHints.size()) { + hintsCanApply.addAll(remainHints); + // As a result, the remaining hints will be attached. + other = ((Hintable) other).withHints(hintsCanApply); + } + } + + return super.visit(other); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java similarity index 93% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java index a37ad91496b..2be55cae8d0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java @@ -16,10 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.alias; - -import org.apache.flink.table.planner.hint.FlinkHints; -import org.apache.flink.table.planner.hint.JoinStrategy; +package org.apache.flink.table.planner.hint; import org.apache.calcite.rel.BiRel; import org.apache.calcite.rel.RelNode; @@ -48,19 +45,10 @@ import java.util.stream.Collectors; * <p>TODO some node will be attached join hints when parse SqlNode to RelNode such as Project and * etc. The join hints on these node can also be cleared. */ -public class ClearJoinHintWithInvalidPropagationShuttle extends RelShuttleImpl { - - @Override - public RelNode visit(LogicalJoin join) { - return visitBiRel(join); - } +public class ClearJoinHintsWithInvalidPropagationShuttle extends JoinHintsRelShuttle { @Override - public RelNode visit(LogicalCorrelate correlate) { - return visitBiRel(correlate); - } - - private RelNode visitBiRel(BiRel biRel) { + protected RelNode visitBiRel(BiRel biRel) { List<RelHint> hints = ((Hintable) biRel).getHints(); Set<String> allHintNames = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java index d49cdad635e..9154baa8295 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java @@ -23,25 +23,24 @@ import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgu import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.rel.BiRel; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.rel.logical.LogicalCorrelate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSnapshot; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; import org.apache.commons.lang3.StringUtils; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; /** Utility class for Flink hints. */ @@ -175,7 +174,7 @@ public abstract class FlinkHints { * Get all query block alias hints. * * <p>Because query block alias hints will be propagated from root to leaves, so maybe one node - * will contain multi alias hints. But only the first one is the really query block name where + * will contain multi alias hints. But only the first one is the real query block name where * this node is. */ public static List<RelHint> getQueryBlockAliasHints(List<RelHint> allHints) { @@ -185,53 +184,60 @@ public abstract class FlinkHints { } public static RelNode capitalizeJoinHints(RelNode root) { - return root.accept(new CapitalizeJoinHintShuttle()); + return root.accept(new CapitalizeJoinHintsShuttle()); } - private static class CapitalizeJoinHintShuttle extends RelShuttleImpl { - - @Override - public RelNode visit(LogicalCorrelate correlate) { - return visitBiRel(correlate); - } + /** Resolve the RelNode of the sub query in the node and return a new node. */ + public static RelNode resolveSubQuery(RelNode node, Function<RelNode, RelNode> resolver) { + if (node instanceof LogicalProject) { + LogicalProject project = (LogicalProject) node; + List<RexNode> newProjects = + project.getProjects().stream() + .map(p -> resolveSubQuery(p, resolver)) + .collect(Collectors.toList()); + return project.copy( + project.getTraitSet(), project.getInput(), newProjects, project.getRowType()); + + } else if (node instanceof LogicalFilter) { + LogicalFilter filter = (LogicalFilter) node; + RexNode newCondition = resolveSubQuery(filter.getCondition(), resolver); + return filter.copy(filter.getTraitSet(), filter.getInput(), newCondition); + + } else if (node instanceof LogicalJoin) { + LogicalJoin join = (LogicalJoin) node; + RexNode newCondition = resolveSubQuery(join.getCondition(), resolver); + return join.copy( + join.getTraitSet(), + newCondition, + join.getLeft(), + join.getRight(), + join.getJoinType(), + join.isSemiJoinDone()); - @Override - public RelNode visit(LogicalJoin join) { - return visitBiRel(join); + } else { + return node; } + } - private RelNode visitBiRel(BiRel biRel) { - Hintable hBiRel = (Hintable) biRel; - AtomicBoolean changed = new AtomicBoolean(false); - List<RelHint> hintsWithCapitalJoinHints = - hBiRel.getHints().stream() - .map( - hint -> { - String capitalHintName = - hint.hintName.toUpperCase(Locale.ROOT); - if (JoinStrategy.isJoinStrategy(capitalHintName)) { - changed.set(true); - if (JoinStrategy.isLookupHint(hint.hintName)) { - return RelHint.builder(capitalHintName) - .hintOptions(hint.kvOptions) - .inheritPath(hint.inheritPath) - .build(); - } - return RelHint.builder(capitalHintName) - .hintOptions(hint.listOptions) - .inheritPath(hint.inheritPath) - .build(); - } else { - return hint; - } - }) - .collect(Collectors.toList()); + /** Resolve the RelNode of the sub query in conditions. */ + private static RexNode resolveSubQuery(RexNode rexNode, Function<RelNode, RelNode> resolver) { + return rexNode.accept( + new RexShuttle() { + @Override + public RexNode visitSubQuery(RexSubQuery subQuery) { + RelNode oldRel = subQuery.rel; + RelNode newRel = resolver.apply(oldRel); + if (oldRel != newRel) { + return super.visitSubQuery(subQuery.clone(newRel)); + } + return subQuery; + } + }); + } - if (changed.get()) { - return super.visit(hBiRel.withHints(hintsWithCapitalJoinHints)); - } else { - return super.visit(biRel); - } - } + /** Clear the join hints on some nodes where these hints should not be attached. */ + public static RelNode clearJoinHintsOnUnmatchedNodes(RelNode root) { + return root.accept( + new ClearJoinHintsOnUnmatchedNodesShuttle(root.getCluster().getHintStrategies())); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java new file mode 100644 index 00000000000..73adc7040c0 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.hint; + +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; +import org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalProject; + +import static org.apache.flink.table.planner.hint.FlinkHints.resolveSubQuery; +import static org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.containsSubQuery; + +/** An abstract shuttle for each shuttle used for join hint. */ +public abstract class JoinHintsRelShuttle extends RelShuttleImpl { + + @Override + public RelNode visit(LogicalJoin join) { + if (containsSubQuery(join)) { + join = (LogicalJoin) resolveSubQuery(join, relNode -> relNode.accept(this)); + } + return visitBiRel(join); + } + + @Override + public RelNode visit(LogicalCorrelate correlate) { + return visitBiRel(correlate); + } + + protected abstract RelNode visitBiRel(BiRel biRel); + + @Override + public RelNode visit(LogicalFilter filter) { + if (containsSubQuery(filter)) { + filter = (LogicalFilter) resolveSubQuery(filter, relNode -> relNode.accept(this)); + } + return super.visit(filter); + } + + @Override + public RelNode visit(LogicalProject project) { + if (containsSubQuery(project)) { + project = (LogicalProject) resolveSubQuery(project, relNode -> relNode.accept(this)); + } + return super.visit(project); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java index f48c281168c..6dcdb8b755b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java @@ -21,7 +21,30 @@ package org.apache.flink.table.planner.hint; import java.util.List; import java.util.Locale; -/** Currently available join strategies and corresponding join hint names. */ +/** + * Currently available join strategies and corresponding join hint names. + * + * <p>The process for handling join hints is as follows: + * + * <ol> + * <li>Resolve join hint propagation: + * <ol> + * <li>The join hints are resolved using Calcite's functionality to propagate them from the + * sink to the source and within sub-queries + * <li>Capitalize join hints: All join hints are capitalized to ensure consistency, as they + * are expected to be in uppercase. + * <li>Clear incorrectly propagated join hints: Any join hints that have been mistakenly + * propagated into the query block are cleared. + * <li>Clear join hints from unmatched nodes: Join hints attached to unmatched nodes, such + * as {@link org.apache.calcite.rel.core.Project}, are also cleared. + * </ol> + * <li>Validate and modify join hints: The join hints are validated, and the table names in the + * hints are replaced with LEFT or RIGHT to indicate the join input ordinal. + * <li>Clear query block aliases: The query block aliases are cleared from the sink to the source. + * <li>Consume join hints in applicable rules: Finally, the join hints are consumed in specific + * rules where they are relevant. + * </ol> + */ public enum JoinStrategy { /** * Instructs the optimizer to use broadcast hash join strategy. If both sides are specified in diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java similarity index 96% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java index 36af173b1ae..ae12b8979ff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java @@ -21,16 +21,14 @@ package org.apache.flink.table.planner.plan.optimize; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.planner.hint.FlinkHints; +import org.apache.flink.table.planner.hint.JoinHintsRelShuttle; import org.apache.flink.table.planner.hint.JoinStrategy; import org.apache.calcite.rel.BiRel; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.rel.logical.LogicalCorrelate; -import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; @@ -52,7 +50,7 @@ import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.LOOKUP_T * * <p>Note: duplicate join hints are not checked here. */ -public class JoinHintResolver extends RelShuttleImpl { +public class JoinHintsResolver extends JoinHintsRelShuttle { private final Set<RelHint> allHints = new HashSet<>(); private final Set<RelHint> validHints = new HashSet<>(); @@ -79,16 +77,7 @@ public class JoinHintResolver extends RelShuttleImpl { } @Override - public RelNode visit(LogicalJoin join) { - return visitBiRel(join); - } - - @Override - public RelNode visit(LogicalCorrelate correlate) { - return visitBiRel(correlate); - } - - private RelNode visitBiRel(BiRel biRel) { + protected RelNode visitBiRel(BiRel biRel) { Optional<String> leftName = extractAliasOrTableName(biRel.getLeft()); Optional<String> rightName = extractAliasOrTableName(biRel.getRight()); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala index f36a057de62..db04423e485 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala @@ -77,7 +77,7 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer { */ override def optimize(roots: Seq[RelNode]): Seq[RelNode] = { // resolve hints before optimizing - val joinHintResolver = new JoinHintResolver() + val joinHintResolver = new JoinHintsResolver() val resolvedHintRoots = joinHintResolver.resolve(toJava(roots)) // clear query block alias bef optimizing diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala index 518afbf7df7..d054b8f5996 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala @@ -17,9 +17,8 @@ */ package org.apache.flink.table.planner.plan.rules.logical -import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkRelFactories} -import org.apache.flink.table.planner.hint.FlinkHints +import org.apache.flink.table.planner.hint.{ClearJoinHintsWithInvalidPropagationShuttle, FlinkHints} import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelOptUtil} @@ -101,7 +100,7 @@ class FlinkSubQueryRemoveRule( val nodeWithHint = RelOptUtil.propagateRelHints(newNode, false) val nodeWithCapitalizedJoinHints = FlinkHints.capitalizeJoinHints(nodeWithHint) val finalNode = - nodeWithCapitalizedJoinHints.accept(new ClearJoinHintWithInvalidPropagationShuttle) + nodeWithCapitalizedJoinHints.accept(new ClearJoinHintsWithInvalidPropagationShuttle) call.transformTo(finalNode) case _ => // do nothing } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala index e12cf05246c..5900da92f6a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala @@ -20,22 +20,28 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.table.planner.JBoolean import org.apache.flink.table.planner.analyze.PlanAdvice import org.apache.flink.table.planner.calcite.{FlinkPlannerImpl, FlinkTypeFactory} +import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode} import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.config.NullCollation import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.{RelFieldCollation, RelNode} +import org.apache.calcite.rel.{RelFieldCollation, RelHomogeneousShuttle, RelNode, RelShuttle} import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection} +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.hint.{Hintable, HintStrategyTable, RelHint} +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalJoin, LogicalProject} import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, RexNode, RexUtil, RexVisitorImpl} import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.SqlExplainLevel import org.apache.calcite.sql.SqlKind._ +import org.apache.calcite.util.Pair import org.apache.commons.math3.util.ArithmeticUtils import java.io.{PrintWriter, StringWriter} import java.math.BigDecimal import java.sql.{Date, Time, Timestamp} +import java.util import java.util.Calendar import scala.collection.JavaConversions._ @@ -524,4 +530,219 @@ object FlinkRelOptUtil { } } } + + // ----- The following is mainly copied from RelOptUtil ----- + // ----- Copied Line: 537 ~ 743 ----- + // ----- Modified Line: 642 ~ 662 ----- + /** + * Propagates the relational expression hints from root node to leaf node. + * + * @param rel + * The relational expression + * @param reset + * Flag saying if to reset the existing hints before the propagation + * @return + * New relational expression with hints propagated + */ + def propagateRelHints(rel: RelNode, reset: Boolean): RelNode = { + val node = if (reset) { + rel.accept(new ResetHintsShuttle) + } else { + rel + } + val shuttle = new RelHintPropagateShuttle(node.getCluster.getHintStrategies) + node.accept(shuttle) + } + + /** + * A [[RelShuttle]] which resets all the hints of a relational expression to what they are + * originally like. + * + * <p>This would trigger a reverse transformation of what [[RelHintPropagateShuttle]] does. + * + * <p>Transformation rules: + * + * <ul> <li>Project: remove the hints that have non-empty inherit path (which means the hint was + * not originally declared from it); <li>Aggregate: remove the hints that have non-empty inherit + * path; <li>Join: remove all the hints; <li>TableScan: remove the hints that have non-empty + * inherit path. </ul> + */ + private class ResetHintsShuttle extends RelHomogeneousShuttle { + override def visit(node: RelNode): RelNode = { + var finalNode = visitChildren(node) + if (node.isInstanceOf[Hintable]) { + finalNode = ResetHintsShuttle.resetHints(finalNode.asInstanceOf[Hintable]) + } + finalNode + } + } + + private object ResetHintsShuttle { + private def resetHints(hintable: Hintable): RelNode = if (hintable.getHints.size > 0) { + val resetHints: util.List[RelHint] = hintable.getHints + .filter((hint: RelHint) => hint.inheritPath.size == 0) + .toList + hintable.withHints(resetHints) + } else { + hintable.asInstanceOf[RelNode] + } + } + + /** + * A [[RelShuttle]] which propagates all the hints of relational expression to their children + * nodes. + * + * <p>Given a plan: + * + * {{{ + * Filter (Hint1) + * | + * Join + * / \ + * Scan Project (Hint2) + * | + * Scan2 + * }}} + * + * <p>Every hint has a [[inheritPath]] (integers list) which records its propagate path, number + * `0` represents the hint is propagated from the first(left) child, number `1` represents the + * hint is propagated from the second(right) child, so the plan would have hints path as follows + * (assumes each hint can be propagated to all child nodes): + * + * <ul> <li>Filter would have hints {Hint1[]}</li> <li>Join would have hints {Hint1[0]}</li> + * <li>Scan would have hints {Hint1[0, 0]}</li> <li>Project would have hints {Hint1[0,1], + * Hint2[]}</li> <li>Scan2 would have hints {[Hint1[0, 1, 0], Hint2[0]}</li> </ul> + */ + private class RelHintPropagateShuttle private[plan] ( + /** The hint strategies to decide if a hint should be attached to a relational expression. */ + val hintStrategies: HintStrategyTable) + extends RelHomogeneousShuttle { + + /** Stack recording the hints and its current inheritPath. */ + final private val inheritPaths = + new util.ArrayDeque[Pair[util.List[RelHint], util.Deque[Integer]]] + + /** Visits a particular child of a parent. */ + override protected def visitChild(parent: RelNode, i: Int, child: RelNode): RelNode = { + inheritPaths.forEach( + (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) => inheritPath.right.push(i)) + try { + val child2 = child.accept(this) + if (child2 ne child) { + val newInputs = new util.ArrayList[RelNode](parent.getInputs) + newInputs.set(i, child2) + return parent.copy(parent.getTraitSet, newInputs) + } + parent + } finally + inheritPaths.forEach( + (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) => inheritPath.right.pop) + } + + // FLINK MODIFICATION BEGIN + // let hints propagating in sub query + + override def visit(other: RelNode): RelNode = { + val node = tryToPropagateHintsInSubQuery(other) + if (node.isInstanceOf[Hintable]) { + visitHintable(node) + } else { + visitChildren(node) + } + } + + private def tryToPropagateHintsInSubQuery(node: RelNode): RelNode = { + if (containsSubQuery(node)) { + FlinkHints.resolveSubQuery(node, relNode => relNode.accept(this)) + } else { + node + } + } + + // FLINK MODIFICATION END + + /** + * Handle the [[Hintable]]s. + * + * <p>There are two cases to handle hints: + * + * <ul> <li>For TableScan: table scan is always a leaf node, attach the hints of the propagation + * path directly;</li> <li>For other [[Hintable]]s: if the node has hints itself, that means, + * these hints are query hints that need to propagate to its children, so we do these things: + * <ol> <li>push the hints with empty inheritPath to the stack</li> <li>visit the children nodes + * and propagate the hints</li> <li>pop the hints pushed in step1</li> <li>attach the hints of + * the propagation path</li> </ol> if the node does not have hints, attach the hints of the + * propagation path directly. </li> </ul> + * + * @param node + * [[Hintable]] to handle + * @return + * New copy of the [[Hintable]] with propagated hints attached + */ + private def visitHintable(node: RelNode) = { + val topHints = node.asInstanceOf[Hintable].getHints + val hasHints = topHints != null && topHints.size > 0 + val hasQueryHints = hasHints && !node.isInstanceOf[TableScan] + if (hasQueryHints) inheritPaths.push(Pair.of(topHints, new util.ArrayDeque[Integer])) + val node1 = visitChildren(node) + if (hasQueryHints) inheritPaths.pop + attachHints(node1) + } + + private def attachHints(original: RelNode): RelNode = { + assert(original.isInstanceOf[Hintable]) + if (inheritPaths.size > 0) { + val hints = inheritPaths.toList + .sorted( + ( + o1: Pair[util.List[RelHint], util.Deque[Integer]], + o2: Pair[util.List[RelHint], util.Deque[Integer]]) => { + Integer.compare(o1.right.size, o2.right.size) + }) + .map( + (path: Pair[util.List[RelHint], util.Deque[Integer]]) => + RelHintPropagateShuttle + .copyWithInheritPath(path.left, path.right)) + .foldLeft(new util.ArrayList[RelHint]())( + (acc, hints1) => { + acc.addAll(hints1) + acc + }) + val filteredHints = hintStrategies.apply(hints, original) + if (filteredHints.size > 0) { + return original.asInstanceOf[Hintable].attachHints(filteredHints) + } + } + original + } + } + + private object RelHintPropagateShuttle { + private def copyWithInheritPath( + hints: util.List[RelHint], + inheritPath: util.Deque[Integer]): util.List[RelHint] = { + // Copy the Dequeue in reverse order. + val path = new util.ArrayList[Integer] + val iterator = inheritPath.descendingIterator + while (iterator.hasNext) { + path.add(iterator.next) + } + hints.map((hint: RelHint) => hint.copy(path)).toList + } + } + + // ----- Copied from RelOptUtil end ----- + + /** Check if the node contains sub query. */ + def containsSubQuery(node: RelNode): Boolean = node match { + // the all types of nodes that contain sub query can be found in + // RexUtil.SubQueryFinder#containsSubQuery + case project: LogicalProject => + RexUtil.SubQueryFinder.containsSubQuery(project) + case filter: LogicalFilter => + RexUtil.SubQueryFinder.containsSubQuery(filter) + case join: LogicalJoin => + RexUtil.SubQueryFinder.containsSubQuery(join) + case _ => false + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java similarity index 96% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java index 86b0157502a..d53d40c1511 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.flink.table.planner.alias; +package org.apache.flink.table.planner.hint; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; -import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintTestUtil; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -37,8 +36,8 @@ import org.junit.jupiter.api.Test; import java.util.Collections; /** Tests clearing lookup join hint case-insensitive in stream. */ -class ClearJoinHintWithCapitalizeJoinHintShuttleTest - extends ClearJoinHintWithInvalidPropagationShuttleTestBase { +class ClearJoinHintsWithCapitalizeJoinHintsShuttleTest + extends ClearJoinHintsWithInvalidPropagationShuttleTestBase { @Override TableTestUtil getTableTestUtil() { @@ -143,7 +142,7 @@ class ClearJoinHintWithCapitalizeJoinHintShuttleTest util.assertEqualsOrExpand("afterCapitalizeJoinHints", plan, true); RelNode rootAfterClearingJoinHintWithInvalidPropagation = - rootAfterHintCapitalize.accept(new ClearJoinHintWithInvalidPropagationShuttle()); + rootAfterHintCapitalize.accept(new ClearJoinHintsWithInvalidPropagationShuttle()); plan = buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation); util.assertEqualsOrExpand("afterClearingJoinHints", plan, false); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java similarity index 95% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java index d4ebef36706..ececf542daa 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java @@ -16,11 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.planner.alias; +package org.apache.flink.table.planner.hint; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.planner.hint.FlinkHints; -import org.apache.flink.table.planner.hint.JoinStrategy; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.calcite.rel.RelNode; @@ -29,8 +27,8 @@ import org.apache.calcite.rel.hint.RelHint; import org.junit.jupiter.api.Test; /** Tests clearing join hint with invalid propagation in batch. */ -class ClearJoinHintWithInvalidPropagationShuttleTest - extends ClearJoinHintWithInvalidPropagationShuttleTestBase { +class ClearJoinHintsWithInvalidPropagationShuttleTest + extends ClearJoinHintsWithInvalidPropagationShuttleTestBase { @Override TableTestUtil getTableTestUtil() { return batchTestUtil(TableConfig.getDefault()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java similarity index 96% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java index 745becc26ff..bb3afaf4698 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.alias; +package org.apache.flink.table.planner.hint; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; @@ -39,7 +39,7 @@ import org.junit.jupiter.api.BeforeEach; import java.util.Collections; /** A base class for testing clearing join hint with invalid propagation. */ -abstract class ClearJoinHintWithInvalidPropagationShuttleTestBase extends TableTestBase { +abstract class ClearJoinHintsWithInvalidPropagationShuttleTestBase extends TableTestBase { protected final TableTestUtil util = getTableTestUtil(); @@ -121,7 +121,7 @@ abstract class ClearJoinHintWithInvalidPropagationShuttleTestBase extends TableT util.assertEqualsOrExpand("afterPropagatingHints", plan, true); RelNode rootAfterClearingJoinHintWithInvalidPropagation = - rootAfterHintPropagation.accept(new ClearJoinHintWithInvalidPropagationShuttle()); + rootAfterHintPropagation.accept(new ClearJoinHintsWithInvalidPropagationShuttle()); plan = buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation); util.assertEqualsOrExpand("afterClearingJoinHints", plan, false); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java similarity index 97% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java index 0e89a4aa2b2..cc4887a82e6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.flink.table.planner.alias; +package org.apache.flink.table.planner.hint; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; -import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintTestUtil; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -44,8 +43,8 @@ import org.junit.jupiter.api.Test; import java.util.Collections; /** Tests clearing lookup join hint with invalid propagation in stream. */ -class ClearLookupJoinHintWithInvalidPropagationShuttleTest - extends ClearJoinHintWithInvalidPropagationShuttleTestBase { +class ClearLookupJoinHintsWithInvalidPropagationShuttleTest + extends ClearJoinHintsWithInvalidPropagationShuttleTestBase { @Override TableTestUtil getTableTestUtil() { return streamTestUtil(TableConfig.getDefault()); @@ -79,7 +78,7 @@ class ClearLookupJoinHintWithInvalidPropagationShuttleTest util.tableEnv() .createTemporarySystemFunction( "MockOffset", - new ClearLookupJoinHintWithInvalidPropagationShuttleTest + new ClearLookupJoinHintsWithInvalidPropagationShuttleTest .MockOffsetTableFunction()); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java index eada3bb3881..1aed3612df1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java @@ -51,6 +51,9 @@ import static scala.runtime.BoxedUnit.UNIT; * A test base for join hint. * * <p>TODO add test to cover legacy table source. + * + * <p>Notice: Join hints in sub-query will not be printed in AST, because {@code RexSubQuery} use + * 'RelOptUtil.toString(rel)' to print node and doesn't print hints about {@code LogicalJoin}. */ public abstract class JoinHintTestBase extends TableTestBase { @@ -871,6 +874,14 @@ public abstract class JoinHintTestBase extends TableTestBase { verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint()))); } + @Test + public void testJoinHintWithJoinHintInNestedCorrelatedSubQuery() { + String sql = + "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)"; + + verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint()))); + } + protected String buildAstPlanWithQueryBlockAlias(List<RelNode> relNodes) { StringBuilder astBuilder = new StringBuilder(); relNodes.forEach( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java index 873604a3b0b..bfbcf84dcc9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java @@ -95,7 +95,7 @@ class ClearQueryBlockAliasResolverTest extends JoinHintTestBase { } private List<RelNode> clearQueryBlockAlias(List<RelNode> relNodes) { - JoinHintResolver joinHintResolver = new JoinHintResolver(); + JoinHintsResolver joinHintResolver = new JoinHintsResolver(); relNodes = joinHintResolver.resolve(relNodes); ClearQueryBlockAliasResolver clearQueryBlockAliasResolver = new ClearQueryBlockAliasResolver(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java index 2789b57d4ff..6ea20736c22 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java @@ -32,7 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -/** A test class for {@link JoinHintResolver}. */ +/** A test class for {@link JoinHintsResolver}. */ public class JoinHintResolverTest extends JoinHintTestBase { // use any join hint for test @@ -95,7 +95,7 @@ public class JoinHintResolverTest extends JoinHintTestBase { } private List<RelNode> resolveJoinHint(List<RelNode> relNodes) { - JoinHintResolver joinHintResolver = new JoinHintResolver(); + JoinHintsResolver joinHintResolver = new JoinHintsResolver(); return joinHintResolver.resolve(relNodes); } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.xml similarity index 100% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.xml diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.xml similarity index 100% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.xml diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.xml similarity index 100% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.xml diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml index ae99a9546b2..2db1b869bc8 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml @@ -647,10 +647,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build= +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_COUNT(count$0) AS EXPR$0]) +- Exchange(distribution=[hash[a1]]) +- LocalHashAggregate(groupBy=[a1], select=[a1, Partial_COUNT(a2) AS count$0]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) ]]> </Resource> </TestCase> @@ -678,10 +678,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) ]]> </Resource> </TestCase> @@ -715,10 +715,10 @@ Calc(select=[a1, b1]) +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) :- Calc(select=[a2]) - : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - : +- Exchange(distribution=[broadcast]) - : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[left]) + : :- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) +- Exchange(distribution=[broadcast]) +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) +- Exchange(distribution=[hash[a1]]) @@ -776,10 +776,61 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcas +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true]) +- Exchange(distribution=[single]) +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]) ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a1, b1]) ++- HashJoin(joinType=[InnerJoin], where=[=(a1, a10)], select=[a1, b1, a10], build=[left]) + :- Exchange(distribution=[hash[a1]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1]) + +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Exchange(distribution=[hash[EXPR$0, a1]]) + +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) + :- Calc(select=[a2]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[left]) + : :- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Calc(select=[a3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) + +- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a1]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) ]]> </Resource> </TestCase> @@ -806,10 +857,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml index ed23d52758d..0d451400b57 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml @@ -646,10 +646,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build= +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_COUNT(count$0) AS EXPR$0]) +- Exchange(distribution=[hash[a1]]) +- LocalHashAggregate(groupBy=[a1], select=[a1, Partial_COUNT(a2) AS count$0]) - +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) ]]> </Resource> </TestCase> @@ -677,10 +677,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) - +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) ]]> </Resource> </TestCase> @@ -712,18 +712,18 @@ Calc(select=[a1, b1]) +- Exchange(distribution=[hash[EXPR$0, a1]]) +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) - +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) - :- Calc(select=[a2]) - : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) - : :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - : +- Exchange(distribution=[broadcast]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- Calc(select=[a2]) + : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) + : :- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) - +- Exchange(distribution=[broadcast]) - +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) - +- Exchange(distribution=[hash[a1]]) - +- LocalHashAggregate(groupBy=[a1], select=[a1]) - +- Calc(select=[a1]) - +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) + +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) + +- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a1]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) ]]> </Resource> </TestCase> @@ -775,10 +775,61 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcas +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true]) +- Exchange(distribution=[single]) +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false]) - +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]) ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a1, b1]) ++- HashJoin(joinType=[InnerJoin], where=[=(a1, a10)], select=[a1, b1, a10], build=[left]) + :- Exchange(distribution=[hash[a1]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1]) + +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Exchange(distribution=[hash[EXPR$0, a1]]) + +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- Calc(select=[a2]) + : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) + : :- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Calc(select=[a3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) + +- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a1]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) ]]> </Resource> </TestCase> @@ -805,10 +856,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Exchange(distribution=[hash[a2]]) +- Calc(select=[a2]) - +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) - :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) - +- Exchange(distribution=[broadcast]) - +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) ]]> </Resource> </TestCase> @@ -1086,7 +1137,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]) </Resource> <Resource name="optimized rel plan"> <![CDATA[ -NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a1, EXPR$0], build=[right]) +NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a1, EXPR$0], build=[right], singleRowJoin=[true]) :- Calc(select=[a1]) : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, a2], build=[left]) : :- Exchange(distribution=[hash[a1]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml index b9f5992c6d6..390f9e0d5af 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml @@ -665,7 +665,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build= +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0]) +- Calc(select=[EXPR$0]) +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(a2) AS EXPR$0]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[left]) :- Exchange(distribution=[hash[a2]]) : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) +- Exchange(distribution=[hash[a1]]) @@ -696,7 +696,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig :- Exchange(distribution=[hash[a1]]) : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Calc(select=[a2]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) :- Exchange(distribution=[hash[a2]]) : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) +- Exchange(distribution=[hash[a3]]) @@ -734,7 +734,7 @@ Calc(select=[a1, b1]) +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) :- Calc(select=[a2]) - : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) : :- Exchange(distribution=[hash[a2]]) : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) : +- Exchange(distribution=[hash[a3]]) @@ -796,11 +796,62 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcas +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true]) +- Exchange(distribution=[single]) +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[left]) :- Exchange(distribution=[hash[a2]]) : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) +- Exchange(distribution=[hash[a1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]) ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a1, b1]) ++- HashJoin(joinType=[InnerJoin], where=[=(a1, a10)], select=[a1, b1, a10], build=[left]) + :- Exchange(distribution=[hash[a1]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1]) + +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Exchange(distribution=[hash[EXPR$0, a1]]) + +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) + :- Calc(select=[a2]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Calc(select=[a3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) + +- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a1]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) ]]> </Resource> </TestCase> @@ -826,7 +877,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[rig :- Exchange(distribution=[hash[a1]]) : +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) +- Calc(select=[a2]) - +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[left]) :- Exchange(distribution=[hash[a2]]) : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) +- Exchange(distribution=[hash[a3]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml index 711ffd9dc37..ceafec98d3c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml @@ -801,6 +801,57 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcas : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) +- Exchange(distribution=[hash[a1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]) ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a1, b1]) ++- HashJoin(joinType=[InnerJoin], where=[=(a1, a10)], select=[a1, b1, a10], build=[left]) + :- Exchange(distribution=[hash[a1]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) + +- Exchange(distribution=[hash[a1]]) + +- Calc(select=[a1]) + +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Exchange(distribution=[hash[EXPR$0, a1]]) + +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1]) + +- Calc(select=[+(a2, a1) AS EXPR$0, a1]) + +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, +(a2, a1))], select=[a2, a1], build=[right]) + :- Calc(select=[a2]) + : +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Calc(select=[a3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]]) + +- Exchange(distribution=[broadcast]) + +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1]) + +- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a1]) + +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1, b1], metadata=[]]], fields=[a1, b1]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml index cedbacf095e..8311a1f449a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml @@ -429,6 +429,26 @@ LogicalProject(a2=[$0]) LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) })]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml index 51a413d7ea8..5b9eedcb85a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml @@ -429,6 +429,26 @@ LogicalProject(a2=[$0]) LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]) })]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery"> + <Resource name="sql"> + <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = T3.a3)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] ++- LogicalFilter(condition=[IN($0, { +LogicalProject(EXPR$0=[+($0, $cor0.a1)]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalProject(a3=[$2], b3=[$3]) + LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]) + LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]) +})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] + +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] ]]> </Resource> </TestCase>