This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef97c651f06dc835c36e8213687d66e78d80a0b6
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Thu Aug 18 14:24:41 2022 +0800

    [FLINK-28987][table-planner] Refine description of lookup join transformation with async params and retry strategy for easier debugging
    
    This closes #20592
---
 .../table/planner/plan/utils/LookupJoinUtil.java   |  15 +
 .../physical/common/CommonPhysicalLookupJoin.scala |   3 +-
 ...ggAndAllConstantLookupKeyWithTryResolveMode.out |   6 +-
 ...nstantLookupKeyWithTryResolveMode_newSource.out |   6 +-
 .../planner/plan/batch/sql/join/LookupJoinTest.xml |  32 +-
 .../nodes/exec/operator/BatchOperatorNameTest.xml  |  14 +-
 .../nodes/exec/operator/StreamOperatorNameTest.xml |  14 +-
 .../testAggAndLeftJoinWithTryResolveMode.out       |   2 +-
 .../testJoinTemporalTable.out                      |   2 +-
 .../testJoinTemporalTableWithAsyncHint.out         |   2 +-
 .../testJoinTemporalTableWithAsyncHint2.out        |   2 +-
 .../testJoinTemporalTableWithAsyncRetryHint.out    |   2 +-
 .../testJoinTemporalTableWithAsyncRetryHint2.out   |   2 +-
 ...testJoinTemporalTableWithProjectionPushDown.out |   2 +-
 .../testJoinTemporalTableWithRetryHint.out         |   2 +-
 .../plan/stream/sql/NonDeterministicDagTest.xml    |  40 +-
 .../table/planner/plan/stream/sql/RankTest.xml     |   2 +-
 .../plan/stream/sql/join/LookupJoinTest.xml        | 986 ++++++++++++++++++---
 .../plan/stream/sql/join/LookupJoinTest.scala      |  85 +-
 19 files changed, 1027 insertions(+), 192 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 55399ba9b45..cf4362991f4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -230,6 +230,11 @@ public final class LookupJoinUtil {
         public int hashCode() {
             return Objects.hash(asyncBufferCapacity, asyncTimeout, 
asyncOutputMode);
         }
+
+        @Override
+        public String toString() {
+            return asyncOutputMode + ", " + asyncTimeout + "ms, " + 
asyncBufferCapacity;
+        }
     }
 
     /** RetryOptions includes retry lookup related options. */
@@ -286,6 +291,16 @@ public final class LookupJoinUtil {
             return Objects.hash(retryPredicate, retryStrategy, 
retryFixedDelay, retryMaxAttempts);
         }
 
+        @Override
+        public String toString() {
+            return retryPredicate
+                    + ", "
+                    + retryStrategy
+                    + ", "
+                    + retryFixedDelay
+                    + "ms, "
+                    + retryMaxAttempts;
+        }
 
         @Nullable
         public static RetryLookupOptions fromJoinHint(@Nullable RelHint 
lookupJoinHint) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index 36c9211262c..a49b6d590ad 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -191,7 +191,6 @@ abstract class CommonPhysicalLookupJoin(
       .explainTerms(pw)
       .item("table", tableIdentifier.asSummaryString())
       .item("joinType", JoinTypeUtil.getFlinkJoinType(joinType))
-      .item("async", isAsyncEnabled)
       .item("lookup", lookupKeys)
       .itemIf("where", whereString, whereString.nonEmpty)
       .itemIf(
@@ -204,6 +203,8 @@ abstract class CommonPhysicalLookupJoin(
         remainingCondition.isDefined)
       .item("select", selection)
       .itemIf("upsertMaterialize", "true", upsertMaterialize)
+      .itemIf("async", asyncOptions.getOrElse(""), asyncOptions.isDefined)
+      .itemIf("retry", retryOptions.getOrElse(""), retryOptions.isDefined)
   }
 
   private def getInputChangelogMode(rel: RelNode): ChangelogMode = rel match {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
index 561c967f834..78b059bd681 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode.out
@@ -12,7 +12,7 @@ LogicalSink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age
 
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)], 
select=[a, name, age], upsertMaterialize=[true])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[=(id, 100)], select=[a, name, 
age], upsertMaterialize=[true])
    +- Calc(select=[a])
       +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -21,7 +21,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])
    +- Calc(select=[a])
       +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -84,7 +84,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "id" : 10,
     "type" : "LookupJoin[6]",
     "pact" : "Operator",
