This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be509e6d674 [FLINK-33064][table-planner] Improve the error message 
when the lookup source is used as the scan source (#23377)
be509e6d674 is described below

commit be509e6d67471d886e58d3ddea6ddd3627a191a8
Author: yunhong <337361...@qq.com>
AuthorDate: Wed Sep 13 13:58:04 2023 +0800

    [FLINK-33064][table-planner] Improve the error message when the lookup 
source is used as the scan source (#23377)
    
    
    
    Co-authored-by: zhengyunhong.zyh <zhengyunhong....@alibaba-inc.com>
---
 .../TestDynamicTableSourceOnlyFactory.java         |  41 ++++-
 .../apache/calcite/sql2rel/SqlToRelConverter.java  | 172 +++++++++++++++++----
 .../planner/plan/utils/TemporalTableJoinUtil.java  |  14 ++
 .../rules/common/CommonTemporalTableJoinRule.scala |  17 +-
 .../table/planner/catalog/CatalogTableITCase.scala |  52 ++++---
 .../planner/plan/stream/sql/TableScanTest.scala    |   8 +-
 6 files changed, 236 insertions(+), 68 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
index 7aa78b6492e..6d3af4650d3 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableSourceOnlyFactory.java
@@ -19,7 +19,10 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
 
 import java.util.Collections;
 import java.util.Set;
@@ -32,9 +35,14 @@ public final class TestDynamicTableSourceOnlyFactory 
implements DynamicTableSour
 
     public static final String IDENTIFIER = "source-only";
 
+    private static final ConfigOption<Boolean> BOUNDED =
+            ConfigOptions.key("bounded").booleanType().defaultValue(false);
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
-        return null;
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        boolean isBounded = helper.getOptions().get(BOUNDED);
+        return new MockedScanTableSource(isBounded);
     }
 
     @Override
@@ -49,6 +57,35 @@ public final class TestDynamicTableSourceOnlyFactory 
implements DynamicTableSour
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Collections.emptySet();
+        return Collections.singleton(BOUNDED);
+    }
+
+    /** A mocked {@link ScanTableSource} for validation test. */
+    private static class MockedScanTableSource implements ScanTableSource {
+        private final boolean isBounded;
+
+        private MockedScanTableSource(boolean isBounded) {
+            this.isBounded = isBounded;
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            return null;
+        }
+
+        @Override
+        public String asSummaryString() {
+            return null;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
+            return () -> isBounded;
+        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 71cafa91308..67663ce0984 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -17,11 +17,16 @@
 package org.apache.calcite.sql2rel;
 
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.data.TimestampData;
 import 
org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
 import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogSnapshotReader;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import com.google.common.base.Preconditions;
@@ -64,6 +69,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
@@ -235,12 +241,17 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682: Lines 644 ~ 654
- *   <li>Added in FLINK-28682: Lines 2277 ~ 2294
- *   <li>Added in FLINK-28682: Lines 2331 ~ 2359
- *   <li>Added in FLINK-20873: Lines 5484 ~ 5493
- *   <li>Added in FLINK-32474: Lines 2841 ~ 2853
- *   <li>Added in FLINK-32474: Lines 2953 ~ 2987
+ *   <li>Added in FLINK-29081, FLINK-28682: Lines 667 ~ 677
+ *   <li>Added in FLINK-28682: Lines 2300 ~ 2317
+ *   <li>Added in FLINK-28682: Lines 2354 ~ 2382
+ *   <li>Added in FLINK-32474: Lines 2932 ~ 2944
+ *   <li>Added in FLINK-32474: Lines 3057 ~ 3093
+ *   <li>Added in FLINK-20873: Lines 5594 ~ 5603
+ *   <li>Added in FLINK-33064: Lines 2431 ~ 2475
+ *   <li>Added in FLINK-33064: Lines 2481 ~ 2570
+ *   <li>Added in FLINK-33064: Lines 2970 ~ 2978
+ *   <li>Added in FLINK-33064: Lines 3050 ~ 3052
+ *   <li>Added in FLINK-33064: Lines 4102 ~ 4107
  * </ol>
  */
 @SuppressWarnings("UnstableApiUsage")
@@ -2417,25 +2428,17 @@ public class SqlToRelConverter {
                     throw new AssertionError("unknown TABLESAMPLE type: " + 
sampleSpec);
                 }
                 return;
-
+                // ----- FLINK MODIFICATION BEGIN -----
             case TABLE_REF:
-                call = (SqlCall) from;
-                convertIdentifier(bb, call.operand(0), null, call.operand(1), 
null);
+                convertTableRef(bb, from, false);
                 return;
 
             case IDENTIFIER:
-                convertIdentifier(bb, (SqlIdentifier) from, null, null, null);
+                convertIdentifier(bb, (SqlIdentifier) from, null, null, null, 
false);
                 return;
 
             case EXTEND:
-                call = (SqlCall) from;
-                final SqlNode operand0 = call.getOperandList().get(0);
-                final SqlIdentifier id =
-                        operand0.getKind() == SqlKind.TABLE_REF
-                                ? ((SqlCall) operand0).operand(0)
-                                : (SqlIdentifier) operand0;
-                SqlNodeList extendedColumns = (SqlNodeList) 
call.getOperandList().get(1);
-                convertIdentifier(bb, id, extendedColumns, null, null);
+                convertExtend(bb, from, false);
                 return;
 
             case SNAPSHOT:
@@ -2466,19 +2469,106 @@ public class SqlToRelConverter {
                 return;
 
             case COLLECTION_TABLE:
-                call = (SqlCall) from;
-
-                // Dig out real call; TABLE() wrapper is just syntactic.
-                assert call.getOperandList().size() == 1;
-                final SqlCall call2 = call.operand(0);
-                convertCollectionTable(bb, call2);
+                convertCollectionTable(bb, from, false);
                 return;
 
+                // ----- FLINK MODIFICATION END -----
             default:
                 throw new AssertionError("not a join operator " + from);
         }
     }
 
+    // ----- FLINK MODIFICATION BEGIN -----
+    private void convertTableRef(Blackboard bb, SqlNode from, boolean 
isTemporalJoinRightSide) {
+        SqlCall call = (SqlCall) from;
+        convertIdentifier(
+                bb, call.operand(0), null, call.operand(1), null, 
isTemporalJoinRightSide);
+    }
+
+    private void convertExtend(Blackboard bb, SqlNode from, boolean 
isTemporalJoinRightSide) {
+        SqlCall call = (SqlCall) from;
+        final SqlNode operand0 = call.getOperandList().get(0);
+        final SqlIdentifier id =
+                operand0.getKind() == SqlKind.TABLE_REF
+                        ? ((SqlCall) operand0).operand(0)
+                        : (SqlIdentifier) operand0;
+        SqlNodeList extendedColumns = (SqlNodeList) 
call.getOperandList().get(1);
+        convertIdentifier(bb, id, extendedColumns, null, null, 
isTemporalJoinRightSide);
+    }
+
+    /**
+     * Converts a FROM clause into a relational expression for the right side 
in temporal join. The
+     * right side in temporary join is a special type of source which can 
support {@link
+     * org.apache.flink.table.connector.source.LookupTableSource}. So we need 
to distinguish it from
+     * the regular scan table source during convert.
+     *
+     * @param bb Scope within which to resolve identifiers
+     * @param from FROM clause of a query. Examples include:
+     * @param fieldNames Field aliases, usually come from AS clause, or null
+     */
+    private void convertTemporalJoinRightSide(
+            Blackboard bb, @Nullable SqlNode from, @Nullable List<String> 
fieldNames) {
+        if (from == null) {
+            bb.setRoot(LogicalValues.createOneRow(cluster), false);
+            return;
+        }
+
+        switch (from.getKind()) {
+            case TABLE_REF:
+                convertTableRef(bb, from, true);
+                return;
+            case IDENTIFIER:
+                convertIdentifier(bb, (SqlIdentifier) from, null, null, null, 
true);
+                return;
+            case EXTEND:
+                convertExtend(bb, from, true);
+                return;
+            case COLLECTION_TABLE:
+                convertCollectionTable(bb, from, true);
+                return;
+            default:
+                convertFrom(bb, from, fieldNames);
+        }
+    }
+
+    /**
+     * Validate the input {@link RelNode} to judge if it is a legal source. 
For example, for a table
+     * source that only implements the {@link
+     * org.apache.flink.table.connector.source.LookupTableSource}, and doesn't 
implement the {@link
+     * ScanTableSource}, it can only be used as a right table ref in temporal 
join or lookup join
+     * and cannot be used as a scan table.
+     */
+    private void validateScan(RelNode relNode, boolean 
isTemporalJoinRightSide) {
+        relNode.accept(
+                new RelShuttleImpl() {
+                    @Override
+                    public RelNode visit(TableScan scan) {
+                        final RelOptTable table = scan.getTable();
+                        if (table instanceof TableSourceTable) {
+                            final TableSourceTable sourceTable =
+                                    
scan.getTable().unwrap(TableSourceTable.class);
+                            assert sourceTable != null;
+                            final DynamicTableSource dynamicTableSource = 
sourceTable.tableSource();
+                            if (!isTemporalJoinRightSide
+                                    && !(dynamicTableSource instanceof 
ScanTableSource)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "The specified table source %s 
doesn't extend %s and can not be used "
+                                                        + "as the scan 
source.\n"
+                                                        + "Hint: You can read 
the data from the source as a dim table "
+                                                        + "with the look up 
join syntax. Otherwise, please refer to "
+                                                        + "the document and 
change the type of the connector to a "
+                                                        + "source table that 
supports direct reads.",
+                                                
sourceTable.contextResolvedTable().getIdentifier(),
+                                                
ScanTableSource.class.getSimpleName()));
+                            }
+                        }
+                        return scan;
+                    }
+                });
+    }
+    // ----- FLINK MODIFICATION END -----
+
     private void convertUnnest(Blackboard bb, SqlCall call, @Nullable 
List<String> fieldNames) {
         final List<SqlNode> nodes = call.getOperandList();
         final SqlUnnestOperator operator = (SqlUnnestOperator) 
call.getOperator();
@@ -2830,7 +2920,8 @@ public class SqlToRelConverter {
             SqlIdentifier id,
             @Nullable SqlNodeList extendedColumns,
             @Nullable SqlNodeList tableHints,
-            @Nullable SchemaVersion schemaVersion) {
+            @Nullable SchemaVersion schemaVersion,
+            boolean isTemporalJoinRightSide) {
         final SqlValidatorNamespace fromNamespace = getNamespace(id).resolve();
         if (fromNamespace.getNode() != null) {
             convertFrom(bb, fromNamespace.getNode());
@@ -2864,7 +2955,7 @@ public class SqlToRelConverter {
                 hintStrategies.apply(
                         SqlUtil.getRelHint(hintStrategies, tableHints),
                         LogicalTableScan.create(cluster, table, 
ImmutableList.of()));
-        final RelNode tableRel = toRel(table, hints);
+        final RelNode tableRel = toRel(table, hints, isTemporalJoinRightSide);
         bb.setRoot(tableRel, true);
 
         if (RelOptUtil.isPureOrder(castNonNull(bb.root)) && 
removeSortInSubQuery(bb.top)) {
@@ -2876,7 +2967,15 @@ public class SqlToRelConverter {
         }
     }
 
-    protected void convertCollectionTable(Blackboard bb, SqlCall call) {
+    // ----- FLINK MODIFICATION BEGIN -----
+    protected void convertCollectionTable(
+            Blackboard bb, SqlNode from, boolean isTemporalJoinRightSide) {
+        SqlCall sqlCall = (SqlCall) from;
+
+        // Dig out real call; TABLE() wrapper is just syntactic.
+        assert sqlCall.getOperandList().size() == 1;
+        final SqlCall call = sqlCall.operand(0);
+        // ----- FLINK MODIFICATION END -----
         final SqlOperator operator = call.getOperator();
         if (operator == SqlStdOperatorTable.TABLESAMPLE) {
             final String sampleName = 
SqlLiteral.unchain(call.operand(0)).getValueAs(String.class);
@@ -2911,7 +3010,7 @@ public class SqlToRelConverter {
             RelOptTable relOptTable =
                     RelOptTableImpl.create(
                             null, rowType, udf.getNameAsId().names, table, 
expressionFunction);
-            RelNode converted = toRel(relOptTable, ImmutableList.of());
+            RelNode converted = toRel(relOptTable, ImmutableList.of(), 
isTemporalJoinRightSide);
             bb.setRoot(converted, true);
             return;
         }
@@ -2948,15 +3047,22 @@ public class SqlToRelConverter {
         final SqlSnapshot snapshot = (SqlSnapshot) call;
         final RexNode period = bb.convertExpression(snapshot.getPeriod());
 
+        // ----- FLINK MODIFICATION BEGIN -----
+        boolean isTemporalJoin = 
TemporalTableJoinUtil.isTemporalJoinSupportPeriod(period);
+        // ----- FLINK MODIFICATION END -----
+
         // convert inner query, could be a table name or a derived table
         SqlNode expr = snapshot.getTableRef();
+
         // ----- FLINK MODIFICATION BEGIN -----
         SqlNode tableRef = snapshot.getTableRef();
         // since we have reduced the period of SqlSnapshot in the validate 
phase, we only need to
         // check whether the period is a RexLiteral.
         // in most cases, tableRef is a SqlBasicCall and the first operand is 
a SqlIdentifier.
         // when using SQL Hints, tableRef will be a SqlTableRef.
-        if (((tableRef instanceof SqlBasicCall
+        if (isTemporalJoin) {
+            convertTemporalJoinRightSide(bb, expr, Collections.emptyList());
+        } else if (((tableRef instanceof SqlBasicCall
                                 && ((SqlBasicCall) tableRef).operand(0) 
instanceof SqlIdentifier)
                         || (tableRef instanceof SqlTableRef))
                 && period instanceof RexLiteral) {
@@ -2980,7 +3086,7 @@ public class SqlToRelConverter {
                             ? ((SqlBasicCall) tableRef).operand(0)
                             : ((SqlTableRef) tableRef).operand(0);
             SchemaVersion schemaVersion = 
TimestampSchemaVersion.of(timeTravelTimestamp);
-            convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion);
+            convertIdentifier(bb, sqlIdentifier, null, null, schemaVersion, 
false);
         } else {
             convertFrom(bb, expr);
         }
@@ -3993,8 +4099,12 @@ public class SqlToRelConverter {
         return ViewExpanders.toRelContext(viewExpander, cluster, hints);
     }
 
-    public RelNode toRel(final RelOptTable table, final List<RelHint> hints) {
+    // ----- FLINK MODIFICATION BEGIN -----
+    public RelNode toRel(
+            final RelOptTable table, final List<RelHint> hints, boolean 
isTemporalJoinRightSide) {
         final RelNode scan = table.toRel(createToRelContext(hints));
+        validateScan(scan, isTemporalJoinRightSide);
+        // ----- FLINK MODIFICATION END -----
 
         final InitializerExpressionFactory ief =
                 table.maybeUnwrap(InitializerExpressionFactory.class)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
index 38b5c705b0d..1c0585bef6e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/TemporalTableJoinUtil.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.table.planner.plan.utils;
 
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
@@ -66,4 +70,14 @@ public class TemporalTableJoinUtil {
         return call.getOperator() == TemporalJoinUtil.TEMPORAL_JOIN_CONDITION()
                 && call.operands.size() == 5;
     }
+
+    public static boolean isTemporalJoinSupportPeriod(RexNode period) {
+        // it should be left table's field and is a time attribute
+        if (period instanceof RexFieldAccess) {
+            RexFieldAccess rexFieldAccess = (RexFieldAccess) period;
+            return rexFieldAccess.getType() instanceof TimeIndicatorRelDataType
+                    && rexFieldAccess.getReferenceExpr() instanceof 
RexCorrelVariable;
+        }
+        return false;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
index a755dfeeb2e..b876b4d4b9b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/common/CommonTemporalTableJoinRule.scala
@@ -22,13 +22,13 @@ import 
org.apache.flink.table.connector.source.LookupTableSource
 import 
org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalLegacyTableSourceScan,
 FlinkLogicalRel, FlinkLogicalSnapshot, FlinkLogicalTableSourceScan}
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalLookupJoin,
 StreamPhysicalTemporalJoin}
 import org.apache.flink.table.planner.plan.schema.{LegacyTableSourceTable, 
TableSourceTable, TimeIndicatorRelDataType}
+import org.apache.flink.table.planner.plan.utils.TemporalTableJoinUtil
 import org.apache.flink.table.sources.LookupableTableSource
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableScan}
-import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess}
 
 /**
  * Base implementation that matches temporal join node.
@@ -42,15 +42,12 @@ trait CommonTemporalTableJoinRule {
   protected def matches(snapshot: FlinkLogicalSnapshot): Boolean = {
 
     // period specification check
-    snapshot.getPeriod match {
-      // it should be left table's field and is a time attribute
-      case r: RexFieldAccess
-          if r.getType.isInstanceOf[TimeIndicatorRelDataType] &&
-            r.getReferenceExpr.isInstanceOf[RexCorrelVariable] => // pass
-      case _ =>
-        throw new TableException(
-          "Temporal table join currently only supports " +
-            "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
+    val isTemporalJoinSnapshot =
+      TemporalTableJoinUtil.isTemporalJoinSupportPeriod(snapshot.getPeriod)
+    if (!isTemporalJoinSnapshot) {
+      throw new TableException(
+        "Temporal table join currently only supports " +
+          "'FOR SYSTEM_TIME AS OF' left table's time attribute field.")
     }
 
     true
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 88fd58c8fdc..14aafa094e5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -1117,15 +1117,17 @@ class CatalogTableITCase(isStreamingMode: Boolean) 
extends AbstractTestBase {
 
   @Test
   def testCreateViewAndShowCreateTable(): Unit = {
+    val isBounded = !isStreamingMode
     val createTableDDL =
-      """ |create table `source` (
-        |  `id` bigint not null,
-        | `group` string not null,
-        | `score` double
-        |) with (
-        |  'connector' = 'source-only'
-        |)
-        |""".stripMargin
+      s""" |create table `source` (
+         |  `id` bigint not null,
+         | `group` string not null,
+         | `score` double
+         |) with (
+         |  'connector' = 'source-only',
+         |  'bounded' = '$isBounded'
+         |)
+         |""".stripMargin
     val createViewDDL =
       """ |create view `tmp` as
         |select `group`, avg(`score`) as avg_score
@@ -1144,13 +1146,15 @@ class CatalogTableITCase(isStreamingMode: Boolean) 
extends AbstractTestBase {
 
   @Test
   def testAlterViewRename(): Unit = {
-    tableEnv.executeSql("""
-                          | CREATE TABLE T (
-                          |   id INT
-                          | ) WITH (
-                          |   'connector' = 'source-only'
-                          | )
-                          |""".stripMargin)
+    val isBounded = !isStreamingMode
+    tableEnv.executeSql(s"""
+                           | CREATE TABLE T (
+                           |   id INT
+                           | ) WITH (
+                           |   'connector' = 'source-only',
+                           |   'bounded' = '$isBounded'
+                           | )
+                           |""".stripMargin)
     tableEnv.executeSql("CREATE VIEW V AS SELECT * FROM T")
 
     tableEnv.executeSql("ALTER VIEW V RENAME TO V2")
@@ -1159,14 +1163,16 @@ class CatalogTableITCase(isStreamingMode: Boolean) 
extends AbstractTestBase {
 
   @Test
   def testAlterViewAs(): Unit = {
-    tableEnv.executeSql("""
-                          | CREATE TABLE T (
-                          |   a INT,
-                          |   b INT
-                          | ) WITH (
-                          |   'connector' = 'source-only'
-                          | )
-                          |""".stripMargin)
+    val isBounded = !isStreamingMode
+    tableEnv.executeSql(s"""
+                           | CREATE TABLE T (
+                           |   a INT,
+                           |   b INT
+                           | ) WITH (
+                           |   'connector' = 'source-only',
+                           |   'bounded' = '$isBounded'
+                           | )
+                           |""".stripMargin)
     tableEnv.executeSql("CREATE VIEW V AS SELECT a FROM T")
 
     tableEnv.executeSql("ALTER VIEW V AS SELECT b FROM T")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index d228c206aa5..eebafbeecd3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -731,8 +731,12 @@ class TableScanTest extends TableTestBase {
                      |  'table-source-class' = 
'${classOf[MockedLookupTableSource].getName}'
                      |)
       """.stripMargin)
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Cannot generate a valid execution plan for the given 
query")
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "The specified table source `default_catalog`.`default_database`.`src` " 
+
+        "doesn't extend ScanTableSource and can not be used as the scan 
source.\nHint: You can read the data " +
+        "from the source as a dim table with the look up join syntax. 
Otherwise, please refer to the document " +
+        "and change the type of the connector to a source table that supports 
direct reads.")
     util.verifyRelPlan("SELECT * FROM src", ExplainDetail.CHANGELOG_MODE)
   }
 

Reply via email to