This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5b5d4368437fc98b2f5ca31b1f1c6cf3e4ce263 Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Tue Aug 9 18:14:08 2022 +0800 [FLINK-28849][table-planner] Fix errors when enable retry on async lookup and add more tests Disable retry on async because of two problems need to be resolved first This closes #20482 --- .../nodes/exec/common/CommonExecLookupJoin.java | 35 +++---- .../table/planner/plan/utils/LookupJoinUtil.java | 13 ++- .../physical/common/CommonPhysicalLookupJoin.scala | 7 +- .../factories/TestValuesRuntimeFunctions.java | 91 ++++++++++++++++++ .../planner/factories/TestValuesTableFactory.java | 52 ++++++++-- .../plan/stream/sql/join/LookupJoinTest.xml | 2 +- .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 103 +++++++++++++++++++- .../runtime/stream/sql/LookupJoinITCase.scala | 105 ++++++++++++++++++++- 8 files changed, 367 insertions(+), 41 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java index 52192ee79ad..d00c888e1fa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java @@ -542,30 +542,17 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> isLeftOuterJoin, asyncLookupOptions.asyncBufferCapacity); } - /** - * why not implements async-retry directly in AsyncLookupFunction ? - because the active - * sleeping on async callback thread will occupy the task cpu time while the retry support - * in async data stream api provides a more efficient way via processing time service which - * does not occupy callback thread. Both AsyncLookupFunction AsyncTableFunction can support - * retry. does not occupy callback thread. Both AsyncLookupFunction AsyncTableFunction can - * support retry. - */ - if (null != joinHintSpec) { - // simplify code here, not check whether ResultRetryStrategy is NO_RETRY_STRATEGY or not - // because AsyncWaitOperator has short-path optimization during compile time. - return new AsyncWaitOperatorFactory<>( - asyncFunc, - asyncLookupOptions.asyncTimeout, - asyncLookupOptions.asyncBufferCapacity, - convert(asyncLookupOptions.asyncOutputMode), - joinHintSpec.toRetryStrategy()); - } else { - return new AsyncWaitOperatorFactory<>( - asyncFunc, - asyncLookupOptions.asyncTimeout, - asyncLookupOptions.asyncBufferCapacity, - convert(asyncLookupOptions.asyncOutputMode)); - } + // TODO async retry to be supported, can not directly enable retry on 'AsyncWaitOperator' + // because of two reasons: 1. AsyncLookupJoinRunner has a 'stateful' resultFutureBuffer bind + // to each input record (it's non-reenter-able) 2. can not lookup new value if cache empty + // enabled when chained with the new AsyncCachingLookupFunction. This two issues should be + // resolved first before enable async retry. + + return new AsyncWaitOperatorFactory<>( + asyncFunc, + asyncLookupOptions.asyncTimeout, + asyncLookupOptions.asyncBufferCapacity, + convert(asyncLookupOptions.asyncOutputMode)); } private AsyncDataStream.OutputMode convert( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java index 833eb1908a7..529f79b706e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java @@ -400,18 +400,23 @@ public final class LookupJoinUtil { || lookupJoinHintSpec.isAsync(); } - public static boolean isAsyncLookup(RelOptTable temporalTable, Collection<Integer> lookupKeys) { + public static boolean isAsyncLookup( + RelOptTable temporalTable, + Collection<Integer> lookupKeys, + LookupJoinHintSpec lookupJoinHintSpec) { + boolean preferAsync = preferAsync(lookupJoinHintSpec); if (temporalTable instanceof TableSourceTable) { LookupTableSource.LookupRuntimeProvider provider = getLookupRuntimeProvider(temporalTable, lookupKeys); - return provider instanceof AsyncLookupFunctionProvider - || provider instanceof AsyncTableFunctionProvider; + return preferAsync + && (provider instanceof AsyncLookupFunctionProvider + || provider instanceof AsyncTableFunctionProvider); } else if (temporalTable instanceof LegacyTableSourceTable) { LegacyTableSourceTable<?> legacyTableSourceTable = (LegacyTableSourceTable<?>) temporalTable; LookupableTableSource<?> lookupableTableSource = (LookupableTableSource<?>) legacyTableSourceTable.tableSource(); - return lookupableTableSource.isAsyncEnabled(); + return preferAsync && lookupableTableSource.isAsyncEnabled(); } throw new TableException( String.format( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala index 568909ddd5b..c5fde386f73 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala @@ -161,8 +161,13 @@ abstract class CommonPhysicalLookupJoin( case t: LegacyTableSourceTable[_] => t.tableIdentifier } + // The lookup function maybe not the final choice at runtime because lack of upsert materialize + // info here. This can be consistent after planner offers enough info here. val isAsyncEnabled: Boolean = - LookupJoinUtil.isAsyncLookup(temporalTable, allLookupKeys.keys.map(Int.box).toList.asJava) + LookupJoinUtil.isAsyncLookup( + temporalTable, + allLookupKeys.keys.map(Int.box).toList.asJava, + lookupHintSpec.orNull) super .explainTerms(pw) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index b915529ef62..510efcd09ab 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -707,4 +707,95 @@ final class TestValuesRuntimeFunctions { }); } } + + /** + * The {@link TestNoLookupUntilNthAccessLookupFunction} extends {@link + * TestValuesLookupFunction}, it will not do real lookup for a key (return null value + * immediately) until which lookup times beyond predefined threshold 'lookupThreshold'. + */ + public static class TestNoLookupUntilNthAccessLookupFunction extends TestValuesLookupFunction { + + private static final long serialVersionUID = 1L; + + /** The threshold that a real lookup can happen, otherwise no lookup at all. */ + private final int lookupThreshold; + + private transient Map<RowData, Integer> accessCounter; + + protected TestNoLookupUntilNthAccessLookupFunction( + List<Row> data, + int[] lookupIndices, + LookupTableSource.DataStructureConverter converter, + int lookupThreshold) { + super(data, lookupIndices, converter); + this.lookupThreshold = lookupThreshold; + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + accessCounter = new HashMap<>(); + } + + protected int counter(RowData key) { + int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1; + accessCounter.put(key, currentCnt); + return currentCnt; + } + + @Override + public Collection<RowData> lookup(RowData keyRow) throws IOException { + int currentCnt = counter(keyRow); + if (currentCnt <= lookupThreshold) { + return null; + } + return super.lookup(keyRow); + } + } + + /** + * The {@link TestNoLookupUntilNthAccessAsyncLookupFunction} extends {@link + * AsyncTestValueLookupFunction}, it will not do real lookup for a key (return empty result + * immediately) until which lookup times beyond predefined threshold 'lookupThreshold'. + */ + public static class TestNoLookupUntilNthAccessAsyncLookupFunction + extends AsyncTestValueLookupFunction { + private static final long serialVersionUID = 1L; + private static Collection<RowData> emptyResult = Collections.emptyList(); + + /** The threshold that a real lookup can happen, otherwise no lookup at all. */ + private final int lookupThreshold; + + private transient Map<RowData, Integer> accessCounter; + + public TestNoLookupUntilNthAccessAsyncLookupFunction( + List<Row> data, + int[] lookupIndices, + LookupTableSource.DataStructureConverter converter, + int lookupThreshold) { + super(data, lookupIndices, converter); + this.lookupThreshold = lookupThreshold; + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + accessCounter = new HashMap<>(); + } + + protected int counter(RowData key) { + int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1; + accessCounter.put(key, currentCnt); + return currentCnt; + } + + @Override + public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) { + int currentCnt = counter(keyRow); + if (currentCnt <= lookupThreshold) { + return CompletableFuture.supplyAsync(() -> emptyResult); + } + return super.asyncLookup(keyRow); + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 60967f8494d..949b9f57e57 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -86,14 +86,18 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.AsyncTableFunction; import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.LookupFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingOutputFormat; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingSinkFunction; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AsyncTestValueLookupFunction; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.RetractingSinkFunction; +import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessAsyncLookupFunction; +import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessLookupFunction; import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction; import org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction; import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction; @@ -317,6 +321,13 @@ public final class TestValuesTableFactory private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS = ConfigOptions.key("lookup-function-class").stringType().noDefaultValue(); + private static final ConfigOption<Integer> LOOKUP_THRESHOLD = + ConfigOptions.key("start-lookup-threshold") + .intType() + .defaultValue(-1) + .withDescription( + "The threshold which backend lookup function will not do real lookup for" + + " a key (returns null value immediately) until its lookup times beyond"); private static final ConfigOption<Boolean> ASYNC_ENABLED = ConfigOptions.key("async").booleanType().defaultValue(false); @@ -416,6 +427,7 @@ public final class TestValuesTableFactory boolean failingSource = helper.getOptions().get(FAILING_SOURCE); int numElementToSkip = helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP); boolean internalData = helper.getOptions().get(INTERNAL_DATA); + int lookupThreshold = helper.getOptions().get(LOOKUP_THRESHOLD); DefaultLookupCache cache = null; if (helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL)) { cache = DefaultLookupCache.fromConfig(helper.getOptions()); @@ -540,7 +552,8 @@ public final class TestValuesTableFactory readableMetadata, null, cache, - reloadTrigger); + reloadTrigger, + lookupThreshold); } } else { try { @@ -622,6 +635,7 @@ public final class TestValuesTableFactory TABLE_SOURCE_CLASS, FAILING_SOURCE, LOOKUP_FUNCTION_CLASS, + LOOKUP_THRESHOLD, ASYNC_ENABLED, DISABLE_LOOKUP, TABLE_SOURCE_CLASS, @@ -1496,6 +1510,7 @@ public final class TestValuesTableFactory private final @Nullable LookupCache cache; private final @Nullable CacheReloadTrigger reloadTrigger; private final boolean isAsync; + private final int lookupThreshold; private TestValuesScanLookupTableSource( DataType producedDataType, @@ -1517,7 +1532,8 @@ public final class TestValuesTableFactory Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields, @Nullable LookupCache cache, - @Nullable CacheReloadTrigger reloadTrigger) { + @Nullable CacheReloadTrigger reloadTrigger, + int lookupThreshold) { super( producedDataType, changelogMode, @@ -1539,6 +1555,7 @@ public final class TestValuesTableFactory this.isAsync = isAsync; this.cache = cache; this.reloadTrigger = reloadTrigger; + this.lookupThreshold = lookupThreshold; } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1549,7 +1566,11 @@ public final class TestValuesTableFactory try { Class<?> clazz = Class.forName(lookupFunctionClass); Object udtf = InstantiationUtil.instantiate(clazz); - if (udtf instanceof TableFunction) { + if (udtf instanceof LookupFunction) { + return LookupFunctionProvider.of((LookupFunction) udtf); + } else if (udtf instanceof AsyncLookupFunction) { + return AsyncLookupFunctionProvider.of((AsyncLookupFunction) udtf); + } else if (udtf instanceof TableFunction) { return TableFunctionProvider.of((TableFunction) udtf); } else { return AsyncTableFunctionProvider.of((AsyncTableFunction) udtf); @@ -1582,7 +1603,7 @@ public final class TestValuesTableFactory context.createDataStructureConverter(producedDataType); if (isAsync) { AsyncTestValueLookupFunction asyncLookupFunction = - new AsyncTestValueLookupFunction(data, lookupIndices, converter); + getTestValuesAsyncLookupFunction(data, lookupIndices, converter); if (cache == null) { return AsyncLookupFunctionProvider.of(asyncLookupFunction); } else { @@ -1591,7 +1612,7 @@ public final class TestValuesTableFactory } } else { TestValuesLookupFunction lookupFunction = - new TestValuesLookupFunction(data, lookupIndices, converter); + getTestValuesLookupFunction(data, lookupIndices, converter); if (cache != null) { return PartialCachingLookupProvider.of(lookupFunction, cache); } else if (reloadTrigger != null) { @@ -1608,6 +1629,24 @@ public final class TestValuesTableFactory } } + private AsyncTestValueLookupFunction getTestValuesAsyncLookupFunction( + List<Row> data, int[] lookupIndices, DataStructureConverter converter) { + if (lookupThreshold > 0) { + return new TestNoLookupUntilNthAccessAsyncLookupFunction( + data, lookupIndices, converter, lookupThreshold); + } + return new AsyncTestValueLookupFunction(data, lookupIndices, converter); + } + + private TestValuesLookupFunction getTestValuesLookupFunction( + List<Row> data, int[] lookupIndices, DataStructureConverter converter) { + if (lookupThreshold > 0) { + return new TestNoLookupUntilNthAccessLookupFunction( + data, lookupIndices, converter, lookupThreshold); + } + return new TestValuesLookupFunction(data, lookupIndices, converter); + } + @Override public DynamicTableSource copy() { return new TestValuesScanLookupTableSource( @@ -1630,7 +1669,8 @@ public final class TestValuesTableFactory readableMetadata, projectedMetadataFields, cache, - reloadTrigger); + reloadTrigger, + lookupThreshold); } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml index e1de434979f..4f6a308b9b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml @@ -193,7 +193,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) -+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age]) ++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala index 8826a2ccf46..cb66ed0aff7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala @@ -76,6 +76,10 @@ class AsyncLookupJoinITCase( createScanTable("src", data) createLookupTable("user_table", userData) + // lookup will start from the 2nd time, first lookup will always get null result + createLookupTable("user_table_with_lookup_threshold2", userData, 2) + // lookup will start from the 3rd time, first lookup will always get null result + createLookupTable("user_table_with_lookup_threshold3", userData, 3) } @After @@ -88,7 +92,10 @@ class AsyncLookupJoinITCase( } } - private def createLookupTable(tableName: String, data: List[Row]): Unit = { + private def createLookupTable( + tableName: String, + data: List[Row], + lookupThreshold: Int = -1): Unit = { if (legacyTableSource) { val userSchema = TableSchema .builder() @@ -111,6 +118,10 @@ class AsyncLookupJoinITCase( | '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = '${Long.MaxValue}', |""".stripMargin else "" + val lookupThresholdOption = if (lookupThreshold > 0) { + s"'start-lookup-threshold'='$lookupThreshold'," + } else "" + tEnv.executeSql(s""" |CREATE TABLE $tableName ( | `age` INT, @@ -118,6 +129,7 @@ class AsyncLookupJoinITCase( | `name` STRING |) WITH ( | $cacheOptions + | $lookupThresholdOption | 'connector' = 'values', | 'data-id' = '$dataId', | 'async' = 'true' @@ -126,6 +138,19 @@ class AsyncLookupJoinITCase( } } + // TODO a base class or utility class is better to reuse code for this and LookupJoinITCase + private def getAsyncRetryLookupHint(lookupTable: String, maxAttempts: Int): String = { + s""" + |/*+ LOOKUP('table'='$lookupTable', + | 'async'='true', + | 'time-out'='300s', + | 'retry-predicate'='lookup_miss', + | 'retry-strategy'='fixed_delay', + | 'fixed-delay'='1 ms', + | 'max-attempts'='$maxAttempts') + |*/""".stripMargin + } + private def createScanTable(tableName: String, data: List[Row]): Unit = { val dataId = TestValuesTableFactory.registerData(data) tEnv.executeSql(s""" @@ -289,7 +314,7 @@ class AsyncLookupJoinITCase( // only legacy source can provide both sync and async functions if (!legacyTableSource) { thrown.expectMessage( - "Require a synchronous lookup function due to planner's requirement but no available functions") + "Required sync lookup function by planner, but table [default_catalog, default_database, user_table]does not offer a valid lookup function") thrown.expect(classOf[TableException]) } tEnv.getConfig.set( @@ -415,7 +440,79 @@ class AsyncLookupJoinITCase( new java.lang.Long(l) } -// TODO add case with async and retry in FLINK-28849 + @Test + def testAsyncJoinTemporalTableWithRetry(): Unit = { + val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table", 2) + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + // the result is deterministic because the test data of lookup source is static + val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = { + val maxRetryOnceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold3", 1) + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table_with_lookup_threshold3 for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + val expected = if (legacyTableSource) { + // test legacy lookup source do not support lookup threshold + // for real async lookup functions(both new and legacy api) do support retry + Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + } else { + // the user_table_with_lookup_threshold3 will return null result before 3rd lookup + Seq() + } + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = { + // When enable async retry, there should left enough time for the async operator doing delayed + // retry work, but due the fast finish of testing bounded source, it has no assurance of the + // max attempts number, it only ensures at least one retry for each element in current version + // so we can only use a max lookup threshold to 2 to get a deterministic results + val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold2", 2) + + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + val expected = if (legacyTableSource) { + // test legacy lookup source do not support lookup threshold + // for real async lookup functions(both new and legacy api) do support retry + Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + } else { + // TODO retry on async is not supported currently, this should be updated after supported + Seq() + } + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala index c871ab24c34..edea5d48915 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala @@ -81,6 +81,10 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) createScanTable("nullable_src", dataWithNull) createLookupTable("user_table", userData) createLookupTable("nullable_user_table", userDataWithNull) + // lookup will start from the 2nd time, first lookup will always get null result + createLookupTable("user_table_with_lookup_threshold2", userData, 2) + // lookup will start from the 3rd time, first lookup will always get null result + createLookupTable("user_table_with_lookup_threshold3", userData, 3) createLookupTableWithComputedColumn("userTableWithComputedColumn", userData) } @@ -93,7 +97,11 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) } } - private def createLookupTable(tableName: String, data: List[Row]): Unit = { + /** The lookupThreshold only works for new table source (not legacyTableSource). */ + private def createLookupTable( + tableName: String, + data: List[Row], + lookupThreshold: Int = -1): Unit = { if (legacyTableSource) { val userSchema = TableSchema .builder() @@ -122,6 +130,9 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) | '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = '${Long.MaxValue}', |""".stripMargin else "" + val lookupThresholdOption = if (lookupThreshold > 0) { + s"'start-lookup-threshold'='$lookupThreshold'," + } else "" tEnv.executeSql(s""" |CREATE TABLE $tableName ( @@ -130,6 +141,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) | `name` STRING |) WITH ( | $cacheOptions + | $lookupThresholdOption | 'connector' = 'values', | 'data-id' = '$dataId' |) @@ -708,8 +720,97 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) val expected = Seq("3", "8", "9") assertEquals(expected.sorted, sink.getRetractResults.sorted) } - // TODO add case with retry hint in FLINK-28849 + private def getRetryLookupHint(lookupTable: String, maxAttempts: Int): String = { + s""" + |/*+ LOOKUP('table'='$lookupTable', 'retry-predicate'='lookup_miss', + | 'retry-strategy'='fixed_delay', + | 'fixed-delay'='5 ms', + | 'max-attempts'='$maxAttempts') + |*/""".stripMargin + } + + @Test + def testJoinTemporalTableWithRetry(): Unit = { + val maxRetryTwiceHint = getRetryLookupHint("user_table", 2) + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + // the result is deterministic because the test data of lookup source is static + val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = { + val maxRetryOnceHint = getRetryLookupHint("user_table_with_lookup_threshold3", 1) + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table_with_lookup_threshold3 for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + val expected = if (legacyTableSource || cacheType == LookupCacheType.FULL) { + // legacy lookup source and full caching lookup do not support retry + Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + } else { + // the user_table_with_lookup_threshold3 will return null result before 3rd lookup + Seq() + } + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = { + val maxRetryTwiceHint = getRetryLookupHint("user_table_with_lookup_threshold2", 2) + + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testJoinTemporalTableWithLookupThresholdWithLargerRetry(): Unit = { + // max times beyond the lookup threshold of 'user_table_with_lookup_threshold2' + val largerRetryHint = getRetryLookupHint("user_table_with_lookup_threshold2", 10) + + val sink = new TestingAppendSink + tEnv + .sqlQuery(s""" + |SELECT $largerRetryHint T.id, T.len, T.content, D.name FROM src AS T + |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D + |ON T.id = D.id + |""".stripMargin) + .toAppendStream[Row] + .addSink(sink) + env.execute() + + val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } object LookupJoinITCase {