-    "contents" : 
"[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])",
+    "contents" : 
"[6]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : 8,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
index 823a49d1a1e..2b25d8f1b36 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/explain/stream/join/lookup/testAggAndAllConstantLookupKeyWithTryResolveMode_newSource.out
@@ -12,7 +12,7 @@ LogicalSink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age
 
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[=(id, 100)], 
select=[a, name, age], upsertMaterialize=[true])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[=(id, 100)], select=[a, name, 
age], upsertMaterialize=[true])
    +- Calc(select=[a])
       +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -21,7 +21,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])
    +- Calc(select=[a])
       +- GroupAggregate(groupBy=[b], select=[b, MAX(a) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -84,7 +84,7 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
     "id" : ,
     "type" : "LookupJoin[]",
     "pact" : "Operator",
-    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=100], where=[(id = 100)], 
select=[a, name, age], upsertMaterialize=[true])",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=100], where=[(id = 100)], select=[a, name, 
age], upsertMaterialize=[true])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : ,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
index aa6765410e3..d1593ef8f0a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.xml
@@ -53,7 +53,7 @@ HashAggregate(isMerge=[true], groupBy=[b], select=[b, 
Final_COUNT(count$0) AS EX
 +- Exchange(distribution=[hash[b]])
    +- LocalHashAggregate(groupBy=[b], select=[b, Partial_COUNT(a) AS count$0, 
Partial_SUM(c) AS sum$1, Partial_SUM(d) AS sum$2])
       +- Calc(select=[b, a, c, d])
-         +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, c, d, id])
+         +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
             +- Calc(select=[b, a, c, d])
                +- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, 
Final_SUM(sum$0) AS c, Final_SUM(sum$1) AS d])
                   +- Exchange(distribution=[hash[a, b]])
@@ -99,7 +99,7 @@ HashAggregate(isMerge=[true], groupBy=[b], select=[b, 
Final_COUNT(count$0) AS EX
 +- Exchange(distribution=[hash[b]])
    +- LocalHashAggregate(groupBy=[b], select=[b, Partial_COUNT(a) AS count$0, 
Partial_SUM(c) AS sum$1, Partial_SUM(d) AS sum$2])
       +- Calc(select=[b, a, c, d])
-         +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, c, d, id])
+         +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
             +- Calc(select=[b, a, c, d])
                +- HashAggregate(isMerge=[true], groupBy=[a, b], select=[a, b, 
Final_SUM(sum$0) AS c, Final_SUM(sum$1) AS d])
                   +- Exchange(distribution=[hash[a, b]])
@@ -126,7 +126,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, id, name, age])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -150,7 +150,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, id, name, age])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -181,7 +181,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$5], age=[$6], 
nominal_age=[$7])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name, age, nominal_age])
-+- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, id, name, 
age, (age + 1) AS nominal_age])
++- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, id, name, age, (age + 1) 
AS nominal_age])
    +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -211,7 +211,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$5], age=[$6], 
nominal_age=[$7])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name, age, nominal_age])
-+- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], async=[false], lookup=[id=a], where=[((age + 1) > 12)], 
select=[a, b, c, id, name, age, (age + 1) AS nominal_age])
++- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], lookup=[id=a], where=[((age + 1) > 12)], select=[a, b, 
c, id, name, age, (age + 1) AS nominal_age])
    +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -240,7 +240,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, c, proctime, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
c, proctime, id, name])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[(c > 1000)])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -270,7 +270,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, c, proctime, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
c, proctime, id, name])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[(c > 1000)])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -296,7 +296,7 @@ LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], 
name=[$4], age=[$5])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, proctime, id, name, age])
    +- Calc(select=[a, b, PROCTIME() AS proctime], where=[(c > 1000)])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -322,7 +322,7 @@ LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], 
name=[$4], age=[$5])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, proctime, id, name, age])
    +- Calc(select=[a, b, PROCTIME() AS proctime], where=[(c > 1000)])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -351,7 +351,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, id])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -375,7 +375,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, b, c, 
proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, id, name, 
age])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -404,7 +404,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, id])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -428,7 +428,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
id=[$4], name=[$5], age=[$
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, 
age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, b, c, 
proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, id, name, 
age])
    +- Calc(select=[a, b, c, PROCTIME() AS proctime])
       +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], 
fields=[a, b, c])
 ]]>
