This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new c1b5d71b4b9 [FLINK-33395][table-planner] Fix the join hint doesn't 
work when appears in subquery
c1b5d71b4b9 is described below

commit c1b5d71b4b9adf0c3b8f1c77af01f030a5c92626
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Fri Nov 17 10:14:20 2023 +0800

    [FLINK-33395][table-planner] Fix the join hint doesn't work when appears in 
subquery
    
    This closes #23720
    
    (cherry picked from commit 1d08b8bf6a67c6a1874e8003868d5735248a7e45)
---
 .../apache/calcite/sql2rel/RelDecorrelator.java    |  23 +--
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  22 +-
 .../planner/hint/CapitalizeJoinHintsShuttle.java   |  67 +++++++
 .../ClearJoinHintsOnUnmatchedNodesShuttle.java     |  82 ++++++++
 ...earJoinHintsWithInvalidPropagationShuttle.java} |  18 +-
 .../flink/table/planner/hint/FlinkHints.java       | 102 +++++-----
 .../table/planner/hint/JoinHintsRelShuttle.java    |  65 ++++++
 .../flink/table/planner/hint/JoinStrategy.java     |  25 ++-
 ...oinHintResolver.java => JoinHintsResolver.java} |  17 +-
 .../optimize/CommonSubGraphBasedOptimizer.scala    |   2 +-
 .../rules/logical/FlinkSubQueryRemoveRule.scala    |   5 +-
 .../table/planner/plan/utils/FlinkRelOptUtil.scala | 223 ++++++++++++++++++++-
 ...inHintsWithCapitalizeJoinHintsShuttleTest.java} |   9 +-
 ...oinHintsWithInvalidPropagationShuttleTest.java} |   8 +-
 ...intsWithInvalidPropagationShuttleTestBase.java} |   6 +-
 ...oinHintsWithInvalidPropagationShuttleTest.java} |   9 +-
 .../planner/plan/hints/batch/JoinHintTestBase.java |  11 +
 .../optimize/ClearQueryBlockAliasResolverTest.java |   2 +-
 .../plan/optimize/JoinHintResolverTest.java        |   4 +-
 ...oinHintsWithCapitalizeJoinHintsShuttleTest.xml} |   0
 ...JoinHintsWithInvalidPropagationShuttleTest.xml} |   0
 ...JoinHintsWithInvalidPropagationShuttleTest.xml} |   0
 .../plan/hints/batch/BroadcastJoinHintTest.xml     |  90 +++++++--
 .../plan/hints/batch/NestLoopJoinHintTest.xml      |  92 +++++++--
 .../plan/hints/batch/ShuffleHashJoinHintTest.xml   |  60 +++++-
 .../plan/hints/batch/ShuffleMergeJoinHintTest.xml  |  50 +++++
 .../optimize/ClearQueryBlockAliasResolverTest.xml  |  20 ++
 .../planner/plan/optimize/JoinHintResolverTest.xml |  20 ++
 28 files changed, 858 insertions(+), 174 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 8e88e0addd2..0597008296a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -16,9 +16,6 @@
  */
 package org.apache.calcite.sql2rel;
 
-import 
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
-import org.apache.flink.table.planner.hint.FlinkHints;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -128,9 +125,9 @@ import static 
org.apache.calcite.linq4j.Nullness.castNonNull;
  * Copied to fix calcite issues. FLINK modifications are at lines
  *
  * <ol>
- *   <li>Was changed within FLINK-29280, FLINK-28682: Line 222 ~ 232
- *   <li>Should be removed after fix of FLINK-29540: Line 298 ~ 304
- *   <li>Should be removed after fix of FLINK-29540: Line 316 ~ 322
+ *   <li>Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223
+ *   <li>Should be removed after fix of FLINK-29540: Line 289 ~ 295
+ *   <li>Should be removed after fix of FLINK-29540: Line 307 ~ 313
  * </ol>
  */
 public class RelDecorrelator implements ReflectiveVisitor {
@@ -216,18 +213,12 @@ public class RelDecorrelator implements ReflectiveVisitor 
{
             newRootRel = decorrelator.decorrelate(newRootRel);
         }
 
-        // Re-propagate the hints.
-        newRootRel = RelOptUtil.propagateRelHints(newRootRel, true);
-
         // ----- FLINK MODIFICATION BEGIN -----
+        // REASON: hints are already parsed and validated before optimizing, so they
+        // should not be propagated again here
 
-        // replace all join hints with upper case
-        newRootRel = FlinkHints.capitalizeJoinHints(newRootRel);
-
-        // clear join hints which are propagated into wrong query block
-        // The hint QueryBlockAlias will be added when building a RelNode tree 
before. It is used to
-        // distinguish the query block in the SQL.
-        newRootRel = newRootRel.accept(new 
ClearJoinHintWithInvalidPropagationShuttle());
+        // Re-propagate the hints.
+        // newRootRel = RelOptUtil.propagateRelHints(newRootRel, true);
 
         // ----- FLINK MODIFICATION END -----
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 49aa96ae8f7..597b68e5b82 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -16,8 +16,9 @@
  */
 package org.apache.calcite.sql2rel;
 
-import 
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
+import 
org.apache.flink.table.planner.hint.ClearJoinHintsWithInvalidPropagationShuttle;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -219,10 +220,10 @@ import static org.apache.calcite.sql.SqlUtil.stripAs;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682: Lines 633 ~ 643
- *   <li>Added in FLINK-28682: Lines 2095 ~ 2112
- *   <li>Added in FLINK-28682: Lines 2149 ~ 2177
- *   <li>Added in FLINK-20873: Lines 5198 ~ 5207
+ *   <li>Added in FLINK-29081, FLINK-28682: Lines 632 ~ 649
+ *   <li>Added in FLINK-28682: Lines 2101 ~ 2118
+ *   <li>Added in FLINK-28682: Lines 2155 ~ 2183
+ *   <li>Added in FLINK-20873: Lines 5204 ~ 5213
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -627,10 +628,12 @@ public class SqlToRelConverter {
                 hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
             }
         }
