This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1e0da5e [FLINK-22260][table-planner-blink] Use unresolvedSchema during operation convertions 1e0da5e is described below commit 1e0da5e5756e4c39f260a1e18aad3460f7fc67d9 Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Wed Apr 14 08:39:39 2021 +0200 [FLINK-22260][table-planner-blink] Use unresolvedSchema during operation convertions The #getSchema implementation has been deprecated and returns null by default, so we need to instead use the unresolved schema and derive the TableSchema from that. This closes #15606. --- .../operations/SqlCreateTableConverter.java | 5 +- .../operations/SqlToOperationConverter.java | 12 +- .../operations/SqlToOperationConverterTest.java | 240 +++++++++++---------- .../table/planner/utils/OperationMatchers.java | 14 +- 4 files changed, 145 insertions(+), 126 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index 0f4a94e..d5865f1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -89,7 +89,10 @@ class SqlCreateTableConverter { if (sqlCreateTable.getTableLike().isPresent()) { SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get(); CatalogTable table = lookupLikeSourceTable(sqlTableLike); - sourceTableSchema = table.getSchema(); + sourceTableSchema = + TableSchema.fromResolvedSchema( + table.getUnresolvedSchema() + .resolve(catalogManager.getSchemaResolver())); sourcePartitionKeys = table.getPartitionKeys(); likeOptions = sqlTableLike.getOptions(); sourceProperties = table.getOptions(); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 8806fa7..538b909 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -381,7 +381,11 @@ public class SqlToOperationConverter { SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable).getConstraint(); validateTableConstraint(constraint); - TableSchema oriSchema = baseTable.getSchema(); + TableSchema oriSchema = + TableSchema.fromResolvedSchema( + baseTable + .getUnresolvedSchema() + .resolve(catalogManager.getSchemaResolver())); // Sanity check for constraint. TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriSchema); if (constraint.getConstraintName().isPresent()) { @@ -399,7 +403,11 @@ public class SqlToOperationConverter { SqlAlterTableDropConstraint dropConstraint = ((SqlAlterTableDropConstraint) sqlAlterTable); String constraintName = dropConstraint.getConstraintName().getSimple(); - TableSchema oriSchema = baseTable.getSchema(); + TableSchema oriSchema = + TableSchema.fromResolvedSchema( + baseTable + .getUnresolvedSchema() + .resolve(catalogManager.getSchemaResolver())); if (!oriSchema .getPrimaryKey() .filter(pk -> pk.getName().equals(constraintName)) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index b3c39a6..cc5f7bf 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.sql.parser.ddl.SqlCreateTable; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableColumn.ComputedColumn; @@ -88,7 +89,6 @@ import org.junit.rules.ExpectedException; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -146,7 +146,7 @@ public class SqlToOperationConverterTest { functionCatalog, catalogManager, asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), - new ArrayList<>()); + Collections.emptyList()); private PlannerContext getPlannerContext() { return plannerContext; @@ -616,14 +616,16 @@ public class SqlToOperationConverterTest { public void testBasicCreateTableLike() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json"); - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) .build(), - sourceProperties, - null); + null, + Collections.emptyList(), + sourceProperties); + catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); @@ -643,14 +645,11 @@ public class SqlToOperationConverterTest { operation, isCreateTableOperation( withSchema( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) - .field("a", DataTypes.INT()) - .watermark( - "f1", - "`f1` - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP(3)) + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .column("a", DataTypes.INT()) + .watermark("f1", "`f1` - INTERVAL '5' SECOND") .build()), withOptions(entry("connector.type", "kafka"), entry("format.type", "json")), partitionedBy("a", "f0"))); @@ -661,14 +660,15 @@ public class SqlToOperationConverterTest { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("connector.type", "kafka"); sourceProperties.put("format.type", "json"); - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) .build(), - sourceProperties, - null); + null, + Collections.emptyList(), + sourceProperties); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); final String sql = "create table mytable like `builtin`.`default`.sourceTable"; @@ -678,9 +678,9 @@ public class SqlToOperationConverterTest { operation, isCreateTableOperation( withSchema( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) .build()), withOptions( entry("connector.type", "kafka"), entry("format.type", "json")))); @@ -690,18 +690,18 @@ public class SqlToOperationConverterTest { public void testMergingCreateTableLike() { Map<String, String> sourceProperties = new HashMap<>(); sourceProperties.put("format.type", "json"); - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) - .field("f2", DataTypes.BIGINT(), "`f0` + 12345") - .watermark( - "f1", "`f1` - interval '1' second", DataTypes.TIMESTAMP(3)) + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .columnByExpression("f2", "`f0` + 12345") + .watermark("f1", "`f1` - interval '1' second") .build(), + null, Arrays.asList("f0", "f1"), - sourceProperties, - null); + sourceProperties); + catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); @@ -726,14 +726,11 @@ public class SqlToOperationConverterTest { operation, isCreateTableOperation( withSchema( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.TIMESTAMP(3)) - .field("a", DataTypes.INT()) - .watermark( - "f1", - "`f1` - INTERVAL '5' SECOND", - DataTypes.TIMESTAMP(3)) + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column("f1", DataTypes.TIMESTAMP(3)) + .column("a", DataTypes.INT()) + .watermark("f1", "`f1` - INTERVAL '5' SECOND") .build()), withOptions(entry("connector.type", "kafka"), entry("format.type", "json")), partitionedBy("a", "f0"))); @@ -752,11 +749,12 @@ public class SqlToOperationConverterTest { @Test public void testCreateTableLikeInvalidPartition() { - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder().field("f0", DataTypes.INT().notNull()).build(), - Collections.emptyMap(), - null); + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(), + null, + Collections.emptyList(), + Collections.emptyMap()); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); @@ -791,11 +789,12 @@ public class SqlToOperationConverterTest { @Test public void testCreateTableLikeInvalidWatermark() { - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder().field("f0", DataTypes.INT().notNull()).build(), - Collections.emptyMap(), - null); + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(), + null, + Collections.emptyList(), + Collections.emptyMap()); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); @@ -816,17 +815,18 @@ public class SqlToOperationConverterTest { @Test public void testCreateTableLikeNestedWatermark() { - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("f0", DataTypes.INT().notNull()) - .field( + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT().notNull()) + .column( "f1", DataTypes.ROW( DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3)))) .build(), - Collections.emptyMap(), - null); + null, + Collections.emptyList(), + Collections.emptyMap()); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false); @@ -1135,10 +1135,11 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder().field("a", DataTypes.STRING()).build(), - new HashMap<>(), - "tb1"); + CatalogTable.of( + Schema.newBuilder().column("a", DataTypes.STRING()).build(), + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1181,14 +1182,15 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("a", DataTypes.STRING().notNull()) - .field("b", DataTypes.BIGINT().notNull()) - .field("c", DataTypes.BIGINT()) + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.STRING().notNull()) + .column("b", DataTypes.BIGINT().notNull()) + .column("c", DataTypes.BIGINT()) .build(), - new HashMap<>(), - "tb1"); + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1217,14 +1219,15 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("a", DataTypes.STRING().notNull()) - .field("b", DataTypes.BIGINT().notNull()) - .field("c", DataTypes.BIGINT()) + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.STRING().notNull()) + .column("b", DataTypes.BIGINT().notNull()) + .column("c", DataTypes.BIGINT()) .build(), - new HashMap<>(), - "tb1"); + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1244,13 +1247,14 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("a", DataTypes.STRING().notNull()) - .field("b", DataTypes.BIGINT().notNull()) + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.STRING().notNull()) + .column("b", DataTypes.BIGINT().notNull()) .build(), - new HashMap<>(), - "tb1"); + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1266,14 +1270,15 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("a", DataTypes.STRING().notNull()) - .field("b", DataTypes.BIGINT().notNull()) - .field("c", DataTypes.BIGINT()) + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.STRING().notNull()) + .column("b", DataTypes.BIGINT().notNull()) + .column("c", DataTypes.BIGINT()) .build(), - new HashMap<>(), - "tb1"); + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1289,15 +1294,16 @@ public class SqlToOperationConverterTest { catalogManager.registerCatalog("cat1", catalog); catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true); CatalogTable catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("a", DataTypes.STRING().notNull()) - .field("b", DataTypes.BIGINT().notNull()) - .field("c", DataTypes.BIGINT()) - .primaryKey("ct1", new String[] {"a", "b"}) + CatalogTable.of( + Schema.newBuilder() + .column("a", DataTypes.STRING().notNull()) + .column("b", DataTypes.BIGINT().notNull()) + .column("c", DataTypes.BIGINT()) + .primaryKeyNamed("ct1", "a", "b") .build(), - new HashMap<>(), - "tb1"); + "tb1", + Collections.emptyList(), + Collections.emptyMap()); catalogManager.setCurrentCatalog("cat1"); catalogManager.setCurrentDatabase("db1"); catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true); @@ -1319,18 +1325,19 @@ public class SqlToOperationConverterTest { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); prop.put("bounded", "true"); - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("id", DataTypes.INT().notNull()) - .field("measurement", DataTypes.BIGINT().notNull()) - .field( + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .column("measurement", DataTypes.BIGINT().notNull()) + .column( "ts", DataTypes.ROW( DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3)))) .build(), - prop, - null); + null, + Collections.emptyList(), + prop); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "events"), false); @@ -1363,14 +1370,15 @@ public class SqlToOperationConverterTest { Map<String, String> prop = new HashMap<>(); prop.put("connector", "values"); prop.put("bounded", "true"); - CatalogTableImpl catalogTable = - new CatalogTableImpl( - TableSchema.builder() - .field("f0", DataTypes.INT()) - .field("f1", DataTypes.VARCHAR(20)) + CatalogTable catalogTable = + CatalogTable.of( + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .column("f1", DataTypes.VARCHAR(20)) .build(), - prop, - null); + null, + Collections.emptyList(), + prop); catalogManager.createTable( catalogTable, ObjectIdentifier.of("builtin", "default", "sourceA"), false); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java index 6d83a02..d1d711a 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/OperationMatchers.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.utils; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; @@ -115,18 +116,17 @@ public class OperationMatchers { } /** - * Checks that the schema of {@link CreateTableOperation} is equal to the given {@link - * TableSchema}. + * Checks that the schema of {@link CreateTableOperation} is equal to the given {@link Schema}. * * @param schema TableSchema that the {@link CreateTableOperation} should have * @see #isCreateTableOperation(Matcher[]) */ - public static Matcher<CreateTableOperation> withSchema(TableSchema schema) { - return new FeatureMatcher<CreateTableOperation, TableSchema>( - equalTo(schema), "table schema of the derived table", "table schema") { + public static Matcher<CreateTableOperation> withSchema(Schema schema) { + return new FeatureMatcher<CreateTableOperation, Schema>( + equalTo(schema), "schema of the derived table", "schema") { @Override - protected TableSchema featureValueOf(CreateTableOperation actual) { - return actual.getCatalogTable().getSchema(); + protected Schema featureValueOf(CreateTableOperation actual) { + return actual.getCatalogTable().getUnresolvedSchema(); } }; }