@@ -588,7 +588,7 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2])
       +- LocalHashAggregate(groupBy=[b, b0], select=[b, b0, Partial_COUNT(a) 
AS count$0, Partial_COUNT(id) AS count$1, Partial_SUM(a0) AS sum$2])
          +- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[b, a, id, 
a0, b0], build=[right])
             :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
-            :  +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, id])(reuse_id=[1])
+            :  +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, 
id])(reuse_id=[1])
             :     +- Calc(select=[b, a])
             :        +- HashAggregate(isMerge=[true], groupBy=[a, b], 
select=[a, b])
             :           +- Exchange(distribution=[hash[a, b]])
@@ -664,7 +664,7 @@ Calc(select=[EXPR$0, EXPR$1, EXPR$2])
       +- LocalHashAggregate(groupBy=[b, b0], select=[b, b0, Partial_COUNT(a) 
AS count$0, Partial_COUNT(id) AS count$1, Partial_SUM(a0) AS sum$2])
          +- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[b, a, id, 
a0, b0], build=[right])
             :- Exchange(distribution=[hash[a]], shuffle_mode=[BATCH])
-            :  +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, id])(reuse_id=[1])
+            :  +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, 
id])(reuse_id=[1])
             :     +- Calc(select=[b, a])
             :        +- HashAggregate(isMerge=[true], groupBy=[a, b], 
select=[a, b])
             :           +- Exchange(distribution=[hash[a, b]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
index d2d13adb880..2b3a0c1a854 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
@@ -837,13 +837,13 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], id=[
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, rowtime])
 
 == Optimized Execution Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, rowtime])
 
@@ -868,9 +868,9 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, id
     } ]
   }, {
     "id" : ,
-    "type" : "LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "type" : "LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "pact" : "Operator",
-    "contents" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "contents" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : ,
@@ -905,13 +905,13 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], id=[
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, rowtime])
 
 == Optimized Execution Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, rowtime])
 
@@ -938,7 +938,7 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, id
     "id" : ,
     "type" : "LookupJoin[]",
     "pact" : "Operator",
-    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : ,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
index 4e779aac6ff..f18b3e24d9e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
@@ -2160,14 +2160,14 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], id=[
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 
 == Optimized Execution Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
@@ -2204,9 +2204,9 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, id
     } ]
   }, {
     "id" : ,
-    "type" : "LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "type" : "LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "pact" : "Operator",
-    "contents" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "contents" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : ,
@@ -2242,14 +2242,14 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
rowtime=[$4], proctime=[$5], id=[
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
 
 == Optimized Execution Plan ==
 Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, 
id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])
    +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL 
SECOND)])
       +- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, rowtime])
@@ -2288,7 +2288,7 @@ Calc(select=[a, b, c, d, rowtime, 
PROCTIME_MATERIALIZE(proctime) AS proctime, id
     "id" : ,
     "type" : "LookupJoin[]",
     "pact" : "Operator",
-    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=b], select=[a, b, c, d, 
rowtime, proctime, id, name, age])",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=b], select=[a, b, c, d, rowtime, proctime, id, 
name, age])",
     "parallelism" : 1,
     "predecessors" : [ {
       "id" : ,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
index a378e26e29e..5c9649bfb02 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testAggAndLeftJoinWithTryResolveMode.out
@@ -315,7 +315,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `id` INT, `name` VARCHAR(2147483647), `age` 
INT>",
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, id, name, 
age], upsertMaterialize=[true])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, id, name, age], 
upsertMaterialize=[true])"
   }, {
     "id" : 9,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
index 1f8477aefb7..43bd34417c9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
@@ -304,7 +304,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
index 24d394854c1..57c29f7169a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
@@ -304,7 +304,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
index 24d394854c1..57c29f7169a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
@@ -304,7 +304,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
index 2da272a7bb6..6171c82ac9e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
@@ -310,7 +310,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
index 2da272a7bb6..6171c82ac9e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
@@ -310,7 +310,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index c3b8ae21edd..26b74e63d0f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -302,7 +302,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
index 2da272a7bb6..6171c82ac9e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
@@ -310,7 +310,7 @@
         "fieldType" : "INT"
       } ]
     },
