This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 383991a  [FLINK-19281][table-planner-blink] LIKE cannot recognize full 
table path
383991a is described below

commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1
Author: zhushang <zhush...@qutoutiao.net>
AuthorDate: Sun Sep 20 22:46:14 2020 +0800

    [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path
    
    This closes #13431
---
 .../operations/SqlCreateTableConverter.java        |  3 +-
 .../operations/SqlToOperationConverterTest.java    | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 2530897..cf8f62b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -140,8 +140,7 @@ class SqlCreateTableConverter {
        }
 
        private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
-               UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlTableLike.getSourceTable()
-                       .toString());
+               UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
                ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
                CatalogManager.TableLookupResult lookupResult = 
catalogManager.getTable(identifier)
                        .orElseThrow(() -> new 
ValidationException(String.format(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 1c582a3..2d5faad 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -529,6 +529,39 @@ public class SqlToOperationConverterTest {
        }
 
        @Test
+       public void testCreateTableLikeWithFullPath(){
+               Map<String, String> sourceProperties = new HashMap<>();
+               sourceProperties.put("connector.type", "kafka");
+               sourceProperties.put("format.type", "json");
+               CatalogTableImpl catalogTable = new CatalogTableImpl(
+                       TableSchema.builder()
+                               .field("f0", DataTypes.INT().notNull())
+                               .field("f1", DataTypes.TIMESTAMP(3))
+                               .build(),
+                       sourceProperties,
+                       null
+               );
+               catalogManager.createTable(catalogTable, 
ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
+               final String sql = "create table mytable like 
`builtin`.`default`.sourceTable";
+               Operation operation = parseAndConvert(sql);
+
+               assertThat(
+                       operation,
+                       isCreateTableOperation(
+                       withSchema(
+                               TableSchema.builder()
+                                       .field("f0", DataTypes.INT().notNull())
+                                       .field("f1", DataTypes.TIMESTAMP(3))
+                                       .build()
+                       ),
+                       withOptions(
+                               entry("connector.type", "kafka"),
+                               entry("format.type", "json")
+                       )
+               ));
+       }
+
+       @Test
        public void testMergingCreateTableLike() {
                Map<String, String> sourceProperties = new HashMap<>();
                sourceProperties.put("format.type", "json");

Reply via email to