This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0da5e  [FLINK-22260][table-planner-blink] Use unresolvedSchema 
during operation conversions
1e0da5e is described below

commit 1e0da5e5756e4c39f260a1e18aad3460f7fc67d9
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Wed Apr 14 08:39:39 2021 +0200

    [FLINK-22260][table-planner-blink] Use unresolvedSchema during operation 
conversions
    
    The #getSchema implementation has been deprecated and returns null by
    default, so we need to instead use the unresolved schema and derive the
    TableSchema from that.
    
    This closes #15606.
---
 .../operations/SqlCreateTableConverter.java        |   5 +-
 .../operations/SqlToOperationConverter.java        |  12 +-
 .../operations/SqlToOperationConverterTest.java    | 240 +++++++++++----------
 .../table/planner/utils/OperationMatchers.java     |  14 +-
 4 files changed, 145 insertions(+), 126 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 0f4a94e..d5865f1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -89,7 +89,10 @@ class SqlCreateTableConverter {
         if (sqlCreateTable.getTableLike().isPresent()) {
             SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
             CatalogTable table = lookupLikeSourceTable(sqlTableLike);
-            sourceTableSchema = table.getSchema();
+            sourceTableSchema =
+                    TableSchema.fromResolvedSchema(
+                            table.getUnresolvedSchema()
+                                    
.resolve(catalogManager.getSchemaResolver()));
             sourcePartitionKeys = table.getPartitionKeys();
             likeOptions = sqlTableLike.getOptions();
             sourceProperties = table.getOptions();
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 8806fa7..538b909 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -381,7 +381,11 @@ public class SqlToOperationConverter {
             SqlTableConstraint constraint =
                     ((SqlAlterTableAddConstraint) 
sqlAlterTable).getConstraint();
             validateTableConstraint(constraint);
-            TableSchema oriSchema = baseTable.getSchema();
+            TableSchema oriSchema =
+                    TableSchema.fromResolvedSchema(
+                            baseTable
+                                    .getUnresolvedSchema()
+                                    
.resolve(catalogManager.getSchemaResolver()));
             // Sanity check for constraint.
             TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
             if (constraint.getConstraintName().isPresent()) {
@@ -399,7 +403,11 @@ public class SqlToOperationConverter {
             SqlAlterTableDropConstraint dropConstraint =
                     ((SqlAlterTableDropConstraint) sqlAlterTable);
             String constraintName = 
dropConstraint.getConstraintName().getSimple();
-            TableSchema oriSchema = baseTable.getSchema();
+            TableSchema oriSchema =
+                    TableSchema.fromResolvedSchema(
+                            baseTable
+                                    .getUnresolvedSchema()
+                                    
.resolve(catalogManager.getSchemaResolver()));
             if (!oriSchema
                     .getPrimaryKey()
                     .filter(pk -> pk.getName().equals(constraintName))
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index b3c39a6..cc5f7bf 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableColumn.ComputedColumn;
@@ -88,7 +89,6 @@ import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -146,7 +146,7 @@ public class SqlToOperationConverterTest {
                     functionCatalog,
                     catalogManager,
                     asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
-                    new ArrayList<>());
+                    Collections.emptyList());
 
     private PlannerContext getPlannerContext() {
         return plannerContext;
@@ -616,14 +616,16 @@ public class SqlToOperationConverterTest {
     public void testBasicCreateTableLike() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("format.type", "json");
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("f0", DataTypes.INT().notNull())
-                                .field("f1", DataTypes.TIMESTAMP(3))
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column("f1", DataTypes.TIMESTAMP(3))
                                 .build(),
-                        sourceProperties,
-                        null);
+                        null,
+                        Collections.emptyList(),
+                        sourceProperties);
+
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
 
@@ -643,14 +645,11 @@ public class SqlToOperationConverterTest {
                 operation,
                 isCreateTableOperation(
                         withSchema(
-                                TableSchema.builder()
-                                        .field("f0", DataTypes.INT().notNull())
-                                        .field("f1", DataTypes.TIMESTAMP(3))
-                                        .field("a", DataTypes.INT())
-                                        .watermark(
-                                                "f1",
-                                                "`f1` - INTERVAL '5' SECOND",
-                                                DataTypes.TIMESTAMP(3))
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .column("a", DataTypes.INT())
+                                        .watermark("f1", "`f1` - INTERVAL '5' 
SECOND")
                                         .build()),
                         withOptions(entry("connector.type", "kafka"), 
entry("format.type", "json")),
                         partitionedBy("a", "f0")));
@@ -661,14 +660,15 @@ public class SqlToOperationConverterTest {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("connector.type", "kafka");
         sourceProperties.put("format.type", "json");
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("f0", DataTypes.INT().notNull())
-                                .field("f1", DataTypes.TIMESTAMP(3))
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column("f1", DataTypes.TIMESTAMP(3))
                                 .build(),
-                        sourceProperties,
-                        null);
+                        null,
+                        Collections.emptyList(),
+                        sourceProperties);
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
         final String sql = "create table mytable like 
`builtin`.`default`.sourceTable";
@@ -678,9 +678,9 @@ public class SqlToOperationConverterTest {
                 operation,
                 isCreateTableOperation(
                         withSchema(
-                                TableSchema.builder()
-                                        .field("f0", DataTypes.INT().notNull())
-                                        .field("f1", DataTypes.TIMESTAMP(3))
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
                                         .build()),
                         withOptions(
                                 entry("connector.type", "kafka"), 
entry("format.type", "json"))));
@@ -690,18 +690,18 @@ public class SqlToOperationConverterTest {
     public void testMergingCreateTableLike() {
         Map<String, String> sourceProperties = new HashMap<>();
         sourceProperties.put("format.type", "json");
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("f0", DataTypes.INT().notNull())
-                                .field("f1", DataTypes.TIMESTAMP(3))
-                                .field("f2", DataTypes.BIGINT(), "`f0` + 
12345")
-                                .watermark(
-                                        "f1", "`f1` - interval '1' second", 
DataTypes.TIMESTAMP(3))
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column("f1", DataTypes.TIMESTAMP(3))
+                                .columnByExpression("f2", "`f0` + 12345")
+                                .watermark("f1", "`f1` - interval '1' second")
                                 .build(),
+                        null,
                         Arrays.asList("f0", "f1"),
-                        sourceProperties,
-                        null);
+                        sourceProperties);
+
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
 
@@ -726,14 +726,11 @@ public class SqlToOperationConverterTest {
                 operation,
                 isCreateTableOperation(
                         withSchema(
-                                TableSchema.builder()
-                                        .field("f0", DataTypes.INT().notNull())
-                                        .field("f1", DataTypes.TIMESTAMP(3))
-                                        .field("a", DataTypes.INT())
-                                        .watermark(
-                                                "f1",
-                                                "`f1` - INTERVAL '5' SECOND",
-                                                DataTypes.TIMESTAMP(3))
+                                Schema.newBuilder()
+                                        .column("f0", 
DataTypes.INT().notNull())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .column("a", DataTypes.INT())
+                                        .watermark("f1", "`f1` - INTERVAL '5' 
SECOND")
                                         .build()),
                         withOptions(entry("connector.type", "kafka"), 
entry("format.type", "json")),
                         partitionedBy("a", "f0")));
@@ -752,11 +749,12 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testCreateTableLikeInvalidPartition() {
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder().field("f0", 
DataTypes.INT().notNull()).build(),
-                        Collections.emptyMap(),
-                        null);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder().column("f0", 
DataTypes.INT().notNull()).build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
 
@@ -791,11 +789,12 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testCreateTableLikeInvalidWatermark() {
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder().field("f0", 
DataTypes.INT().notNull()).build(),
-                        Collections.emptyMap(),
-                        null);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder().column("f0", 
DataTypes.INT().notNull()).build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
 
@@ -816,17 +815,18 @@ public class SqlToOperationConverterTest {
 
     @Test
     public void testCreateTableLikeNestedWatermark() {
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("f0", DataTypes.INT().notNull())
-                                .field(
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column(
                                         "f1",
                                         DataTypes.ROW(
                                                 DataTypes.FIELD("tmstmp", 
DataTypes.TIMESTAMP(3))))
                                 .build(),
-                        Collections.emptyMap(),
-                        null);
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceTable"), false);
 
@@ -1135,10 +1135,11 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder().field("a", 
DataTypes.STRING()).build(),
-                        new HashMap<>(),
-                        "tb1");
+                CatalogTable.of(
+                        Schema.newBuilder().column("a", 
DataTypes.STRING()).build(),
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1181,14 +1182,15 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING().notNull())
-                                .field("b", DataTypes.BIGINT().notNull())
-                                .field("c", DataTypes.BIGINT())
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.BIGINT())
                                 .build(),
-                        new HashMap<>(),
-                        "tb1");
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1217,14 +1219,15 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING().notNull())
-                                .field("b", DataTypes.BIGINT().notNull())
-                                .field("c", DataTypes.BIGINT())
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.BIGINT())
                                 .build(),
-                        new HashMap<>(),
-                        "tb1");
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1244,13 +1247,14 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING().notNull())
-                                .field("b", DataTypes.BIGINT().notNull())
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
                                 .build(),
-                        new HashMap<>(),
-                        "tb1");
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1266,14 +1270,15 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING().notNull())
-                                .field("b", DataTypes.BIGINT().notNull())
-                                .field("c", DataTypes.BIGINT())
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.BIGINT())
                                 .build(),
