This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 383991a [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path 383991a is described below commit 383991adc7ca68a0e0e4b595e06e8c5edd853eb1 Author: zhushang <zhush...@qutoutiao.net> AuthorDate: Sun Sep 20 22:46:14 2020 +0800 [FLINK-19281][table-planner-blink] LIKE cannot recognize full table path This closes #13431 --- .../operations/SqlCreateTableConverter.java | 3 +- .../operations/SqlToOperationConverterTest.java | 33 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index 2530897..cf8f62b 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -140,8 +140,7 @@ class SqlCreateTableConverter { } private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) { - UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable() - .toString()); + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); CatalogManager.TableLookupResult lookupResult = catalogManager.getTable(identifier) .orElseThrow(() -> new ValidationException(String.format( diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 1c582a3..2d5faad 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -529,6 +529,39 @@ public class SqlToOperationConverterTest { } @Test + public void testCreateTableLikeWithFullPath(){ + Map<String, String> sourceProperties = new HashMap<>(); + sourceProperties.put("connector.type", "kafka"); + sourceProperties.put("format.type", "json"); + CatalogTableImpl catalogTable = new CatalogTableImpl( + TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.TIMESTAMP(3)) + .build(), + sourceProperties, + null + ); + catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); + final String sql = "create table mytable like `builtin`.`default`.sourceTable"; + Operation operation = parseAndConvert(sql); + + assertThat( + operation, + isCreateTableOperation( + withSchema( + TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.TIMESTAMP(3)) + .build() + ), + withOptions( + entry("connector.type", "kafka"), + entry("format.type", "json") + ) + )); + } + + @Test public void testMergingCreateTableLike() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json");