This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit be6b1c94ef3f552c753746863cb0a4e7dd86d2fc Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Fri Sep 9 17:48:04 2022 +0800 [FLINK-28850][table-planner] Support table alias in LOOKUP hint This closes #20800 --- .../content.zh/docs/dev/table/sql/queries/hints.md | 2 +- docs/content/docs/dev/table/sql/queries/hints.md | 2 +- .../flink/table/planner/hint/FlinkHints.java | 1 - .../nodes/exec/stream/LookupJoinJsonPlanTest.java | 10 +- .../plan/stream/sql/join/LookupJoinTest.xml | 260 +++++++++++++++++---- .../plan/stream/sql/join/LookupJoinTest.scala | 66 ++++-- .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 6 +- .../runtime/stream/sql/LookupJoinITCase.scala | 8 +- 8 files changed, 275 insertions(+), 80 deletions(-) diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md index 9e6ce8b3cce..7cfec52207b 100644 --- a/docs/content.zh/docs/dev/table/sql/queries/hints.md +++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md @@ -331,7 +331,7 @@ LOOKUP 联接提示允许用户建议 Flink 优化器: {{< hint info >}} 注意:其中 -- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意当前不支持填写表的别名(这将在后续版本中支持)。 +- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意如果表定义了别名,则提示选项必须使用别名。 - 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。 - 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。 {{< /hint >}} diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md index 5cc8723112e..33eebcfb75a 100644 --- a/docs/content/docs/dev/table/sql/queries/hints.md +++ b/docs/content/docs/dev/table/sql/queries/hints.md @@ -338,7 +338,7 @@ The LOOKUP hint allows users to suggest the Flink optimizer to: {{< hint info >}} Note: -- 'table' option is required, only table name is supported(keep consistent with which in the FROM clause), alias name is not supported currently(will be supported in later versions). +- 'table' option is required, only table name is supported(keep consistent with which in the FROM clause), note that only alias name can be used if table has an alias name. - async options are all optional, will use default value if not configured. - there is no default value for retry options, all retry options should be set to valid values when need to enable retry. {{< /hint >}} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java index 7463ac22893..4e8ce7a7fcb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java @@ -114,7 +114,6 @@ public abstract class FlinkHints { public static boolean canTransposeToTableScan(RelNode node) { return node instanceof LogicalProject // computed column on table || node instanceof LogicalFilter - // TODO support lookup join hint with alias name in FLINK-28850 || node instanceof LogicalSnapshot; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java index 05b9200280a..a68eee6357a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java @@ -201,7 +201,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase { // LookupTable has sync func only, just verify the hint has take effect util.verifyJsonPlan( "INSERT INTO MySink1 SELECT " - + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'output-mode'='allow_unordered') */ * " + + "/*+ LOOKUP('table'='D', 'async'='true', 'output-mode'='allow_unordered') */ * " + "FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"); } @@ -211,7 +211,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase { // LookupTable has sync func only, just verify the hint has take effect util.verifyJsonPlan( "INSERT INTO MySink1 SELECT " - + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * " + + "/*+ LOOKUP('table'='D', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * " + "FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"); } @@ -220,7 +220,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase { public void testJoinTemporalTableWithRetryHint() { util.verifyJsonPlan( "INSERT INTO MySink1 SELECT " - + "/*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + + "/*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + "FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"); } @@ -230,7 +230,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase { // LookupTable has sync func only, just verify the hint has take effect util.verifyJsonPlan( "INSERT INTO MySink1 SELECT " - + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + + "/*+ LOOKUP('table'='D', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + "FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"); } @@ -240,7 +240,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase { // LookupTable has sync func only, just verify the hint has take effect util.verifyJsonPlan( "INSERT INTO MySink1 SELECT " - + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + + "/*+ LOOKUP('table'='D', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * " + "FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"); } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml index c0626e94636..90ab534634e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml @@ -536,12 +536,12 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c </TestCase> <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=false]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=AsyncLookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -558,12 +558,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=true]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=AsyncLookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -580,12 +580,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=false]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=AsyncLookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -602,12 +602,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=true]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=AsyncLookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -619,12 +619,56 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[ORDERED, 180000ms, 100]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithTableAlias[LegacyTableSource=false]"> + <Resource name="sql"> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + +- LogicalFilter(condition=[=($cor0.a, $0)]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) ++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinHintWithTableAlias[LegacyTableSource=true]"> + <Resource name="sql"> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + +- LogicalFilter(condition=[=($cor0.a, $0)]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) ++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=false]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id]]> </Resource> <Resource name="ast"> <![CDATA[ @@ -633,7 +677,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -646,7 +690,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=true]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id]]> </Resource> <Resource name="ast"> <![CDATA[ @@ -655,7 +699,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -668,12 +712,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=false]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=LookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -690,12 +734,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n </TestCase> <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=true]"> <Resource name="sql"> - <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> + <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]> </Resource> <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) -+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=LookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -787,7 +831,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=AsyncLookupTable, capacity=300}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=D, capacity=300}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1490,7 +1534,7 @@ Calc(select=[a, b, c, name]) <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=AsyncLookupTable, capacity=300}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=D, capacity=300}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1582,7 +1626,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable, capacity=300}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=D, capacity=300}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1699,7 +1743,7 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age]) <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable, capacity=300}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=D, capacity=300}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1791,7 +1835,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1883,7 +1927,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) <![CDATA[== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age]) +- LogicalProject(a=[$0], name=[$6], age=[$7]) - +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) @@ -1995,11 +2039,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n <TestCase name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=true]"> <Resource name="sql"> <![CDATA[ -SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * -FROM MyTable AS T -JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D - ON T.a = D.id +SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = AsyncLookupTable.id ]]> </Resource> <Resource name="ast"> @@ -2009,7 +2053,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]]) ]]> </Resource> <Resource name="optimized exec plan"> @@ -2042,12 +2086,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n ]]> </Resource> </TestCase> - <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]"> + <TestCase name="testMultipleJoinHintsWithDifferentTableAlias[LegacyTableSource=false]"> <Resource name="sql"> <![CDATA[ -SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * -FROM MyTable AS T +SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * +FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 @@ -2057,8 +2101,8 @@ JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10]) -+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) - :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]]) + :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]]) : :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) : +- LogicalFilter(condition=[=($cor0.a, $0)]) : +- LogicalSnapshot(period=[$cor0.proctime]) @@ -2071,6 +2115,41 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0]) ++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3]) + +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = AsyncLookupTable.id +JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = LookupTable.id + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10]) ++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + : :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + : +- LogicalFilter(condition=[=($cor0.a, $0)]) + : +- LogicalSnapshot(period=[$cor0.proctime]) + : +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]]) + +- LogicalFilter(condition=[=($cor1.a, $0)]) + +- LogicalSnapshot(period=[$cor1.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0]) +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3]) +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -2080,9 +2159,100 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=true]"> <Resource name="sql"> <![CDATA[ -SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * -FROM MyTable AS T +SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = AsyncLookupTable.id +JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = LookupTable.id + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10]) ++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) + : :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + : +- LogicalFilter(condition=[=($cor0.a, $0)]) + : +- LogicalSnapshot(period=[$cor0.proctime]) + : +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]]) + +- LogicalFilter(condition=[=($cor1.a, $0)]) + +- LogicalSnapshot(period=[$cor1.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0]) ++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3]) + +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleJoinHintsWithSameTableAlias[LegacyTableSource=false]"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + LOOKUP('table'='D', 'output-mode'='ordered') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D + ON T.a = D.id + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{output-mode=ordered, table=D}]]]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + +- LogicalFilter(condition=[=($cor0.a, $0)]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) ++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleJoinHintsWithSameTableAlias[LegacyTableSource=true]"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + LOOKUP('table'='D', 'output-mode'='ordered') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D + ON T.a = D.id + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{output-mode=ordered, table=D}]]]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) + +- LogicalFilter(condition=[=($cor0.a, $0)]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age]) ++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleJoinHintsWithDifferentTableAlias[LegacyTableSource=true]"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * +FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 @@ -2092,8 +2262,8 @@ JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 <Resource name="ast"> <![CDATA[ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10]) -+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) - :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]]) ++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]]) + :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]]) : :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) : +- LogicalFilter(condition=[=($cor0.a, $0)]) : +- LogicalSnapshot(period=[$cor0.proctime]) @@ -2115,11 +2285,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n <TestCase name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=false]"> <Resource name="sql"> <![CDATA[ -SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * -FROM MyTable AS T -JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D - ON T.a = D.id +SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * +FROM MyTable AS T +JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + ON T.a = AsyncLookupTable.id ]]> </Resource> <Resource name="ast"> @@ -2129,7 +2299,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]]) +- LogicalFilter(condition=[=($cor0.a, $0)]) +- LogicalSnapshot(period=[$cor0.proctime]) - +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]]) + +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]]) ]]> </Resource> <Resource name="optimized exec plan"> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala index f793bc68e73..ac72487ad98 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala @@ -750,20 +750,16 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri @Test def testJoinHintWithTableAlias(): Unit = { - // TODO to be supported in FLINK-28850 (to make LogicalSnapshot Hintable) - thrown.expectMessage( - "The options of following hints cannot match the name of input tables or" + - " views: \n`D` in `LOOKUP`") - thrown.expect(classOf[ValidationException]) - val sql = "SELECT /*+ LOOKUP('table'='D') */ * FROM MyTable AS T JOIN LookupTable " + - "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" + val sql = + "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" util.verifyExecPlan(sql) } @Test def testJoinHintWithTableNameOnly(): Unit = { val sql = "SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable " + - "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" + "FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id" util.verifyExecPlan(sql) } @@ -772,9 +768,23 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri // only the first hint will take effect val sql = """ - |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - | LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * - |FROM MyTable AS T + |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + | LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * + |FROM MyTable AS T + |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + | ON T.a = AsyncLookupTable.id + """.stripMargin + util.verifyExecPlan(sql) + } + + @Test + def testMultipleJoinHintsWithSameTableAlias(): Unit = { + // only the first hint will take effect + val sql = + """ + |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + | LOOKUP('table'='D', 'output-mode'='ordered') */ * + |FROM MyTable AS T |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D | ON T.a = D.id """.stripMargin @@ -786,9 +796,25 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri // both hints on corresponding tables will take effect val sql = """ - |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), - | LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * - |FROM MyTable AS T + |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), + | LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * + |FROM MyTable AS T + |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime + | ON T.a = AsyncLookupTable.id + |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime + | ON T.a = LookupTable.id + """.stripMargin + util.verifyExecPlan(sql) + } + + @Test + def testMultipleJoinHintsWithDifferentTableAlias(): Unit = { + // both hints on corresponding tables will take effect + val sql = + """ + |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'), + | LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * + |FROM MyTable AS T |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D | ON T.a = D.id |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 @@ -800,7 +826,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri @Test def testJoinSyncTableWithAsyncHint(): Unit = { val sql = - "SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable " + + "SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" util.verifyExecPlan(sql) } @@ -808,7 +834,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri @Test def testJoinAsyncTableWithAsyncHint(): Unit = { val sql = - "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * " + + "SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * " + "FROM MyTable AS T JOIN AsyncLookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" util.verifyExecPlan(sql) @@ -817,7 +843,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri @Test def testJoinAsyncTableWithSyncHint(): Unit = { val sql = - "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * " + + "SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * " + "FROM MyTable AS T JOIN AsyncLookupTable " + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id" util.verifyExecPlan(sql) @@ -883,7 +909,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri stmt.addInsertSql( """ |INSERT INTO Sink1 - |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300') */ + |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300') */ | T.a, D.name, D.age |FROM MyTable T |JOIN AsyncLookupTable @@ -899,7 +925,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri stmt.addInsertSql( """ |INSERT INTO Sink1 - |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ + |SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ | T.a, D.name, D.age |FROM MyTable T |JOIN LookupTable @@ -915,7 +941,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri stmt.addInsertSql( """ |INSERT INTO Sink1 - |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ + |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ | T.a, D.name, D.age |FROM MyTable T |JOIN AsyncLookupTable diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala index 864c01527bd..c2e8e2e606b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala @@ -444,7 +444,7 @@ class AsyncLookupJoinITCase( @Test def testAsyncJoinTemporalTableWithRetry(): Unit = { - val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table", 2) + val maxRetryTwiceHint = getAsyncRetryLookupHint("D", 2) val sink = new TestingAppendSink tEnv .sqlQuery(s""" @@ -463,7 +463,7 @@ class AsyncLookupJoinITCase( @Test def testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = { - val maxRetryOnceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold3", 1) + val maxRetryOnceHint = getAsyncRetryLookupHint("D", 1) val sink = new TestingAppendSink tEnv .sqlQuery(s""" @@ -492,7 +492,7 @@ class AsyncLookupJoinITCase( // retry work, but due the fast finish of testing bounded source, it has no assurance of the // max attempts number, it only ensures at least one retry for each element in current version // so we can only use a max lookup threshold to 2 to get a deterministic results - val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold2", 2) + val maxRetryTwiceHint = getAsyncRetryLookupHint("D", 2) val sink = new TestingAppendSink tEnv diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala index 39efec4a146..08dec3d4321 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala @@ -740,7 +740,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) @Test def testJoinTemporalTableWithRetry(): Unit = { - val maxRetryTwiceHint = getRetryLookupHint("user_table", 2) + val maxRetryTwiceHint = getRetryLookupHint("D", 2) val sink = new TestingAppendSink tEnv .sqlQuery(s""" @@ -759,7 +759,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) @Test def testJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = { - val maxRetryOnceHint = getRetryLookupHint("user_table_with_lookup_threshold3", 1) + val maxRetryOnceHint = getRetryLookupHint("D", 1) val sink = new TestingAppendSink tEnv .sqlQuery(s""" @@ -783,7 +783,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) @Test def testJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = { - val maxRetryTwiceHint = getRetryLookupHint("user_table_with_lookup_threshold2", 2) + val maxRetryTwiceHint = getRetryLookupHint("D", 2) val sink = new TestingAppendSink tEnv @@ -803,7 +803,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType) @Test def testJoinTemporalTableWithLookupThresholdWithLargerRetry(): Unit = { // max times beyond the lookup threshold of 'user_table_with_lookup_threshold2' - val largerRetryHint = getRetryLookupHint("user_table_with_lookup_threshold2", 10) + val largerRetryHint = getRetryLookupHint("D", 10) val sink = new TestingAppendSink tEnv