This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5b5d4368437fc98b2f5ca31b1f1c6cf3e4ce263
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Tue Aug 9 18:14:08 2022 +0800

    [FLINK-28849][table-planner] Fix errors when enabling retry on async lookup and add more tests
    
    Disable retry on async lookup because two problems need to be resolved first
    
    This closes #20482
---
 .../nodes/exec/common/CommonExecLookupJoin.java    |  35 +++----
 .../table/planner/plan/utils/LookupJoinUtil.java   |  13 ++-
 .../physical/common/CommonPhysicalLookupJoin.scala |   7 +-
 .../factories/TestValuesRuntimeFunctions.java      |  91 ++++++++++++++++++
 .../planner/factories/TestValuesTableFactory.java  |  52 ++++++++--
 .../plan/stream/sql/join/LookupJoinTest.xml        |   2 +-
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 103 +++++++++++++++++++-
 .../runtime/stream/sql/LookupJoinITCase.scala      | 105 ++++++++++++++++++++-
 8 files changed, 367 insertions(+), 41 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index 52192ee79ad..d00c888e1fa 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -542,30 +542,17 @@ public abstract class CommonExecLookupJoin extends 
ExecNodeBase<RowData>
                             isLeftOuterJoin,
                             asyncLookupOptions.asyncBufferCapacity);
         }
