This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new be509e6d674 [FLINK-33064][table-planner] Improve the error message when the lookup source is used as the scan source (#23377) be509e6d674 is described below commit be509e6d67471d886e58d3ddea6ddd3627a191a8 Author: yunhong <337361...@qq.com> AuthorDate: Wed Sep 13 13:58:04 2023 +0800 [FLINK-33064][table-planner] Improve the error message when the lookup source is used as the scan source (#23377) Co-authored-by: zhengyunhong.zyh <zhengyunhong....@alibaba-inc.com> --- .../TestDynamicTableSourceOnlyFactory.java | 41 ++++- .../apache/calcite/sql2rel/SqlToRelConverter.java | 172 +++++++++++++++++---- .../planner/plan/utils/TemporalTableJoinUtil.java | 14 ++ .../rules/common/CommonTemporalTableJoinRule.scala | 17 +- .../table/planner/catalog/CatalogTableITCase.scala | 52 ++++--- .../planner/plan/stream/sql/TableScanTest.scala | 8 +- 6 files changed, 236 insertions(+), 68 deletions(-) diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java index 7aa78b6492e..6d3af4650d3 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java @@ -19,7 +19,10 @@ package org.apache.flink.table.factories; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; import java.util.Collections; import java.util.Set; @@ -32,9 +35,14 @@ public final class TestDynamicTableSourceOnlyFactory implements 
DynamicTableSour public static final String IDENTIFIER = "source-only"; + private static final ConfigOption<Boolean> BOUNDED = + ConfigOptions.key("bounded").booleanType().defaultValue(false); + @Override public DynamicTableSource createDynamicTableSource(Context context) { - return null; + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + boolean isBounded = helper.getOptions().get(BOUNDED); + return new MockedScanTableSource(isBounded); } @Override @@ -49,6 +57,35 @@ public final class TestDynamicTableSourceOnlyFactory implements DynamicTableSour @Override public Set<ConfigOption<?>> optionalOptions() { - return Collections.emptySet(); + return Collections.singleton(BOUNDED); + } + + /** A mocked {@link ScanTableSource} for validation test. */ + private static class MockedScanTableSource implements ScanTableSource { + private final boolean isBounded; + + private MockedScanTableSource(boolean isBounded) { + this.isBounded = isBounded; + } + + @Override + public DynamicTableSource copy() { + return null; + } + + @Override + public String asSummaryString() { + return null; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + return () -> isBounded; + } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 71cafa91308..67663ce0984 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -17,11 +17,16 @@ package org.apache.calcite.sql2rel; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.ValidationException; +import 
org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle; import org.apache.flink.table.planner.calcite.TimestampSchemaVersion; import org.apache.flink.table.planner.hint.FlinkHints; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogSnapshotReader; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; import com.google.common.base.Preconditions; @@ -64,6 +69,7 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.core.Sample; import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.hint.HintStrategyTable; import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.rel.hint.RelHint; @@ -235,12 +241,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>FLINK modifications are at lines * * <ol> - * <li>Added in FLINK-29081, FLINK-28682: Lines 644 ~ 654 - * <li>Added in FLINK-28682: Lines 2277 ~ 2294 - * <li>Added in FLINK-28682: Lines 2331 ~ 2359 - * <li>Added in FLINK-20873: Lines 5484 ~ 5493 - * <li>Added in FLINK-32474: Lines 2841 ~ 2853 - * <li>Added in FLINK-32474: Lines 2953 ~ 2987 + * <li>Added in FLINK-29081, FLINK-28682: Lines 667 ~ 677 + * <li>Added in FLINK-28682: Lines 2300 ~ 2317 + * <li>Added in FLINK-28682: Lines 2354 ~ 2382 + * <li>Added in FLINK-32474: Lines 2932 ~ 2944 + * <li>Added in FLINK-32474: Lines 3057 ~ 3093 + * <li>Added in FLINK-20873: Lines 5594 ~ 5603 + * <li>Added in FLINK-33064: Lines 2431 ~ 2475 + * <li>Added in FLINK-33064: Lines 2481 ~ 2570 + * <li>Added in FLINK-33064: Lines 2970 ~ 2978 + * <li>Added in 
FLINK-33064: Lines 3050 ~ 3052 + * <li>Added in FLINK-33064: Lines 4102 ~ 4107 * </ol> */ @SuppressWarnings("UnstableApiUsage") @@ -2417,25 +2428,17 @@ public class SqlToRelConverter { throw new AssertionError("unknown TABLESAMPLE type: " + sampleSpec); } return; - + // ----- FLINK MODIFICATION BEGIN ----- case TABLE_REF: - call = (SqlCall) from; - convertIdentifier(bb, call.operand(0), null, call.operand(1), null); + convertTableRef(bb, from, false); return; case IDENTIFIER: - convertIdentifier(bb, (SqlIdentifier) from, null, null, null); + convertIdentifier(bb, (SqlIdentifier) from, null, null, null, false); return; case EXTEND: - call = (SqlCall) from; - final SqlNode operand0 = call.getOperandList().get(0); - final SqlIdentifier id = - operand0.getKind() == SqlKind.TABLE_REF - ? ((SqlCall) operand0).operand(0) - : (SqlIdentifier) operand0; - SqlNodeList extendedColumns = (SqlNodeList) call.getOperandList().get(1); - convertIdentifier(bb, id, extendedColumns, null, null); + convertExtend(bb, from, false); return; case SNAPSHOT: @@ -2466,19 +2469,106 @@ public class SqlToRelConverter { return; case COLLECTION_TABLE: - call = (SqlCall) from; - - // Dig out real call; TABLE() wrapper is just syntactic. 
- assert call.getOperandList().size() == 1; - final SqlCall call2 = call.operand(0); - convertCollectionTable(bb, call2); + convertCollectionTable(bb, from, false); return; + // ----- FLINK MODIFICATION END ----- default: throw new AssertionError("not a join operator " + from); } } + // ----- FLINK MODIFICATION BEGIN ----- + private void convertTableRef(Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) { + SqlCall call = (SqlCall) from; + convertIdentifier( + bb, call.operand(0), null, call.operand(1), null, isTemporalJoinRightSide); + } + + private void convertExtend(Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) { + SqlCall call = (SqlCall) from; + final SqlNode operand0 = call.getOperandList().get(0); + final SqlIdentifier id = + operand0.getKind() == SqlKind.TABLE_REF + ? ((SqlCall) operand0).operand(0) + : (SqlIdentifier) operand0; + SqlNodeList extendedColumns = (SqlNodeList) call.getOperandList().get(1); + convertIdentifier(bb, id, extendedColumns, null, null, isTemporalJoinRightSide); + } + + /** + * Converts a FROM clause into a relational expression for the right side in temporal join. The + * right side in temporal join is a special type of source which can support {@link + * org.apache.flink.table.connector.source.LookupTableSource}. So we need to distinguish it from + * the regular scan table source during conversion. + * + * @param bb Scope within which to resolve identifiers + * @param from FROM clause of a query. 
Examples include: + * @param fieldNames Field aliases, usually come from AS clause, or null + */ + private void convertTemporalJoinRightSide( + Blackboard bb, @Nullable SqlNode from, @Nullable List<String> fieldNames) { + if (from == null) { + bb.setRoot(LogicalValues.createOneRow(cluster), false); + return; + } + + switch (from.getKind()) { + case TABLE_REF: + convertTableRef(bb, from, true); + return; + case IDENTIFIER: + convertIdentifier(bb, (SqlIdentifier) from, null, null, null, true); + return; + case EXTEND: + convertExtend(bb, from, true); + return; + case COLLECTION_TABLE: + convertCollectionTable(bb, from, true); + return; + default: + convertFrom(bb, from, fieldNames); + } + } + + /** + * Validate the input {@link RelNode} to judge if it is a legal source. For example, for a table + * source that only implements the {@link + * org.apache.flink.table.connector.source.LookupTableSource}, and doesn't implement the {@link + * ScanTableSource}, it can only be used as a right table ref in temporal join or lookup join + * and cannot be used as a scan table. + */ + private void validateScan(RelNode relNode, boolean isTemporalJoinRightSide) { + relNode.accept( + new RelShuttleImpl() { + @Override + public RelNode visit(TableScan scan) { + final RelOptTable table = scan.getTable(); + if (table instanceof TableSourceTable) { + final TableSourceTable sourceTable = + scan.getTable().unwrap(TableSourceTable.class); + assert sourceTable != null; + final DynamicTableSource dynamicTableSource = sourceTable.tableSource(); + if (!isTemporalJoinRightSide + && !(dynamicTableSource instanceof ScanTableSource)) { + throw new ValidationException( + String.format( + "The specified table source %s doesn't extend %s and can not be used " + + "as the scan source.\n" + + "Hint: You can read the data from the source as a dim table " + + "with the look up join syntax. 
Otherwise, please refer to " + + "the document and change the type of the connector to a " + + "source table that supports direct reads.", + sourceTable.contextResolvedTable().getIdentifier(), + ScanTableSource.class.getSimpleName())); + } + } + return scan; + } + }); + } + // ----- FLINK MODIFICATION END ----- + private void convertUnnest(Blackboard bb, SqlCall call, @Nullable List<String> fieldNames) { final List<SqlNode> nodes = call.getOperandList(); final SqlUnnestOperator operator = (SqlUnnestOperator) call.getOperator(); @@ -2830,7 +2920,8 @@ public class SqlToRelConverter { SqlIdentifier id, @Nullable SqlNodeList extendedColumns, @Nullable SqlNodeList tableHints, - @Nullable SchemaVersion schemaVersion) { + @Nullable SchemaVersion schemaVersion, + boolean isTemporalJoinRightSide) { final SqlValidatorNamespace fromNamespace = getNamespace(id).resolve(); if (fromNamespace.getNode() != null) { convertFrom(bb, fromNamespace.getNode()); @@ -2864,7 +2955,7 @@ public class SqlToRelConverter { hintStrategies.apply( SqlUtil.getRelHint(hintStrategies, tableHints), LogicalTableScan.create(cluster, table, ImmutableList.of())); - final RelNode tableRel = toRel(table, hints); + final RelNode tableRel = toRel(table, hints, isTemporalJoinRightSide); bb.setRoot(tableRel, true); if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && removeSortInSubQuery(bb.top)) { @@ -2876,7 +2967,15 @@ public class SqlToRelConverter { } } - protected void convertCollectionTable(Blackboard bb, SqlCall call) { + // ----- FLINK MODIFICATION BEGIN ----- + protected void convertCollectionTable( + Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) { + SqlCall sqlCall = (SqlCall) from; + + // Dig out real call; TABLE() wrapper is just syntactic. 
+ assert sqlCall.getOperandList().size() == 1; + final SqlCall call = sqlCall.operand(0); + // ----- FLINK MODIFICATION END ----- final SqlOperator operator = call.getOperator(); if (operator == SqlStdOperatorTable.TABLESAMPLE) { final String sampleName = SqlLiteral.unchain(call.operand(0)).getValueAs(String.class); @@ -2911,7 +3010,7 @@ public class SqlToRelConverter { RelOptTable relOptTable = RelOptTableImpl.create( null, rowType, udf.getNameAsId().names, table, expressionFunction); - RelNode converted = toRel(relOptTable, ImmutableList.of()); + RelNode converted = toRel(relOptTable, ImmutableList.of(), isTemporalJoinRightSide); bb.setRoot(converted, true); return; } @@ -2948,15 +3047,22 @@ public class SqlToRelConverter { final SqlSnapshot snapshot = (SqlSnapshot) call; final RexNode period = bb.convertExpression(snapshot.getPeriod()); + // ----- FLINK MODIFICATION BEGIN ----- + boolean isTemporalJoin = TemporalTableJoinUtil.isTemporalJoinSupportPeriod(period); + // ----- FLINK MODIFICATION END ----- + // convert inner query, could be a table name or a derived table SqlNode expr = snapshot.getTableRef(); + // ----- FLINK MODIFICATION BEGIN ----- SqlNode tableRef = snapshot.getTableRef(); // since we have reduced the period of SqlSnapshot in the validate phase, we only need to // check whether the period is a RexLiteral. // in most cases, tableRef is a SqlBasicCall and the first operand is a SqlIdentifier. // when using SQL Hints, tableRef will be a SqlTableRef. - if (((tableRef instanceof SqlBasicCall + if (isTemporalJoin) { + convertTemporalJoinRightSide(bb, expr, Collections.emptyList()); + } else if (((tableRef instanceof SqlBasicCall && ((SqlBasicCall) tableRef).operand(0) instanceof SqlIdentifier) || (tableRef instanceof SqlTableRef)) && period instanceof RexLiteral) { @@ -2980,7 +3086,7 @@ public class SqlToRelConverter { ? 
((SqlBasicCall) tableRef).operand(0) : ((SqlTableRef) tableRef).operand(0); SchemaVersion schemaVersion = TimestampSchemaVersion.of(timeTravelTimestamp); - convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion); + convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion, false); } else { convertFrom(bb, expr); } @@ -3993,8 +4099,12 @@ public class SqlToRelConverter { return ViewExpanders.toRelContext(viewExpander, cluster, hints); } - public RelNode toRel(final RelOptTable table, final List<RelHint> hints) { + // ----- FLINK MODIFICATION BEGIN ----- + public RelNode toRel( + final RelOptTable table, final List<RelHint> hints, boolean isTemporalJoinRightSide) { final RelNode scan = table.toRel(createToRelContext(hints)); + validateScan(scan, isTemporalJoinRightSide); + // ----- FLINK MODIFICATION END ----- final InitializerExpressionFactory ief = table.maybeUnwrap(InitializerExpressionFactory.class) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java index 38b5c705b0d..1c0585bef6e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java @@ -18,7 +18,11 @@ package org.apache.flink.table.planner.plan.utils; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; + import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexFieldAccess; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexVisitor; import org.apache.calcite.rex.RexVisitorImpl; @@ -66,4 +70,14 @@ public class TemporalTableJoinUtil { return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION() && 
call.operands.size() == 5; } + + public static boolean isTemporalJoinSupportPeriod(RexNode period) { + // it should be left table's field and is a time attribute + if (period instanceof RexFieldAccess) { + RexFieldAccess rexFieldAccess = (RexFieldAccess) period; + return rexFieldAccess.getType() instanceof TimeIndicatorRelDataType + && rexFieldAccess.getReferenceExpr() instanceof RexCorrelVariable; + } + return false; + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala index a755dfeeb2e..b876b4d4b9b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala @@ -22,13 +22,13 @@ import org.apache.flink.table.connector.source.LookupTableSource import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan, FlinkLogicalRel, FlinkLogicalSnapshot, FlinkLogicalTableSourceScan} import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalLookupJoin, StreamPhysicalTemporalJoin} import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, TableSourceTable, TimeIndicatorRelDataType} +import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil import org.apache.flink.table.sources.LookupableTableSource import org.apache.calcite.plan.hep.HepRelVertex import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan} -import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess} /** * Base implementation that matches temporal join node. 
@@ -42,15 +42,12 @@ trait CommonTemporalTableJoinRule { protected def matches(snapshot: FlinkLogicalSnapshot): Boolean = { // period specification check - snapshot.getPeriod match { - // it should be left table's field and is a time attribute - case r: RexFieldAccess - if r.getType.isInstanceOf[TimeIndicatorRelDataType] && - r.getReferenceExpr.isInstanceOf[RexCorrelVariable] => // pass - case _ => - throw new TableException( - "Temporal table join currently only supports " + - "'FOR SYSTEM_TIME AS OF' left table's time attribute field.") + val isTemporalJoinSnapshot = + TemporalTableJoinUtil.isTemporalJoinSupportPeriod(snapshot.getPeriod) + if (!isTemporalJoinSnapshot) { + throw new TableException( + "Temporal table join currently only supports " + + "'FOR SYSTEM_TIME AS OF' left table's time attribute field.") } true diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index 88fd58c8fdc..14aafa094e5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -1117,15 +1117,17 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { @Test def testCreateViewAndShowCreateTable(): Unit = { + val isBounded = !isStreamingMode val createTableDDL = - """ |create table `source` ( - | `id` bigint not null, - | `group` string not null, - | `score` double - |) with ( - | 'connector' = 'source-only' - |) - |""".stripMargin + s""" |create table `source` ( + | `id` bigint not null, + | `group` string not null, + | `score` double + |) with ( + | 'connector' = 'source-only', + | 'bounded' = '$isBounded' + |) + |""".stripMargin val createViewDDL = """ |create view `tmp` as |select `group`, 
avg(`score`) as avg_score @@ -1144,13 +1146,15 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { @Test def testAlterViewRename(): Unit = { - tableEnv.executeSql(""" - | CREATE TABLE T ( - | id INT - | ) WITH ( - | 'connector' = 'source-only' - | ) - |""".stripMargin) + val isBounded = !isStreamingMode + tableEnv.executeSql(s""" + | CREATE TABLE T ( + | id INT + | ) WITH ( + | 'connector' = 'source-only', + | 'bounded' = '$isBounded' + | ) + |""".stripMargin) tableEnv.executeSql("CREATE VIEW V AS SELECT * FROM T") tableEnv.executeSql("ALTER VIEW V RENAME TO V2") @@ -1159,14 +1163,16 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { @Test def testAlterViewAs(): Unit = { - tableEnv.executeSql(""" - | CREATE TABLE T ( - | a INT, - | b INT - | ) WITH ( - | 'connector' = 'source-only' - | ) - |""".stripMargin) + val isBounded = !isStreamingMode + tableEnv.executeSql(s""" + | CREATE TABLE T ( + | a INT, + | b INT + | ) WITH ( + | 'connector' = 'source-only', + | 'bounded' = '$isBounded' + | ) + |""".stripMargin) tableEnv.executeSql("CREATE VIEW V AS SELECT a FROM T") tableEnv.executeSql("ALTER VIEW V AS SELECT b FROM T") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index d228c206aa5..eebafbeecd3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -731,8 +731,12 @@ class TableScanTest extends TableTestBase { | 'table-source-class' = '${classOf[MockedLookupTableSource].getName}' |) """.stripMargin) - thrown.expect(classOf[TableException]) - thrown.expectMessage("Cannot generate a valid execution plan for the given query") + 
thrown.expect(classOf[ValidationException]) + thrown.expectMessage( + "The specified table source `default_catalog`.`default_database`.`src` " + + "doesn't extend ScanTableSource and can not be used as the scan source.\nHint: You can read the data " + + "from the source as a dim table with the look up join syntax. Otherwise, please refer to the document " + + "and change the type of the connector to a source table that supports direct reads.") util.verifyRelPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE) }