-    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])"
+    "description" : 
"LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])"
   }, {
     "id" : 5,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index d1c124c3a60..be561131927 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -533,7 +533,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b0 AS b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], joinCondition=[(b > 
ndFunc(b0))], select=[a, b, a0, b0, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], joinCondition=[(b > ndFunc(b0))], 
select=[a, b, a0, b0, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -565,7 +565,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -597,7 +597,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -629,7 +629,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -661,7 +661,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c], 
upsertMaterialize=[true])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -693,7 +693,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[ndFunc(a0) AS a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, c, a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -727,7 +727,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, ve
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, 
version, c], upsertMaterialize=[true])
 +- Calc(select=[a, b AS version, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], where=[(b > 
(UNIX_TIMESTAMP() - 300))], select=[a, a, b, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(b > (UNIX_TIMESTAMP() - 300))], 
select=[a, a, b, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
@@ -759,7 +759,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], where=[(ndFunc(b) > 100)], 
select=[a, b, c, a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], where=[(ndFunc(b) > 100)], select=[a, b, c, 
a])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -791,7 +791,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, c, a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -823,7 +823,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, c, a])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b, c], metadata=[]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -855,7 +855,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, c, a, b])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a, b])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
@@ -887,7 +887,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, c, a, b], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, c, a, b], 
upsertMaterialize=[true])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
@@ -919,7 +919,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -951,7 +951,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], async=[false], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -983,7 +983,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c], 
upsertMaterialize=[true])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -1015,7 +1015,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1048,7 +1048,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, a, c])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1153,7 +1153,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, b, 
c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_without_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a, c])
       +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
@@ -1185,7 +1185,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a, c])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
@@ -1218,7 +1218,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b,
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
 +- Calc(select=[a, b, c])
-   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], async=[false], lookup=[a=a], select=[a, b, a, c])
+   +- LookupJoin(table=[default_catalog.default_database.dim_with_pk], 
joinType=[LeftOuterJoin], lookup=[a=a], select=[a, b, a, c])
       +- DropUpdateBefore
          +- TableSourceScan(table=[[default_catalog, default_database, cdc, 
project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 44df5164128..2e0cf0e5065 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -743,7 +743,7 @@ Rank(strategy=[UpdateFastStrategy[0]], 
rankType=[ROW_NUMBER], rankRange=[rankSta
    +- GroupAggregate(groupBy=[name], select=[name, SUM(id) FILTER $f2 AS ids])
       +- Exchange(distribution=[hash[name]])
          +- Calc(select=[name, id, IS TRUE(>(id, 0)) AS $f2])
-            +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, id, name])
+            +- 
LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name])
                +- Calc(select=[a])
                   +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index e5f6474d247..e021cd264c9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -259,6 +259,190 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
       "side" : "second"
     } ]
   } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAsyncJoinWithDefaultParams[LegacyTableSource=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAsyncJoinWithDefaultParams[LegacyTableSource=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], async=[ORDERED, 
180000ms, 100])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
 }]]>
     </Resource>
   </TestCase>
@@ -298,39 +482,11 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], 
EXPR$2=[SUM($2)], EXPR$3=[SUM(
 GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, 
SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[b, a, c, d])
-      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, c, d, id])
+      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
          +- Calc(select=[b, a, c, d])
             +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, 
SUM(d) AS d])
                +- Exchange(distribution=[hash[a, b]])
                   +- DataStreamScan(table=[[default_catalog, default_database, 
T1]], fields=[a, b, c, d])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase 
name="testJoinTemporalTableWithCalcPushDown[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT * FROM MyTable AS T
-JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
-ON T.a = D.id AND D.age = 10
-WHERE cast(D.name as bigint) > 1000
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
-+- LogicalFilter(condition=[>(CAST($6):BIGINT, 1000)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
-         +- LogicalSnapshot(period=[$cor0.proctime])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[((age = 10) 
AND (CAST(name AS BIGINT) > 1000))], select=[a, b, c, proctime, rowtime, id, 
name])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -370,7 +526,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], 
EXPR$2=[SUM($2)], EXPR$3=[SUM(
 GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, 
SUM_RETRACT(c) AS EXPR$2, SUM_RETRACT(d) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- Calc(select=[b, a, c, d])
-      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(age > 10)], 
select=[b, a, c, d, id])
+      +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(age > 10)], select=[b, a, c, d, 
id])
          +- Calc(select=[b, a, c, d])
             +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, 
SUM(d) AS d])
                +- Exchange(distribution=[hash[a, b]])