-        // propagate the hints.
-        result = RelOptUtil.propagateRelHints(result, false);
 
         // ----- FLINK MODIFICATION BEGIN -----
+        // propagate the hints.
+        // The method FlinkRelOptUtil#propagateRelHints not only finds and 
propagates hints
+        // throughout the entire rel tree but also within subqueries.
+        result = FlinkRelOptUtil.propagateRelHints(result, false);
 
         // replace all join hints with upper case
         result = FlinkHints.capitalizeJoinHints(result);
@@ -638,7 +641,10 @@ public class SqlToRelConverter {
         // clear join hints which are propagated into wrong query block
         // The hint QueryBlockAlias will be added when building a RelNode tree 
before. It is used to
         // distinguish the query block in the SQL.
-        result = result.accept(new 
ClearJoinHintWithInvalidPropagationShuttle());
+        result = result.accept(new 
ClearJoinHintsWithInvalidPropagationShuttle());
+
+        // clear the hints on some nodes where these hints should not be 
attached
+        result = FlinkHints.clearJoinHintsOnUnmatchedNodes(result);
 
         // ----- FLINK MODIFICATION END -----
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java
new file mode 100644
index 00000000000..b243fdb746a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeJoinHintsShuttle.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.hint;
+
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/** A shuttle to capitalize all join hints on corresponding nodes. */
+public class CapitalizeJoinHintsShuttle extends JoinHintsRelShuttle {
+
+    @Override
+    protected RelNode visitBiRel(BiRel biRel) {
+        Hintable hBiRel = (Hintable) biRel;
+        AtomicBoolean changed = new AtomicBoolean(false);
+        List<RelHint> hintsWithCapitalJoinHints =
+                hBiRel.getHints().stream()
+                        .map(
+                                hint -> {
+                                    String capitalHintName = 
hint.hintName.toUpperCase(Locale.ROOT);
+                                    if 
(JoinStrategy.isJoinStrategy(capitalHintName)) {
+                                        changed.set(true);
+                                        if 
(JoinStrategy.isLookupHint(hint.hintName)) {
+                                            return 
RelHint.builder(capitalHintName)
+                                                    
.hintOptions(hint.kvOptions)
+                                                    
.inheritPath(hint.inheritPath)
+                                                    .build();
+                                        }
+                                        return RelHint.builder(capitalHintName)
+                                                .hintOptions(hint.listOptions)
+                                                .inheritPath(hint.inheritPath)
+                                                .build();
+                                    } else {
+                                        return hint;
+                                    }
+                                })
+                        .collect(Collectors.toList());
+
+        if (changed.get()) {
+            return super.visit(hBiRel.withHints(hintsWithCapitalJoinHints));
+        } else {
+            return super.visit(biRel);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java
new file mode 100644
index 00000000000..925fa257ac8
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsOnUnmatchedNodesShuttle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.hint;
+
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+
+import org.apache.calcite.rel.RelHomogeneousShuttle;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.hint.FlinkHints.resolveSubQuery;
+
+/**
+ * Clears join hints from nodes they cannot be applied to. For example, a join hint may
+ * initially be attached to a Project node; after the tree accepts this shuttle, the join hint
+ * on that Project node is removed.
+ *
+ * <p>See more at {@link FlinkHintStrategies}.
+ *
+ * <p>Note: hints that are not join hints (such as view and alias hints) are not cleared.
+ */
+public class ClearJoinHintsOnUnmatchedNodesShuttle extends 
RelHomogeneousShuttle {
+    private final HintStrategyTable hintStrategyTable;
+
+    public ClearJoinHintsOnUnmatchedNodesShuttle(HintStrategyTable 
hintStrategyTable) {
+        this.hintStrategyTable = hintStrategyTable;
+    }
+
+    @Override
+    public RelNode visit(RelNode other) {
+        if (FlinkRelOptUtil.containsSubQuery(other)) {
+            other = resolveSubQuery(other, relNode -> relNode.accept(this));
+        }
+
+        if (other instanceof Hintable) {
+            List<RelHint> originHints = ((Hintable) other).getHints();
+            // 1. classify the hints and separate out the join hints
+            List<RelHint> joinHints =
+                    originHints.stream()
+                            .filter(h -> 
JoinStrategy.isJoinStrategy(h.hintName))
+                            .collect(Collectors.toList());
+
+            List<RelHint> remainHints = new ArrayList<>(originHints);
+            remainHints.removeAll(joinHints);
+
+            // 2. use hintStrategyTable#apply to determine whether the join 
hint can be attached
+            // to the current node
+            // If it cannot be attached, it means that the join hint on the 
current node needs to
+            // be removed.
+            List<RelHint> hintsCanApply = hintStrategyTable.apply(joinHints, 
other);
+            if (hintsCanApply.size() != joinHints.size()) {
+                hintsCanApply.addAll(remainHints);
+                // As a result, the remaining hints will be attached.
+                other = ((Hintable) other).withHints(hintsCanApply);
+            }
+        }
+
+        return super.visit(other);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java
similarity index 93%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java
index a37ad91496b..2be55cae8d0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttle.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttle.java
@@ -16,10 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.alias;
-
-import org.apache.flink.table.planner.hint.FlinkHints;
-import org.apache.flink.table.planner.hint.JoinStrategy;
+package org.apache.flink.table.planner.hint;
 
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
@@ -48,19 +45,10 @@ import java.util.stream.Collectors;
  * <p>TODO some node will be attached join hints when parse SqlNode to RelNode 
such as Project and
  * etc. The join hints on these node can also be cleared.
  */
-public class ClearJoinHintWithInvalidPropagationShuttle extends RelShuttleImpl 
{
-
-    @Override
-    public RelNode visit(LogicalJoin join) {
-        return visitBiRel(join);
-    }
+public class ClearJoinHintsWithInvalidPropagationShuttle extends 
JoinHintsRelShuttle {
 
     @Override
-    public RelNode visit(LogicalCorrelate correlate) {
-        return visitBiRel(correlate);
-    }
-
-    private RelNode visitBiRel(BiRel biRel) {
+    protected RelNode visitBiRel(BiRel biRel) {
         List<RelHint> hints = ((Hintable) biRel).getHints();
 
         Set<String> allHintNames =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index d49cdad635e..9154baa8295 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -23,25 +23,24 @@ import 
org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgu
 import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
 
 import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** Utility class for Flink hints. */
@@ -175,7 +174,7 @@ public abstract class FlinkHints {
      * Get all query block alias hints.
      *
      * <p>Because query block alias hints will be propagated from root to 
leaves, so maybe one node
-     * will contain multi alias hints. But only the first one is the really 
query block name where
+     * will contain multi alias hints. But only the first one is the real 
query block name where
      * this node is.
      */
     public static List<RelHint> getQueryBlockAliasHints(List<RelHint> 
allHints) {
@@ -185,53 +184,60 @@ public abstract class FlinkHints {
     }
 
     public static RelNode capitalizeJoinHints(RelNode root) {
-        return root.accept(new CapitalizeJoinHintShuttle());
+        return root.accept(new CapitalizeJoinHintsShuttle());
     }
 
-    private static class CapitalizeJoinHintShuttle extends RelShuttleImpl {
-
-        @Override
-        public RelNode visit(LogicalCorrelate correlate) {
-            return visitBiRel(correlate);
-        }
+    /** Resolve the RelNode of the sub query in the node and return a new 
node. */
+    public static RelNode resolveSubQuery(RelNode node, Function<RelNode, 
RelNode> resolver) {
+        if (node instanceof LogicalProject) {
+            LogicalProject project = (LogicalProject) node;
+            List<RexNode> newProjects =
+                    project.getProjects().stream()
+                            .map(p -> resolveSubQuery(p, resolver))
+                            .collect(Collectors.toList());
+            return project.copy(
+                    project.getTraitSet(), project.getInput(), newProjects, 
project.getRowType());
+
+        } else if (node instanceof LogicalFilter) {
+            LogicalFilter filter = (LogicalFilter) node;
+            RexNode newCondition = resolveSubQuery(filter.getCondition(), 
resolver);
+            return filter.copy(filter.getTraitSet(), filter.getInput(), 
newCondition);
+
+        } else if (node instanceof LogicalJoin) {
+            LogicalJoin join = (LogicalJoin) node;
+            RexNode newCondition = resolveSubQuery(join.getCondition(), 
resolver);
+            return join.copy(
+                    join.getTraitSet(),
+                    newCondition,
+                    join.getLeft(),
+                    join.getRight(),
+                    join.getJoinType(),
+                    join.isSemiJoinDone());
 
-        @Override
-        public RelNode visit(LogicalJoin join) {
-            return visitBiRel(join);
+        } else {
+            return node;
         }
+    }
 
-        private RelNode visitBiRel(BiRel biRel) {
-            Hintable hBiRel = (Hintable) biRel;
-            AtomicBoolean changed = new AtomicBoolean(false);
-            List<RelHint> hintsWithCapitalJoinHints =
-                    hBiRel.getHints().stream()
-                            .map(
-                                    hint -> {
-                                        String capitalHintName =
-                                                
hint.hintName.toUpperCase(Locale.ROOT);
-                                        if 
(JoinStrategy.isJoinStrategy(capitalHintName)) {
-                                            changed.set(true);
-                                            if 
(JoinStrategy.isLookupHint(hint.hintName)) {
-                                                return 
RelHint.builder(capitalHintName)
-                                                        
.hintOptions(hint.kvOptions)
-                                                        
.inheritPath(hint.inheritPath)
-                                                        .build();
-                                            }
-                                            return 
RelHint.builder(capitalHintName)
-                                                    
.hintOptions(hint.listOptions)
-                                                    
.inheritPath(hint.inheritPath)
-                                                    .build();
-                                        } else {
-                                            return hint;
-                                        }
-                                    })
-                            .collect(Collectors.toList());
+    /** Resolve the RelNode of the sub query in conditions. */
+    private static RexNode resolveSubQuery(RexNode rexNode, Function<RelNode, 
RelNode> resolver) {
+        return rexNode.accept(
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitSubQuery(RexSubQuery subQuery) {
+                        RelNode oldRel = subQuery.rel;
+                        RelNode newRel = resolver.apply(oldRel);
+                        if (oldRel != newRel) {
+                            return super.visitSubQuery(subQuery.clone(newRel));
+                        }
+                        return subQuery;
+                    }
+                });
+    }
 
-            if (changed.get()) {
-                return 
super.visit(hBiRel.withHints(hintsWithCapitalJoinHints));
-            } else {
-                return super.visit(biRel);
-            }
-        }
+    /** Clear the join hints on some nodes where these hints should not be 
attached. */
+    public static RelNode clearJoinHintsOnUnmatchedNodes(RelNode root) {
+        return root.accept(
+                new 
ClearJoinHintsOnUnmatchedNodesShuttle(root.getCluster().getHintStrategies()));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java
new file mode 100644
index 00000000000..73adc7040c0
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinHintsRelShuttle.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.hint;
+
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+
+import static org.apache.flink.table.planner.hint.FlinkHints.resolveSubQuery;
+import static 
org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.containsSubQuery;
+
+/** Abstract base class for the shuttles that handle join hints; it also visits sub-queries. */
+public abstract class JoinHintsRelShuttle extends RelShuttleImpl {
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        if (containsSubQuery(join)) {
+            join = (LogicalJoin) resolveSubQuery(join, relNode -> 
relNode.accept(this));
+        }
+        return visitBiRel(join);
+    }
+
+    @Override
+    public RelNode visit(LogicalCorrelate correlate) {
+        return visitBiRel(correlate);
+    }
+
+    protected abstract RelNode visitBiRel(BiRel biRel);
+
+    @Override
+    public RelNode visit(LogicalFilter filter) {
+        if (containsSubQuery(filter)) {
+            filter = (LogicalFilter) resolveSubQuery(filter, relNode -> 
relNode.accept(this));
+        }
+        return super.visit(filter);
+    }
+
+    @Override
+    public RelNode visit(LogicalProject project) {
+        if (containsSubQuery(project)) {
+            project = (LogicalProject) resolveSubQuery(project, relNode -> 
relNode.accept(this));
+        }
+        return super.visit(project);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
index f48c281168c..6dcdb8b755b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
@@ -21,7 +21,30 @@ package org.apache.flink.table.planner.hint;
 import java.util.List;
 import java.util.Locale;
 
-/** Currently available join strategies and corresponding join hint names. */
+/**
+ * Currently available join strategies and corresponding join hint names.
+ *
+ * <p>The process for handling join hints is as follows:
+ *
+ * <ol>
+ *   <li>Resolve join hint propagation:
+ *       <ol>
+ *         <li>The join hints are resolved using Calcite's functionality to 
propagate them from the
+ *             sink to the source and within sub-queries
+ *         <li>Capitalize join hints: All join hints are capitalized to ensure 
consistency, as they
+ *             are expected to be in uppercase.
+ *         <li>Clear incorrectly propagated join hints: Any join hints that 
have been mistakenly
+ *             propagated into the query block are cleared.
+ *         <li>Clear join hints from unmatched nodes: Join hints attached to 
unmatched nodes, such
+ *             as {@link org.apache.calcite.rel.core.Project}, are also 
cleared.
+ *       </ol>
+ *   <li>Validate and modify join hints: The join hints are validated, and the 
table names in the
+ *       hints are replaced with LEFT or RIGHT to indicate the join input 
ordinal.
+ *   <li>Clear query block aliases: The query block aliases are cleared from 
the sink to the source.
+ *   <li>Consume join hints in applicable rules: Finally, the join hints are 
consumed in specific
+ *       rules where they are relevant.
+ * </ol>
+ */
 public enum JoinStrategy {
     /**
      * Instructs the optimizer to use broadcast hash join strategy. If both 
sides are specified in
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java
index 36af173b1ae..ae12b8979ff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintsResolver.java
@@ -21,16 +21,14 @@ package org.apache.flink.table.planner.plan.optimize;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinHintsRelShuttle;
 import org.apache.flink.table.planner.hint.JoinStrategy;
 
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.rel.logical.LogicalCorrelate;
-import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
@@ -52,7 +50,7 @@ import static 
org.apache.flink.table.planner.hint.LookupJoinHintOptions.LOOKUP_T
  *
  * <p>Note: duplicate join hints are not checked here.
  */
-public class JoinHintResolver extends RelShuttleImpl {
+public class JoinHintsResolver extends JoinHintsRelShuttle {
     private final Set<RelHint> allHints = new HashSet<>();
     private final Set<RelHint> validHints = new HashSet<>();
 
@@ -79,16 +77,7 @@ public class JoinHintResolver extends RelShuttleImpl {
     }
 
     @Override
-    public RelNode visit(LogicalJoin join) {
-        return visitBiRel(join);
-    }
-
-    @Override
-    public RelNode visit(LogicalCorrelate correlate) {
-        return visitBiRel(correlate);
-    }
-
-    private RelNode visitBiRel(BiRel biRel) {
+    protected RelNode visitBiRel(BiRel biRel) {
         Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
         Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
index aa5e3db807d..8e24b7b3011 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
@@ -77,7 +77,7 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer 
{
    */
   override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {
     // resolve hints before optimizing
-    val joinHintResolver = new JoinHintResolver()
+    val joinHintResolver = new JoinHintsResolver()
     val resolvedHintRoots = joinHintResolver.resolve(toJava(roots))
 
     // clear query block alias bef optimizing
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
index 62607017313..d6974b66a41 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
@@ -17,9 +17,8 @@
  */
 package org.apache.flink.table.planner.plan.rules.logical
 
-import 
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle
 import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, 
FlinkRelFactories}
-import org.apache.flink.table.planner.hint.FlinkHints
+import 
org.apache.flink.table.planner.hint.{ClearJoinHintsWithInvalidPropagationShuttle,
 FlinkHints}
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, 
RelOptUtil}
@@ -101,7 +100,7 @@ class FlinkSubQueryRemoveRule(
         val nodeWithHint = RelOptUtil.propagateRelHints(newNode, false)
         val nodeWithCapitalizedJoinHints = 
FlinkHints.capitalizeJoinHints(nodeWithHint)
         val finalNode =
-          nodeWithCapitalizedJoinHints.accept(new 
ClearJoinHintWithInvalidPropagationShuttle)
+          nodeWithCapitalizedJoinHints.accept(new 
ClearJoinHintsWithInvalidPropagationShuttle)
         call.transformTo(finalNode)
       case _ => // do nothing
     }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index e12cf05246c..5900da92f6a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -20,22 +20,28 @@ package org.apache.flink.table.planner.plan.utils
 import org.apache.flink.table.planner.JBoolean
 import org.apache.flink.table.planner.analyze.PlanAdvice
 import org.apache.flink.table.planner.calcite.{FlinkPlannerImpl, 
FlinkTypeFactory}
+import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, 
MiniBatchMode}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.{RelFieldCollation, RelNode}
+import org.apache.calcite.rel.{RelFieldCollation, RelHomogeneousShuttle, 
RelNode, RelShuttle}
 import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.{Hintable, HintStrategyTable, RelHint}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalJoin, 
LogicalProject}
 import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, 
RexNode, RexUtil, RexVisitorImpl}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.calcite.sql.SqlKind._
+import org.apache.calcite.util.Pair
 import org.apache.commons.math3.util.ArithmeticUtils
 
 import java.io.{PrintWriter, StringWriter}
 import java.math.BigDecimal
 import java.sql.{Date, Time, Timestamp}
+import java.util
 import java.util.Calendar
 
 import scala.collection.JavaConversions._
@@ -524,4 +530,219 @@ object FlinkRelOptUtil {
         }
     }
   }
+
+  // ----- The following is mainly copied from RelOptUtil -----
+  // ----- Copied Line: 537 ~ 743 -----
+  // ----- Modified Line: 642 ~ 662 -----
+  /**
+   * Propagates the relational expression hints from root node to leaf node.
+   *
+   * @param rel
+   *   The relational expression
+   * @param reset
+   *   Flag saying if to reset the existing hints before the propagation
+   * @return
+   *   New relational expression with hints propagated
+   */
+  def propagateRelHints(rel: RelNode, reset: Boolean): RelNode = {
+    val node = if (reset) {
+      rel.accept(new ResetHintsShuttle)
+    } else {
+      rel
+    }
+    val shuttle = new 
RelHintPropagateShuttle(node.getCluster.getHintStrategies)
+    node.accept(shuttle)
+  }
+
+  /**
+   * A [[RelShuttle]] which resets all the hints of a relational expression to 
what they are
+   * originally like.
+   *
+   * <p>This would trigger a reverse transformation of what 
[[RelHintPropagateShuttle]] does.
+   *
+   * <p>Transformation rules:
+   *
+   * <ul> <li>Project: remove the hints that have non-empty inherit path 
(which means the hint was
+   * not originally declared from it); <li>Aggregate: remove the hints that 
have non-empty inherit
+   * path; <li>Join: remove all the hints; <li>TableScan: remove the hints 
that have non-empty
+   * inherit path. </ul>
+   */
+  private class ResetHintsShuttle extends RelHomogeneousShuttle {
+    override def visit(node: RelNode): RelNode = {
+      var finalNode = visitChildren(node)
+      if (node.isInstanceOf[Hintable]) {
+        finalNode = 
ResetHintsShuttle.resetHints(finalNode.asInstanceOf[Hintable])
+      }
+      finalNode
+    }
+  }
+
+  private object ResetHintsShuttle {
+    private def resetHints(hintable: Hintable): RelNode = if 
(hintable.getHints.size > 0) {
+      val resetHints: util.List[RelHint] = hintable.getHints
+        .filter((hint: RelHint) => hint.inheritPath.size == 0)
+        .toList
+      hintable.withHints(resetHints)
+    } else {
+      hintable.asInstanceOf[RelNode]
+    }
+  }
+
+  /**
+   * A [[RelShuttle]] which propagates all the hints of relational expression 
to their children
+   * nodes.
+   *
+   * <p>Given a plan:
+   *
+   * {{{
+   *            Filter (Hint1)
+   *                |
+   *               Join
+   *              /    \
+   *            Scan  Project (Hint2)
+   *                     |
+   *                    Scan2
+   * }}}
+   *
+   * <p>Every hint has an [[inheritPath]] (list of integers) which records its 
propagation path, number
+   * `0` represents the hint is propagated from the first(left) child, number 
`1` represents the
+   * hint is propagated from the second(right) child, so the plan would have 
hints path as follows
+   * (assumes each hint can be propagated to all child nodes):
+   *
+   * <ul> <li>Filter would have hints {Hint1[]}</li> <li>Join would have hints 
{Hint1[0]}</li>
+   * <li>Scan would have hints {Hint1[0, 0]}</li> <li>Project would have hints 
{Hint1[0,1],
+   * Hint2[]}</li> <li>Scan2 would have hints {Hint1[0, 1, 0], Hint2[0]}</li> 
</ul>
+   */
+  private class RelHintPropagateShuttle private[plan] (
+      /** The hint strategies to decide if a hint should be attached to a 
relational expression. */
+      val hintStrategies: HintStrategyTable)
+    extends RelHomogeneousShuttle {
+
+    /** Stack recording the hints and their current inheritPath. */
+    final private val inheritPaths =
+      new util.ArrayDeque[Pair[util.List[RelHint], util.Deque[Integer]]]
+
+    /** Visits a particular child of a parent. */
+    override protected def visitChild(parent: RelNode, i: Int, child: 
RelNode): RelNode = {
+      inheritPaths.forEach(
+        (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) => 
inheritPath.right.push(i))
+      try {
+        val child2 = child.accept(this)
+        if (child2 ne child) {
+          val newInputs = new util.ArrayList[RelNode](parent.getInputs)
+          newInputs.set(i, child2)
+          return parent.copy(parent.getTraitSet, newInputs)
+        }
+        parent
+      } finally
+        inheritPaths.forEach(
+          (inheritPath: Pair[util.List[RelHint], util.Deque[Integer]]) => 
inheritPath.right.pop)
+    }
+
+    // FLINK MODIFICATION BEGIN
+    // let hints propagate into sub queries
+
+    override def visit(other: RelNode): RelNode = {
+      val node = tryToPropagateHintsInSubQuery(other)
+      if (node.isInstanceOf[Hintable]) {
+        visitHintable(node)
+      } else {
+        visitChildren(node)
+      }
+    }
+
+    private def tryToPropagateHintsInSubQuery(node: RelNode): RelNode = {
+      if (containsSubQuery(node)) {
+        FlinkHints.resolveSubQuery(node, relNode => relNode.accept(this))
+      } else {
+        node
+      }
+    }
+
+    // FLINK MODIFICATION END
+
+    /**
+     * Handle the [[Hintable]]s.
+     *
+     * <p>There are two cases to handle hints:
+     *
+     * <ul> <li>For TableScan: table scan is always a leaf node, attach the 
hints of the propagation
+     * path directly;</li> <li>For other [[Hintable]]s: if the node has hints 
itself, that means,
+     * these hints are query hints that need to propagate to its children, so 
we do these things:
+     * <ol> <li>push the hints with empty inheritPath to the stack</li> 
<li>visit the children nodes
+     * and propagate the hints</li> <li>pop the hints pushed in step1</li> 
<li>attach the hints of
+     * the propagation path</li> </ol> if the node does not have hints, attach 
the hints of the
+     * propagation path directly. </li> </ul>
+     *
+     * @param node
+     *   [[Hintable]] to handle
+     * @return
+     *   New copy of the [[Hintable]] with propagated hints attached
+     */
+    private def visitHintable(node: RelNode) = {
+      val topHints = node.asInstanceOf[Hintable].getHints
+      val hasHints = topHints != null && topHints.size > 0
+      val hasQueryHints = hasHints && !node.isInstanceOf[TableScan]
+      if (hasQueryHints) inheritPaths.push(Pair.of(topHints, new 
util.ArrayDeque[Integer]))
+      val node1 = visitChildren(node)
+      if (hasQueryHints) inheritPaths.pop
+      attachHints(node1)
+    }
+
+    private def attachHints(original: RelNode): RelNode = {
+      assert(original.isInstanceOf[Hintable])
+      if (inheritPaths.size > 0) {
+        val hints = inheritPaths.toList
+          .sorted(
+            (
+                o1: Pair[util.List[RelHint], util.Deque[Integer]],
+                o2: Pair[util.List[RelHint], util.Deque[Integer]]) => {
+              Integer.compare(o1.right.size, o2.right.size)
+            })
+          .map(
+            (path: Pair[util.List[RelHint], util.Deque[Integer]]) =>
+              RelHintPropagateShuttle
+                .copyWithInheritPath(path.left, path.right))
+          .foldLeft(new util.ArrayList[RelHint]())(
+            (acc, hints1) => {
+              acc.addAll(hints1)
+              acc
+            })
+        val filteredHints = hintStrategies.apply(hints, original)
+        if (filteredHints.size > 0) {
+          return original.asInstanceOf[Hintable].attachHints(filteredHints)
+        }
+      }
+      original
+    }
+  }
+
+  private object RelHintPropagateShuttle {
+    private def copyWithInheritPath(
+        hints: util.List[RelHint],
+        inheritPath: util.Deque[Integer]): util.List[RelHint] = {
+      // Copy the Dequeue in reverse order.
+      val path = new util.ArrayList[Integer]
+      val iterator = inheritPath.descendingIterator
+      while (iterator.hasNext) {
+        path.add(iterator.next)
+      }
+      hints.map((hint: RelHint) => hint.copy(path)).toList
+    }
+  }
+
+  // ----- Copied from RelOptUtil end -----
+
+  /** Check if the node contains sub query. */
+  def containsSubQuery(node: RelNode): Boolean = node match {
+    // all the types of nodes that can contain a sub query can be found in
+    // RexUtil.SubQueryFinder#containsSubQuery
+    case project: LogicalProject =>
+      RexUtil.SubQueryFinder.containsSubQuery(project)
+    case filter: LogicalFilter =>
+      RexUtil.SubQueryFinder.containsSubQuery(filter)
+    case join: LogicalJoin =>
+      RexUtil.SubQueryFinder.containsSubQuery(join)
+    case _ => false
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java
index 210cc122c7d..fb35e79cc4d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.alias;
+package org.apache.flink.table.planner.hint;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
-import org.apache.flink.table.planner.hint.FlinkHints;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintTestUtil;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
@@ -37,8 +36,8 @@ import org.junit.Test;
 import java.util.Collections;
 
 /** Tests clearing lookup join hint case-insensitive in stream. */
-public class ClearJoinHintWithCapitalizeJoinHintShuttleTest
-        extends ClearJoinHintWithInvalidPropagationShuttleTestBase {
+public class ClearJoinHintsWithCapitalizeJoinHintsShuttleTest
+        extends ClearJoinHintsWithInvalidPropagationShuttleTestBase {
 
     @Override
     TableTestUtil getTableTestUtil() {
@@ -143,7 +142,7 @@ public class ClearJoinHintWithCapitalizeJoinHintShuttleTest
         util.assertEqualsOrExpand("afterCapitalizeJoinHints", plan, true);
 
         RelNode rootAfterClearingJoinHintWithInvalidPropagation =
-                rootAfterHintCapitalize.accept(new 
ClearJoinHintWithInvalidPropagationShuttle());
+                rootAfterHintCapitalize.accept(new 
ClearJoinHintsWithInvalidPropagationShuttle());
         plan = 
buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation);
         util.assertEqualsOrExpand("afterClearingJoinHints", plan, false);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java
similarity index 95%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java
index 6801c602539..2384083f5db 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.alias;
+package org.apache.flink.table.planner.hint;
 
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.planner.hint.FlinkHints;
-import org.apache.flink.table.planner.hint.JoinStrategy;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
 import org.apache.calcite.rel.RelNode;
@@ -29,8 +27,8 @@ import org.apache.calcite.rel.hint.RelHint;
 import org.junit.Test;
 
 /** Tests clearing join hint with invalid propagation in batch. */
-public class ClearJoinHintWithInvalidPropagationShuttleTest
-        extends ClearJoinHintWithInvalidPropagationShuttleTestBase {
+public class ClearJoinHintsWithInvalidPropagationShuttleTest
+        extends ClearJoinHintsWithInvalidPropagationShuttleTestBase {
     @Override
     TableTestUtil getTableTestUtil() {
         return batchTestUtil(TableConfig.getDefault());
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java
index 105f4e42a38..07882e5da5c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTestBase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.alias;
+package org.apache.flink.table.planner.hint;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
@@ -39,7 +39,7 @@ import org.junit.Before;
 import java.util.Collections;
 
 /** A base class for testing clearing join hint with invalid propagation. */
-public abstract class ClearJoinHintWithInvalidPropagationShuttleTestBase 
extends TableTestBase {
+public abstract class ClearJoinHintsWithInvalidPropagationShuttleTestBase 
extends TableTestBase {
 
     protected final TableTestUtil util = getTableTestUtil();
 
@@ -121,7 +121,7 @@ public abstract class 
ClearJoinHintWithInvalidPropagationShuttleTestBase extends
         util.assertEqualsOrExpand("afterPropagatingHints", plan, true);
 
         RelNode rootAfterClearingJoinHintWithInvalidPropagation =
-                rootAfterHintPropagation.accept(new 
ClearJoinHintWithInvalidPropagationShuttle());
+                rootAfterHintPropagation.accept(new 
ClearJoinHintsWithInvalidPropagationShuttle());
         plan = 
buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation);
         util.assertEqualsOrExpand("afterClearingJoinHints", plan, false);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java
similarity index 97%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java
index 13c8f7e6eb5..1499a5927f6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.alias;
+package org.apache.flink.table.planner.hint;
 
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.FunctionHint;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
-import org.apache.flink.table.planner.hint.FlinkHints;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintTestUtil;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
@@ -44,8 +43,8 @@ import org.junit.Test;
 import java.util.Collections;
 
 /** Tests clearing lookup join hint with invalid propagation in stream. */
-public class ClearLookupJoinHintWithInvalidPropagationShuttleTest
-        extends ClearJoinHintWithInvalidPropagationShuttleTestBase {
+public class ClearLookupJoinHintsWithInvalidPropagationShuttleTest
+        extends ClearJoinHintsWithInvalidPropagationShuttleTestBase {
     @Override
     TableTestUtil getTableTestUtil() {
         return streamTestUtil(TableConfig.getDefault());
@@ -79,7 +78,7 @@ public class 
ClearLookupJoinHintWithInvalidPropagationShuttleTest
         util.tableEnv()
                 .createTemporarySystemFunction(
                         "MockOffset",
-                        new 
ClearLookupJoinHintWithInvalidPropagationShuttleTest
+                        new 
ClearLookupJoinHintsWithInvalidPropagationShuttleTest
                                 .MockOffsetTableFunction());
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index 829581d4823..10c3f737443 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -50,6 +50,9 @@ import static scala.runtime.BoxedUnit.UNIT;
  * A test base for join hint.
  *
  * <p>TODO add test to cover legacy table source.
+ *
+ * <p>Note: join hints in a sub-query are not printed in the AST, because 
{@code RexSubQuery} uses
+ * 'RelOptUtil.toString(rel)' to print the node, which doesn't print the hints of a 
{@code LogicalJoin}.
  */
 public abstract class JoinHintTestBase extends TableTestBase {
 
@@ -869,6 +872,14 @@ public abstract class JoinHintTestBase extends 
TableTestBase {
         verifyRelPlanByCustom(String.format(sql, 
buildCaseSensitiveStr(getTestSingleJoinHint())));
     }
 
+    @Test
+    public void testJoinHintWithJoinHintInNestedCorrelatedSubQuery() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 + T1.a1 
from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = 
T3.a3)";
+
+        verifyRelPlanByCustom(String.format(sql, 
buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
     protected String buildAstPlanWithQueryBlockAlias(List<RelNode> relNodes) {
         StringBuilder astBuilder = new StringBuilder();
         relNodes.forEach(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
index 3fd2b8131e1..4f4f643582e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
@@ -95,7 +95,7 @@ public class ClearQueryBlockAliasResolverTest extends 
JoinHintTestBase {
     }
 
     private List<RelNode> clearQueryBlockAlias(List<RelNode> relNodes) {
-        JoinHintResolver joinHintResolver = new JoinHintResolver();
+        JoinHintsResolver joinHintResolver = new JoinHintsResolver();
         relNodes = joinHintResolver.resolve(relNodes);
         ClearQueryBlockAliasResolver clearQueryBlockAliasResolver =
                 new ClearQueryBlockAliasResolver();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
index 2789b57d4ff..6ea20736c22 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
@@ -32,7 +32,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
-/** A test class for {@link JoinHintResolver}. */
+/** A test class for {@link JoinHintsResolver}. */
 public class JoinHintResolverTest extends JoinHintTestBase {
 
     // use any join hint for test
@@ -95,7 +95,7 @@ public class JoinHintResolverTest extends JoinHintTestBase {
     }
 
     private List<RelNode> resolveJoinHint(List<RelNode> relNodes) {
-        JoinHintResolver joinHintResolver = new JoinHintResolver();
+        JoinHintsResolver joinHintResolver = new JoinHintsResolver();
         return joinHintResolver.resolve(relNodes);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.xml
similarity index 100%
rename from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithCapitalizeJoinHintShuttleTest.xml
rename to 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithCapitalizeJoinHintsShuttleTest.xml
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.xml
similarity index 100%
rename from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.xml
rename to 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearJoinHintsWithInvalidPropagationShuttleTest.xml
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.xml
similarity index 100%
rename from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml
rename to 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearLookupJoinHintsWithInvalidPropagationShuttleTest.xml
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index 7ecc331564f..5ceaed237c5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -645,10 +645,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], 
select=[a1, b1], build=
          +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, 
Final_COUNT(count$0) AS EXPR$0])
             +- Exchange(distribution=[hash[a1]])
                +- LocalHashAggregate(groupBy=[a1], select=[a1, 
Partial_COUNT(a2) AS count$0])
-                  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], isBroadcast=[true], build=[right])
-                     :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                     +- Exchange(distribution=[broadcast])
-                        +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], isBroadcast=[true], build=[left])
+                     :- Exchange(distribution=[broadcast])
+                     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -676,10 +676,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- Calc(select=[a2])
-      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
isBroadcast=[true], build=[right])
-         :- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
-         +- Exchange(distribution=[broadcast])
-            +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
isBroadcast=[true], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -713,10 +713,10 @@ Calc(select=[a1, b1])
                   +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
                      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
                         :- Calc(select=[a2])
-                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], isBroadcast=[true], build=[right])
-                        :     :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                        :     +- Exchange(distribution=[broadcast])
-                        :        +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], isBroadcast=[true], build=[left])
+                        :     :- Exchange(distribution=[broadcast])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
                         +- Exchange(distribution=[broadcast])
                            +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
                               +- Exchange(distribution=[hash[a1]])
@@ -773,10 +773,60 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], isBroadcas
       +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
          +- Exchange(distribution=[single])
             +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], 
global=[false])
-               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], isBroadcast=[true], build=[right])
-                  :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                  +- Exchange(distribution=[broadcast])
-                     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], isBroadcast=[true], build=[left])
+                  :- Exchange(distribution=[broadcast])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + 
T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = 
T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, 
EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], 
select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], isBroadcast=[true], build=[left])
+                        :     :- Exchange(distribution=[broadcast])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- Calc(select=[a3])
+                        :        +- HashJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :           :- Exchange(distribution=[hash[a2]])
+                        :           :  +- 
TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], 
metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :           +- Exchange(distribution=[hash[a3]])
+                        :              +- 
TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], 
metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], 
select=[a1])
+                                    +- 
TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], 
metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
@@ -803,10 +853,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- Calc(select=[a2])
-      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
isBroadcast=[true], build=[right])
-         :- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
-         +- Exchange(distribution=[broadcast])
-            +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
isBroadcast=[true], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index f771fa2f933..a5dcfef070a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -644,10 +644,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], 
select=[a1, b1], build=
          +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, 
Final_COUNT(count$0) AS EXPR$0])
             +- Exchange(distribution=[hash[a1]])
                +- LocalHashAggregate(groupBy=[a1], select=[a1, 
Partial_COUNT(a2) AS count$0])
-                  +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[right])
-                     :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                     +- Exchange(distribution=[broadcast])
-                        +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[left])
+                     :- Exchange(distribution=[broadcast])
+                     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -675,10 +675,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- Calc(select=[a2])
-      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, 
a3], build=[right])
-         :- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
-         +- Exchange(distribution=[broadcast])
-            +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, 
a3], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -712,10 +712,10 @@ Calc(select=[a1, b1])
                   +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
                      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
                         :- Calc(select=[a2])