-                        new HashMap<>(),
-                        "tb1");
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1289,15 +1294,16 @@ public class SqlToOperationConverterTest {
         catalogManager.registerCatalog("cat1", catalog);
         catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
         CatalogTable catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("a", DataTypes.STRING().notNull())
-                                .field("b", DataTypes.BIGINT().notNull())
-                                .field("c", DataTypes.BIGINT())
-                                .primaryKey("ct1", new String[] {"a", "b"})
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.BIGINT().notNull())
+                                .column("c", DataTypes.BIGINT())
+                                .primaryKeyNamed("ct1", "a", "b")
                                 .build(),
-                        new HashMap<>(),
-                        "tb1");
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
         catalogManager.setCurrentCatalog("cat1");
         catalogManager.setCurrentDatabase("db1");
         catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
@@ -1319,18 +1325,19 @@ public class SqlToOperationConverterTest {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
         prop.put("bounded", "true");
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("id", DataTypes.INT().notNull())
-                                .field("measurement", 
DataTypes.BIGINT().notNull())
-                                .field(
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT().notNull())
+                                .column("measurement", 
DataTypes.BIGINT().notNull())
+                                .column(
                                         "ts",
                                         DataTypes.ROW(
                                                 DataTypes.FIELD("tmstmp", 
DataTypes.TIMESTAMP(3))))
                                 .build(),
-                        prop,
-                        null);
+                        null,
+                        Collections.emptyList(),
+                        prop);
 
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"events"), false);
@@ -1363,14 +1370,15 @@ public class SqlToOperationConverterTest {
         Map<String, String> prop = new HashMap<>();
         prop.put("connector", "values");
         prop.put("bounded", "true");
-        CatalogTableImpl catalogTable =
-                new CatalogTableImpl(
-                        TableSchema.builder()
-                                .field("f0", DataTypes.INT())
-                                .field("f1", DataTypes.VARCHAR(20))
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT())
+                                .column("f1", DataTypes.VARCHAR(20))
                                 .build(),
-                        prop,
-                        null);
+                        null,
+                        Collections.emptyList(),
+                        prop);
 
         catalogManager.createTable(
                 catalogTable, ObjectIdentifier.of("builtin", "default", 
"sourceA"), false);
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java
index 6d83a02..d1d711a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.utils;
 
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -115,18 +116,17 @@ public class OperationMatchers {
     }
 
     /**
-     * Checks that the schema of {@link CreateTableOperation} is equal to the 
given {@link
-     * TableSchema}.
+     * Checks that the schema of {@link CreateTableOperation} is equal to the 
given {@link Schema}.
      *
      * @param schema TableSchema that the {@link CreateTableOperation} should 
have
      * @see #isCreateTableOperation(Matcher[])
      */
-    public static Matcher<CreateTableOperation> withSchema(TableSchema schema) 
{
-        return new FeatureMatcher<CreateTableOperation, TableSchema>(
-                equalTo(schema), "table schema of the derived table", "table 
schema") {
+    public static Matcher<CreateTableOperation> withSchema(Schema schema) {
+        return new FeatureMatcher<CreateTableOperation, Schema>(
+                equalTo(schema), "schema of the derived table", "schema") {
             @Override
-            protected TableSchema featureValueOf(CreateTableOperation actual) {
-                return actual.getCatalogTable().getSchema();
+            protected Schema featureValueOf(CreateTableOperation actual) {
+                return actual.getCatalogTable().getUnresolvedSchema();
             }
         };
     }

Reply via email to