@@ -395,7 +551,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[ORDERED, 180000ms, 100])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -417,7 +573,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[ORDERED, 180000ms, 100])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -439,7 +595,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[ORDERED, 180000ms, 100])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -461,7 +617,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[ORDERED, 180000ms, 100])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -483,7 +639,29 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T 
JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], 
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -505,7 +683,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -527,7 +705,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -549,7 +727,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -571,7 +749,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -599,54 +777,127 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[((age = 10) 
AND (CAST(name AS BIGINT) > 1000))], select=[a, b, c, proctime, rowtime, id, 
name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[((age = 10) AND (CAST(name 
AS BIGINT) > 1000))], select=[a, b, c, proctime, rowtime, id, name])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase 
name="testJoinTemporalTableWithNestedQuery[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 
1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = 
D.id]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
-   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-   :  +- LogicalFilter(condition=[>($2, 1000)])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-   +- LogicalFilter(condition=[=($cor0.a, $0)])
-      +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, 
name, age])
-   +- Calc(select=[a, b, proctime], where=[(c > 1000)])
-      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T 
JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], 
hints=[[[ALIAS inheritPath:[] options:[T]]]])
-   +- LogicalFilter(condition=[=($cor0.a, $0)])
-      +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
+  <TestCase name="testJoinWithAsyncAndRetryHint[LegacyTableSource=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 
3])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testJoinTemporalTableWithCalcPushDown[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ON T.a = D.id AND D.age = 10
+WHERE cast(D.name as bigint) > 1000
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalFilter(condition=[>(CAST($6):BIGINT, 1000)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(10 AS INTEGER) AS age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[((age = 10) AND (CAST(name 
AS BIGINT) > 1000))], select=[a, b, c, proctime, rowtime, id, name])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -675,7 +926,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6], age=[$7], 
nominal_age=[$8])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name, age, nominal_age])
-+- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, id, name, 
age, (age + 1) AS nominal_age])
++- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, id, name, age, (age + 1) 
AS nominal_age])
    +- Calc(select=[a, b, c])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -705,7 +956,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6], age=[$7], 
nominal_age=[$8])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name, age, nominal_age])
-+- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], async=[false], lookup=[id=a], where=[((age + 1) > 12)], 
select=[a, b, c, id, name, age, (age + 1) AS nominal_age])
++- 
LookupJoin(table=[default_catalog.default_database.LookupTableWithComputedColumn],
 joinType=[InnerJoin], lookup=[id=a], where=[((age + 1) > 12)], select=[a, b, 
c, id, name, age, (age + 1) AS nominal_age])
    +- Calc(select=[a, b, c])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -736,7 +987,7 @@ LogicalProject(b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[b])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
id])
    +- Calc(select=[a, b])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -767,7 +1018,7 @@ LogicalProject(b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[b])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
id])
    +- Calc(select=[a, b])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -796,7 +1047,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, c, proctime, rowtime, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
c, proctime, rowtime, id, name])
    +- Calc(select=[a, b, c, proctime, rowtime], where=[(c > 1000)])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -825,7 +1076,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, c, proctime, rowtime, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, 
c, proctime, rowtime, id, name])
    +- Calc(select=[a, b, c, proctime, rowtime], where=[(c > 1000)])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -852,7 +1103,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(11 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=11], where=[(age = 11)], 
joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, name, 
CONCAT(name, '!') AS $f3])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=11], where=[(age = 11)], joinCondition=[(b = 
$f3)], select=[a, b, c, proctime, rowtime, id, name, CONCAT(name, '!') AS $f3])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -878,7 +1129,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(11 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=11], where=[(age = 11)], 
joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, name, 
CONCAT(name, '!') AS $f3])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=11], where=[(age = 11)], joinCondition=[(b = 
$f3)], select=[a, b, c, proctime, rowtime, id, name, CONCAT(name, '!') AS $f3])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -906,7 +1157,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[LIKE(name, 
'Jack%')], joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, 
name, age, CONCAT(name, '!') AS $f3])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[LIKE(name, 'Jack%')], 
joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, name, age, 
CONCAT(name, '!') AS $f3])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -934,7 +1185,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[LIKE(name, 
'Jack%')], joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, 
name, age, CONCAT(name, '!') AS $f3])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[LIKE(name, 'Jack%')], 
joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, name, age, 
CONCAT(name, '!') AS $f3])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -956,7 +1207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=c0], joinCondition=[(a = id)], 
select=[a, b, c, proctime, rowtime, c0, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=c0], joinCondition=[(a = id)], select=[a, b, 
c, proctime, rowtime, c0, id, name, age])
    +- Calc(select=[a, b, c, proctime, rowtime, CAST(c AS INTEGER) AS c0])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -979,7 +1230,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=c0], joinCondition=[(a = id)], 