-                        :  +- NestedLoopJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[right])
-                        :     :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                        :     +- Exchange(distribution=[broadcast])
-                        :        +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
+                        :  +- NestedLoopJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[left])
+                        :     :- Exchange(distribution=[broadcast])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
                         +- Exchange(distribution=[broadcast])
                            +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
                               +- Exchange(distribution=[hash[a1]])
@@ -772,10 +772,60 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], isBroadcas
       +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
          +- Exchange(distribution=[single])
             +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], 
global=[false])
-               +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[right])
-                  :- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
-                  +- Exchange(distribution=[broadcast])
-                     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+               +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[left])
+                  :- Exchange(distribution=[broadcast])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ a2 + 
T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = 
T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, 
EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], 
select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- NestedLoopJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[left])
+                        :     :- Exchange(distribution=[broadcast])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- Calc(select=[a3])
+                        :        +- HashJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :           :- Exchange(distribution=[hash[a2]])
+                        :           :  +- 
TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], 
metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :           +- Exchange(distribution=[hash[a3]])
+                        :              +- 
TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], 
metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], 
select=[a1])
+                                    +- 
TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], 
metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
@@ -802,10 +852,10 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- Calc(select=[a2])
-      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, 
a3], build=[right])
-         :- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
-         +- Exchange(distribution=[broadcast])
-            +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, 
a3], build=[left])
+         :- Exchange(distribution=[broadcast])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, 
project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1082,7 +1132,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a1, EXPR$0], 
build=[right])
+NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a1, EXPR$0], 
build=[right], singleRowJoin=[true])
 :- Calc(select=[a1])
 :  +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, a2], 