-        /**
-         * why not implements async-retry directly in AsyncLookupFunction ? - 
because the active
-         * sleeping on async callback thread will occupy the task cpu time 
while the retry support
-         * in async data stream api provides a more efficient way via 
processing time service which
-         * does not occupy callback thread. Both AsyncLookupFunction 
AsyncTableFunction can support
-         * retry. does not occupy callback thread. Both AsyncLookupFunction 
AsyncTableFunction can
-         * support retry.
-         */
-        if (null != joinHintSpec) {
-            // simplify code here, not check whether ResultRetryStrategy is 
NO_RETRY_STRATEGY or not
-            // because AsyncWaitOperator has short-path optimization during 
compile time.
-            return new AsyncWaitOperatorFactory<>(
-                    asyncFunc,
-                    asyncLookupOptions.asyncTimeout,
-                    asyncLookupOptions.asyncBufferCapacity,
-                    convert(asyncLookupOptions.asyncOutputMode),
-                    joinHintSpec.toRetryStrategy());
-        } else {
-            return new AsyncWaitOperatorFactory<>(
-                    asyncFunc,
-                    asyncLookupOptions.asyncTimeout,
-                    asyncLookupOptions.asyncBufferCapacity,
-                    convert(asyncLookupOptions.asyncOutputMode));
-        }
+        // TODO support async retry. Retry cannot be enabled directly on 'AsyncWaitOperator'
+        // for two reasons: 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bound
+        // to each input record (it is not re-entrant); 2. a retry cannot look up a new value
+        // when empty-result caching is enabled in the chained new AsyncCachingLookupFunction.
+        // These two issues must be resolved before async retry can be enabled.
+
+        return new AsyncWaitOperatorFactory<>(
+                asyncFunc,
+                asyncLookupOptions.asyncTimeout,
+                asyncLookupOptions.asyncBufferCapacity,
+                convert(asyncLookupOptions.asyncOutputMode));
     }
 
     private AsyncDataStream.OutputMode convert(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 833eb1908a7..529f79b706e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -400,18 +400,23 @@ public final class LookupJoinUtil {
                 || lookupJoinHintSpec.isAsync();
     }
 
-    public static boolean isAsyncLookup(RelOptTable temporalTable, 
Collection<Integer> lookupKeys) {
+    public static boolean isAsyncLookup(
+            RelOptTable temporalTable,
+            Collection<Integer> lookupKeys,
+            LookupJoinHintSpec lookupJoinHintSpec) {
+        boolean preferAsync = preferAsync(lookupJoinHintSpec);
         if (temporalTable instanceof TableSourceTable) {
             LookupTableSource.LookupRuntimeProvider provider =
                     getLookupRuntimeProvider(temporalTable, lookupKeys);
-            return provider instanceof AsyncLookupFunctionProvider
-                    || provider instanceof AsyncTableFunctionProvider;
+            return preferAsync
+                    && (provider instanceof AsyncLookupFunctionProvider
+                            || provider instanceof AsyncTableFunctionProvider);
         } else if (temporalTable instanceof LegacyTableSourceTable) {
             LegacyTableSourceTable<?> legacyTableSourceTable =
                     (LegacyTableSourceTable<?>) temporalTable;
             LookupableTableSource<?> lookupableTableSource =
                     (LookupableTableSource<?>) 
legacyTableSourceTable.tableSource();
-            return lookupableTableSource.isAsyncEnabled();
+            return preferAsync && lookupableTableSource.isAsyncEnabled();
         }
         throw new TableException(
                 String.format(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index 568909ddd5b..c5fde386f73 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -161,8 +161,13 @@ abstract class CommonPhysicalLookupJoin(
       case t: LegacyTableSourceTable[_] => t.tableIdentifier
     }
 
+    // The lookup function chosen here may not be the final choice at runtime because upsert
+    // materialize info is missing here. This can be made consistent once the planner offers it.
     val isAsyncEnabled: Boolean =
-      LookupJoinUtil.isAsyncLookup(temporalTable, 
allLookupKeys.keys.map(Int.box).toList.asJava)
+      LookupJoinUtil.isAsyncLookup(
+        temporalTable,
+        allLookupKeys.keys.map(Int.box).toList.asJava,
+        lookupHintSpec.orNull)
 
     super
       .explainTerms(pw)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index b915529ef62..510efcd09ab 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -707,4 +707,95 @@ final class TestValuesRuntimeFunctions {
                     });
         }
     }
+
+    /**
+     * A {@link TestValuesLookupFunction} that does not do a real lookup for a key (it returns
+     * null immediately) until the number of lookups of that key exceeds the predefined threshold
+     * 'lookupThreshold'.
+     */
+    public static class TestNoLookupUntilNthAccessLookupFunction extends 
TestValuesLookupFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Lookups of a key return null until its access count exceeds this threshold. */
+        private final int lookupThreshold;
+
+        private transient Map<RowData, Integer> accessCounter;
+
+        protected TestNoLookupUntilNthAccessLookupFunction(
+                List<Row> data,
+                int[] lookupIndices,
+                LookupTableSource.DataStructureConverter converter,
+                int lookupThreshold) {
+            super(data, lookupIndices, converter);
+            this.lookupThreshold = lookupThreshold;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            accessCounter = new HashMap<>();
+        }
+
+        protected int counter(RowData key) {
+            int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
+            accessCounter.put(key, currentCnt);
+            return currentCnt;
+        }
+
+        @Override
+        public Collection<RowData> lookup(RowData keyRow) throws IOException {
+            int currentCnt = counter(keyRow);
+            if (currentCnt <= lookupThreshold) {
+                return null;
+            }
+            return super.lookup(keyRow);
+        }
+    }
+
+    /**
+     * An {@link AsyncTestValueLookupFunction} that does not do a real lookup for a key (it
+     * completes with an empty result) until the number of lookups of that key exceeds the
+     * predefined threshold 'lookupThreshold'.
+     */
+    public static class TestNoLookupUntilNthAccessAsyncLookupFunction
+            extends AsyncTestValueLookupFunction {
+        private static final long serialVersionUID = 1L;
+        private static Collection<RowData> emptyResult = 
Collections.emptyList();
+
+        /** Lookups of a key complete empty until its access count exceeds this threshold. */
+        private final int lookupThreshold;
+
+        private transient Map<RowData, Integer> accessCounter;
+
+        public TestNoLookupUntilNthAccessAsyncLookupFunction(
+                List<Row> data,
+                int[] lookupIndices,
+                LookupTableSource.DataStructureConverter converter,
+                int lookupThreshold) {
+            super(data, lookupIndices, converter);
+            this.lookupThreshold = lookupThreshold;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            accessCounter = new HashMap<>();
+        }
+
+        protected int counter(RowData key) {
+            int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
+            accessCounter.put(key, currentCnt);
+            return currentCnt;
+        }
+
+        @Override
+        public CompletableFuture<Collection<RowData>> asyncLookup(RowData 
keyRow) {
+            int currentCnt = counter(keyRow);
+            if (currentCnt <= lookupThreshold) {
+                return CompletableFuture.supplyAsync(() -> emptyResult);
+            }
+            return super.asyncLookup(keyRow);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 60967f8494d..949b9f57e57 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -86,14 +86,18 @@ import 
org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.functions.TableFunction;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingOutputFormat;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingSinkFunction;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AsyncTestValueLookupFunction;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.RetractingSinkFunction;
+import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessAsyncLookupFunction;
+import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessLookupFunction;
 import 
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
 import org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction;
 import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
@@ -317,6 +321,13 @@ public final class TestValuesTableFactory
     private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS =
             
ConfigOptions.key("lookup-function-class").stringType().noDefaultValue();
 
+    private static final ConfigOption<Integer> LOOKUP_THRESHOLD =
+            ConfigOptions.key("start-lookup-threshold")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The threshold which backend lookup function will 
not do real lookup for"
+                                    + " a key (returns null value immediately) 
until its lookup times beyond");
     private static final ConfigOption<Boolean> ASYNC_ENABLED =
             ConfigOptions.key("async").booleanType().defaultValue(false);
 
@@ -416,6 +427,7 @@ public final class TestValuesTableFactory
         boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
         int numElementToSkip = 
helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
         boolean internalData = helper.getOptions().get(INTERNAL_DATA);
+        int lookupThreshold = helper.getOptions().get(LOOKUP_THRESHOLD);
         DefaultLookupCache cache = null;
         if 
(helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL))
 {
             cache = DefaultLookupCache.fromConfig(helper.getOptions());
@@ -540,7 +552,8 @@ public final class TestValuesTableFactory
                         readableMetadata,
                         null,
                         cache,
-                        reloadTrigger);
+                        reloadTrigger,
+                        lookupThreshold);
             }
         } else {
             try {
@@ -622,6 +635,7 @@ public final class TestValuesTableFactory
                         TABLE_SOURCE_CLASS,
                         FAILING_SOURCE,
                         LOOKUP_FUNCTION_CLASS,
+                        LOOKUP_THRESHOLD,
                         ASYNC_ENABLED,
                         DISABLE_LOOKUP,
                         TABLE_SOURCE_CLASS,
@@ -1496,6 +1510,7 @@ public final class TestValuesTableFactory
         private final @Nullable LookupCache cache;
         private final @Nullable CacheReloadTrigger reloadTrigger;
         private final boolean isAsync;
+        private final int lookupThreshold;
 
         private TestValuesScanLookupTableSource(
                 DataType producedDataType,
@@ -1517,7 +1532,8 @@ public final class TestValuesTableFactory
                 Map<String, DataType> readableMetadata,
                 @Nullable int[] projectedMetadataFields,
                 @Nullable LookupCache cache,
-                @Nullable CacheReloadTrigger reloadTrigger) {
+                @Nullable CacheReloadTrigger reloadTrigger,
+                int lookupThreshold) {
             super(
                     producedDataType,
                     changelogMode,
@@ -1539,6 +1555,7 @@ public final class TestValuesTableFactory
             this.isAsync = isAsync;
             this.cache = cache;
             this.reloadTrigger = reloadTrigger;
+            this.lookupThreshold = lookupThreshold;
         }
 
         @SuppressWarnings({"unchecked", "rawtypes"})
@@ -1549,7 +1566,11 @@ public final class TestValuesTableFactory
                 try {
                     Class<?> clazz = Class.forName(lookupFunctionClass);
                     Object udtf = InstantiationUtil.instantiate(clazz);
-                    if (udtf instanceof TableFunction) {
+                    if (udtf instanceof LookupFunction) {
+                        return LookupFunctionProvider.of((LookupFunction) 
udtf);
+                    } else if (udtf instanceof AsyncLookupFunction) {
+                        return 
AsyncLookupFunctionProvider.of((AsyncLookupFunction) udtf);
+                    } else if (udtf instanceof TableFunction) {
                         return TableFunctionProvider.of((TableFunction) udtf);
                     } else {
                         return 
AsyncTableFunctionProvider.of((AsyncTableFunction) udtf);
@@ -1582,7 +1603,7 @@ public final class TestValuesTableFactory
                     context.createDataStructureConverter(producedDataType);
             if (isAsync) {
                 AsyncTestValueLookupFunction asyncLookupFunction =
-                        new AsyncTestValueLookupFunction(data, lookupIndices, 
converter);
+                        getTestValuesAsyncLookupFunction(data, lookupIndices, 
converter);
                 if (cache == null) {
                     return AsyncLookupFunctionProvider.of(asyncLookupFunction);
                 } else {
@@ -1591,7 +1612,7 @@ public final class TestValuesTableFactory
                 }
             } else {
                 TestValuesLookupFunction lookupFunction =
-                        new TestValuesLookupFunction(data, lookupIndices, 
converter);
+                        getTestValuesLookupFunction(data, lookupIndices, 
converter);
                 if (cache != null) {
                     return PartialCachingLookupProvider.of(lookupFunction, 
cache);
                 } else if (reloadTrigger != null) {
@@ -1608,6 +1629,24 @@ public final class TestValuesTableFactory
             }
         }
 
+        private AsyncTestValueLookupFunction getTestValuesAsyncLookupFunction(
+                List<Row> data, int[] lookupIndices, DataStructureConverter 
converter) {
+            if (lookupThreshold > 0) {
+                return new TestNoLookupUntilNthAccessAsyncLookupFunction(
+                        data, lookupIndices, converter, lookupThreshold);
+            }
+            return new AsyncTestValueLookupFunction(data, lookupIndices, 
converter);
+        }
+
+        private TestValuesLookupFunction getTestValuesLookupFunction(
+                List<Row> data, int[] lookupIndices, DataStructureConverter 
converter) {
+            if (lookupThreshold > 0) {
+                return new TestNoLookupUntilNthAccessLookupFunction(
+                        data, lookupIndices, converter, lookupThreshold);
+            }
+            return new TestValuesLookupFunction(data, lookupIndices, 
converter);
+        }
+
         @Override
         public DynamicTableSource copy() {
             return new TestValuesScanLookupTableSource(
@@ -1630,7 +1669,8 @@ public final class TestValuesTableFactory
                     readableMetadata,
                     projectedMetadataFields,
                     cache,
-                    reloadTrigger);
+                    reloadTrigger,
+                    lookupThreshold);
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index e1de434979f..4f6a308b9b5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -193,7 +193,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 8826a2ccf46..cb66ed0aff7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -76,6 +76,10 @@ class AsyncLookupJoinITCase(
 
     createScanTable("src", data)
     createLookupTable("user_table", userData)
+    // real lookup starts from the 3rd access of a key; the first 2 lookups return no result
+    createLookupTable("user_table_with_lookup_threshold2", userData, 2)
+    // real lookup starts from the 4th access of a key; the first 3 lookups return no result
+    createLookupTable("user_table_with_lookup_threshold3", userData, 3)
   }
 
   @After
@@ -88,7 +92,10 @@ class AsyncLookupJoinITCase(
     }
   }
 
-  private def createLookupTable(tableName: String, data: List[Row]): Unit = {
+  private def createLookupTable(
+      tableName: String,
+      data: List[Row],
+      lookupThreshold: Int = -1): Unit = {
     if (legacyTableSource) {
       val userSchema = TableSchema
         .builder()
@@ -111,6 +118,10 @@ class AsyncLookupJoinITCase(
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
         else ""
+      val lookupThresholdOption = if (lookupThreshold > 0) {
+        s"'start-lookup-threshold'='$lookupThreshold',"
+      } else ""
+
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
                          |  `age` INT,
@@ -118,6 +129,7 @@ class AsyncLookupJoinITCase(
                          |  `name` STRING
                          |) WITH (
                          |  $cacheOptions
+                         |  $lookupThresholdOption
                          |  'connector' = 'values',
                          |  'data-id' = '$dataId',
                          |  'async' = 'true'
@@ -126,6 +138,19 @@ class AsyncLookupJoinITCase(
     }
   }
 
+  // TODO extract a base class or utility to share this code with LookupJoinITCase
+  private def getAsyncRetryLookupHint(lookupTable: String, maxAttempts: Int): 
String = {
+    s"""
+       |/*+ LOOKUP('table'='$lookupTable', 
+       | 'async'='true', 
+       | 'time-out'='300s',
+       | 'retry-predicate'='lookup_miss',
+       | 'retry-strategy'='fixed_delay',
+       | 'fixed-delay'='1 ms',
+       | 'max-attempts'='$maxAttempts')
+       |*/""".stripMargin
+  }
+
   private def createScanTable(tableName: String, data: List[Row]): Unit = {
     val dataId = TestValuesTableFactory.registerData(data)
     tEnv.executeSql(s"""
@@ -289,7 +314,7 @@ class AsyncLookupJoinITCase(
     // only legacy source can provide both sync and async functions
     if (!legacyTableSource) {
       thrown.expectMessage(
-        "Require a synchronous lookup function due to planner's requirement 
but no available functions")
+        "Required sync lookup function by planner, but table [default_catalog, 
default_database, user_table]does not offer a valid lookup function")
       thrown.expect(classOf[TableException])
     }
     tEnv.getConfig.set(
@@ -415,7 +440,79 @@ class AsyncLookupJoinITCase(
     new java.lang.Long(l)
   }
 
-// TODO add case with async and retry in FLINK-28849
+  @Test
+  def testAsyncJoinTemporalTableWithRetry(): Unit = {
+    val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table", 2)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    // the result is deterministic because the test data of lookup source is 
static
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", 
"3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): 
Unit = {
+    val maxRetryOnceHint = 
getAsyncRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table_with_lookup_threshold3 for system_time as 
of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource) {
+      // the legacy lookup source used by this test does not support the lookup threshold,
+      // so every key is found on its first lookup and no retry is needed
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // user_table_with_lookup_threshold3 returns no result for the first 3 lookups of each key
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit 
= {
+    // When async retry is enabled, the async operator needs enough time to do the delayed
+    // retry work, but because the bounded test source finishes quickly, the max attempts number
+    // is not guaranteed; the current version only ensures at least one retry per element, so
+    // the lookup threshold is limited to 2 to get deterministic results
+    val maxRetryTwiceHint = 
getAsyncRetryLookupHint("user_table_with_lookup_threshold2", 2)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as 
of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource) {
+      // the legacy lookup source used by this test does not support the lookup threshold,
+      // so every key is found on its first lookup and no retry is needed
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // TODO async retry is not supported currently; update this expectation once it is
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 
 }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index c871ab24c34..edea5d48915 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -81,6 +81,10 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
cacheType: LookupCacheType)
     createScanTable("nullable_src", dataWithNull)
     createLookupTable("user_table", userData)
     createLookupTable("nullable_user_table", userDataWithNull)
+    // real lookup starts from the 3rd access of a key; the first 2 lookups return no result
+    createLookupTable("user_table_with_lookup_threshold2", userData, 2)
+    // real lookup starts from the 4th access of a key; the first 3 lookups return no result
+    createLookupTable("user_table_with_lookup_threshold3", userData, 3)
     createLookupTableWithComputedColumn("userTableWithComputedColumn", 
userData)
   }
 
@@ -93,7 +97,11 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
cacheType: LookupCacheType)
     }
   }
 
-  private def createLookupTable(tableName: String, data: List[Row]): Unit = {
+  /** The lookupThreshold only works for new table source (not 
legacyTableSource). */
+  private def createLookupTable(
+      tableName: String,
+      data: List[Row],
+      lookupThreshold: Int = -1): Unit = {
     if (legacyTableSource) {
       val userSchema = TableSchema
         .builder()
@@ -122,6 +130,9 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
cacheType: LookupCacheType)
              |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = 
'${Long.MaxValue}',
              |""".stripMargin
         else ""
+      val lookupThresholdOption = if (lookupThreshold > 0) {
+        s"'start-lookup-threshold'='$lookupThreshold',"
+      } else ""
 
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
@@ -130,6 +141,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
cacheType: LookupCacheType)
                          |  `name` STRING
                          |) WITH (
                          |  $cacheOptions
+                         |  $lookupThresholdOption
                          |  'connector' = 'values',
                          |  'data-id' = '$dataId'
                          |)
@@ -708,8 +720,97 @@ class LookupJoinITCase(legacyTableSource: Boolean, 
cacheType: LookupCacheType)
     val expected = Seq("3", "8", "9")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
-  // TODO add case with retry hint in FLINK-28849
 
+  private def getRetryLookupHint(lookupTable: String, maxAttempts: Int): 
String = {
+    s"""
+       |/*+ LOOKUP('table'='$lookupTable', 'retry-predicate'='lookup_miss',
+       | 'retry-strategy'='fixed_delay',
+       |  'fixed-delay'='5 ms',
+       |   'max-attempts'='$maxAttempts')
+       |*/""".stripMargin
+  }
+
+  @Test
+  def testJoinTemporalTableWithRetry(): Unit = {
+    val maxRetryTwiceHint = getRetryLookupHint("user_table", 2)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    // the result is deterministic because the test data of lookup source is 
static
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", 
"3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = {
+    val maxRetryOnceHint = 
getRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table_with_lookup_threshold3 for system_time as 
of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource || cacheType == LookupCacheType.FULL) 
{
+      // legacy lookup source and full caching lookup do not support retry
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // user_table_with_lookup_threshold3 returns no result for the first 3 lookups of each key
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = {
+    val maxRetryTwiceHint = 
getRetryLookupHint("user_table_with_lookup_threshold2", 2)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as 
of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", 
"3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithLargerRetry(): Unit = {
+    // max attempts (10) exceed the lookup threshold of 'user_table_with_lookup_threshold2'
+    val largerRetryHint = 
getRetryLookupHint("user_table_with_lookup_threshold2", 10)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $largerRetryHint T.id, T.len, T.content, D.name 
FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as 
of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", 
"3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
 
 object LookupJoinITCase {

Reply via email to