select=[a, b, c, proctime, rowtime, c0, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=c0], joinCondition=[(a = id)], select=[a, b, 
c, proctime, rowtime, c0, id, name, age])
    +- Calc(select=[a, b, c, proctime, rowtime, CAST(c AS INTEGER) AS c0])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -1006,7 +1257,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(11 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=11], where=[(age = 11)], 
joinCondition=[((a = $f3) AND (b = $f4))], select=[a, b, c, proctime, rowtime, 
id, name, (id + 1) AS $f3, CONCAT(name, '!') AS $f4])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=11], where=[(age = 11)], joinCondition=[((a = 
$f3) AND (b = $f4))], select=[a, b, c, proctime, rowtime, id, name, (id + 1) AS 
$f3, CONCAT(name, '!') AS $f4])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -1032,7 +1283,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, CAST(11 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=11], where=[(age = 11)], 
joinCondition=[((a = $f3) AND (b = $f4))], select=[a, b, c, proctime, rowtime, 
id, name, (id + 1) AS $f3, CONCAT(name, '!') AS $f4])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=11], where=[(age = 11)], joinCondition=[((a = 
$f3) AND (b = $f4))], select=[a, b, c, proctime, rowtime, id, name, (id + 1) AS 
$f3, CONCAT(name, '!') AS $f4])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -1060,7 +1311,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
CAST('AAA' AS VARCHAR(2147483647)) AS name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, name=_UTF-16LE'AAA', 
id=a], where=[((age = 10) AND (name = 'AAA'))], select=[a, b, c, proctime, 
rowtime, id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, name=_UTF-16LE'AAA', id=a], where=[((age 
= 10) AND (name = 'AAA'))], select=[a, b, c, proctime, rowtime, id])
    +- Calc(select=[a, b, c, proctime, rowtime], where=[(c > 1000)])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -1089,7 +1340,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
CAST('AAA' AS VARCHAR(2147483647)) AS name, CAST(10 AS INTEGER) AS age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[age=10, name=_UTF-16LE'AAA', 
id=a], where=[((age = 10) AND (name = 'AAA'))], select=[a, b, c, proctime, 
rowtime, id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[age=10, name=_UTF-16LE'AAA', id=a], where=[((age 
= 10) AND (name = 'AAA'))], select=[a, b, c, proctime, rowtime, id])
    +- Calc(select=[a, b, c, proctime, rowtime], where=[(c > 1000)])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -1114,7 +1365,7 @@ LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], 
name=[$4], age=[$5])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, proctime, id, name, age])
    +- Calc(select=[a, b, proctime], where=[(c > 1000)])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -1142,29 +1393,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR 
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
3}])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-   +- LogicalFilter(condition=[=($cor0.a, $0)])
-      +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, b, c, 
proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -1191,7 +1420,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -1220,7 +1449,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(CONCAT('Hello-', 
name) = 'Hello-Jark')], select=[a, b, c, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(CONCAT('Hello-', name) = 
'Hello-Jark')], select=[a, b, c, id, name])
    +- Calc(select=[a, b, c])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