build=[left])
 :     :- Exchange(distribution=[hash[a1]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index 00fbfb8ac69..4a2398579cc 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -663,7 +663,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], 
select=[a1, b1], build=
    +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
       +- Calc(select=[EXPR$0])
          +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(a2) 
AS EXPR$0])
-            +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, 
a1], build=[right])
+            +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, 
a1], build=[left])
                :- Exchange(distribution=[hash[a2]])
                :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
                +- Exchange(distribution=[hash[a1]])
@@ -694,7 +694,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :- Exchange(distribution=[hash[a1]])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Calc(select=[a2])
-   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
build=[right])
+   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
build=[left])
       :- Exchange(distribution=[hash[a2]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
       +- Exchange(distribution=[hash[a3]])
@@ -732,7 +732,7 @@ Calc(select=[a1, b1])
                   +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
                      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
                         :- Calc(select=[a2])
-                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], build=[right])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], build=[left])
                         :     :- Exchange(distribution=[hash[a2]])
                         :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
                         :     +- Exchange(distribution=[hash[a3]])
@@ -793,11 +793,61 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], isBroadcas
       +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
          +- Exchange(distribution=[single])
             +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], 
global=[false])
-               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[right])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], 
select=[a2, a1], build=[left])
                   :- Exchange(distribution=[hash[a2]])
                   :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
                   +- Exchange(distribution=[hash[a1]])
                      +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ a2 
+ T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 
= T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, 
EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], 
select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3], build=[left])
+                        :     :- Exchange(distribution=[hash[a2]])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- Calc(select=[a3])
+                        :        +- HashJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :           :- Exchange(distribution=[hash[a2]])
+                        :           :  +- 
TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], 
metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :           +- Exchange(distribution=[hash[a3]])
+                        :              +- 
TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], 
metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], 
select=[a1])
+                                    +- 
TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], 
metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
@@ -823,7 +873,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], build=[rig
 :- Exchange(distribution=[hash[a1]])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Calc(select=[a2])
-   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
build=[right])
+   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], 
build=[left])
       :- Exchange(distribution=[hash[a2]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
       +- Exchange(distribution=[hash[a3]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index 9600cb51a53..55119b885ba 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -798,6 +798,56 @@ HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], 
select=[a1, b1], isBroadcas
                   :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
                   +- Exchange(distribution=[hash[a1]])
                      +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ 
a2 + T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on 
T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, 
EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], 
select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, 
a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, 
a3)], select=[a2, a3])
+                        :     :- Exchange(distribution=[hash[a2]])
+                        :     :  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS 
options:[T2]]]])
+                        :     +- Calc(select=[a3])
+                        :        +- HashJoin(joinType=[InnerJoin], 
where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :           :- Exchange(distribution=[hash[a2]])
+                        :           :  +- 
TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], 
metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :           +- Exchange(distribution=[hash[a3]])
+                        :              +- 
TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], 
metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], 
select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], 
select=[a1])
+                                    +- 
TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], 
metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
index cedbacf095e..8311a1f449a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
@@ -429,6 +429,26 @@ LogicalProject(a2=[$0])
         LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
 })]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), 
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + 
T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = 
T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), 
rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
index 51a413d7ea8..5b9eedcb85a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
@@ -429,6 +429,26 @@ LogicalProject(a2=[$0])
         LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
 })]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInNestedCorrelatedSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + 
T1.a1 from T2 join (select T3.* from T2 join T3 on T2.a2 = T3.a3) T3 on T2.a2 = 
T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalProject(a3=[$2], b3=[$3])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, 
VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>

Reply via email to