@@ -1250,12 +1479,519 @@ LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, name])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], where=[(CONCAT('Hello-', 
name) = 'Hello-Jark')], select=[a, b, c, id, name])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], where=[(CONCAT('Hello-', name) = 
'Hello-Jark')], select=[a, b, c, id, name])
    +- Calc(select=[a, b, c])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinWithAsyncAndRetryHint[LegacyTableSource=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300], retry=[lookup_miss, FIXED_DELAY, 10000ms, 
3])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithAsyncHint[LegacyTableSource=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testJoinTemporalTableWithNestedQuery[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 
1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = 
D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, proctime, id, name, age])
+   +- Calc(select=[a, b, proctime], where=[(c > 1000)])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithAsyncHint[LegacyTableSource=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
async=[UNORDERED, 180000ms, 300])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithRetryHint[LegacyTableSource=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR 
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, 
id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinWithRetryHint[LegacyTableSource=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, 
age])
++- LogicalProject(a=[$0], name=[$6], age=[$7])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
++- Calc(select=[a, name, age])
+   +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+      +- Calc(select=[a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Collection Source",
+    "pact" : "Data Source",
+    "contents" : "Source: Collection Source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "SourceConversion[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:SourceConversion(table=[default_catalog.default_database.MyTable], 
fields=[a, b, c, proctime, rowtime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "LookupJoin[]",
+    "pact" : "Operator",
+    "contents" : 
"[]:LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, id, name, age], 
retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Sink1[]",
+    "pact" : "Data Sink",
+    "contents" : "[]:Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=false]">
     <Resource name="sql">
       <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR 
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
@@ -1273,7 +2009,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], async=[false], lookup=[id=a], select=[a, b, c, 
proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, 
id, name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 30c1f61a490..469b74fa287 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.factories.TableSourceFactory
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction, 
UserDefinedFunction}
 import org.apache.flink.table.planner.plan.utils._
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{TableTestBase, 
TestingTableEnvironment}
 import org.apache.flink.table.planner.utils.TableTestUtil.{readFromResource, 
replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.types.DataType
@@ -811,6 +811,96 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
   }
 
+  // Expects a TableException ("Required sync lookup function by planner, but table ...") when a
+  // left lookup join on AsyncLookupTable is explained under the TRY_RESOLVE update strategy.
+  @Test
+  def testAggAndLeftJoinWithTryResolveMode(): Unit = {
+    thrown.expectMessage("Required sync lookup function by planner, but table")
+    thrown.expect(classOf[TableException])
+
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+      OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE)
+
+    val stmt = util.tableEnv.asInstanceOf[TestingTableEnvironment].createStatementSet()
+    stmt.addInsertSql(
+      """
+        |INSERT INTO Sink1
+        |SELECT T.a, D.name, D.age
+        |FROM (SELECT max(a) a, count(c) c, PROCTIME() proctime FROM MyTable GROUP BY b) T
+        |LEFT JOIN AsyncLookupTable
+        |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+        |""".stripMargin)
+
+    util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
+  }
+
+  // Verifies the JSON execution plan of a lookup join on AsyncLookupTable without a LOOKUP hint.
+  @Test
+  def testAsyncJoinWithDefaultParams(): Unit = {
+    val stmt = util.tableEnv.asInstanceOf[TestingTableEnvironment].createStatementSet()
+    stmt.addInsertSql("""
+                        |INSERT INTO Sink1
+                        |SELECT T.a, D.name, D.age
+                        |FROM MyTable T
+                        |JOIN AsyncLookupTable
+                        |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+                        |""".stripMargin)
+
+    util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
+  }
+
+  // Same join, with 'output-mode', 'time-out' and 'capacity' set through a LOOKUP hint.
+  @Test
+  def testJoinWithAsyncHint(): Unit = {
+    val stmt = util.tableEnv.asInstanceOf[TestingTableEnvironment].createStatementSet()
+    stmt.addInsertSql(
+      """
+        |INSERT INTO Sink1
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300') */
+        | T.a, D.name, D.age
+        |FROM MyTable T
+        |JOIN AsyncLookupTable
+        |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+        |""".stripMargin)
+
+    util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
+  }
+
+  // Lookup join on LookupTable with the retry options set through a LOOKUP hint.
+  @Test
+  def testJoinWithRetryHint(): Unit = {
+    val stmt = util.tableEnv.asInstanceOf[TestingTableEnvironment].createStatementSet()
+    stmt.addInsertSql(
+      """
+        |INSERT INTO Sink1
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
+        | T.a, D.name, D.age
+        |FROM MyTable T
+        |JOIN LookupTable
+        |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+        |""".stripMargin)
+
+    util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
+  }
+
+  // Lookup join on AsyncLookupTable with both the async and the retry LOOKUP hint options set.
+  @Test
+  def testJoinWithAsyncAndRetryHint(): Unit = {
+    val stmt = util.tableEnv.asInstanceOf[TestingTableEnvironment].createStatementSet()
+    stmt.addInsertSql(
+      """
+        |INSERT INTO Sink1
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
+        | T.a, D.name, D.age
+        |FROM MyTable T
+        |JOIN AsyncLookupTable
+        |FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+        |""".stripMargin)
+
+    util.verifyExplain(stmt, ExplainDetail.JSON_EXECUTION_PLAN)
+  }
+
   // ==========================================================================================
 
   private def createLookupTable(tableName: String, lookupFunction: 
UserDefinedFunction): Unit = {

Reply via email to