This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new cfb213040a0 [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner (#22263) cfb213040a0 is described below commit cfb213040a08021da3a954a1c0e7f94f22a80f1e Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Thu Mar 30 23:40:21 2023 +0200 [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner (#22263) --- .../apache/flink/table/utils/TableSchemaUtils.java | 22 ++++----- .../flink/table/utils/TableSchemaUtilsTest.java | 26 ++++++----- .../table/planner/catalog/CatalogSchemaTable.java | 21 ++++++--- .../planner/plan/FlinkCalciteCatalogReader.java | 22 ++++++--- .../plan/schema/LegacyCatalogSourceTable.scala | 23 ++++++---- .../planner/catalog/CatalogConstraintTest.java | 53 ++++++++++++---------- .../planner/catalog/CatalogStatisticsTest.java | 37 +++++++-------- .../SqlNodeToOperationConversionTestBase.java | 22 +++++---- .../flink/table/api/TableEnvironmentTest.scala | 6 ++- ...artitionIntoLegacyTableSourceScanRuleTest.scala | 34 ++++++++------ .../batch/sql/PartitionableSinkITCase.scala | 21 ++++++--- .../table/planner/utils/testTableSourceSinks.scala | 27 +++++------ 12 files changed, 184 insertions(+), 130 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index 90aa855e17d..8a8fc556523 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -118,19 +118,15 @@ public class TableSchemaUtils { } } - /** Removes time attributes from the {@link ResolvedSchema} and build a {@link TableSchema}. */ - public static TableSchema removeTimeAttributeFromResolvedSchema(ResolvedSchema resolvedSchema) { - return TableSchema.fromResolvedSchema( - new ResolvedSchema( - resolvedSchema.getColumns().stream() - .map( - col -> - col.copy( - DataTypeUtils.removeTimeAttribute( - col.getDataType()))) - .collect(Collectors.toList()), - resolvedSchema.getWatermarkSpecs(), - resolvedSchema.getPrimaryKey().orElse(null))); + /** Removes time attributes from the {@link ResolvedSchema}. */ + public static ResolvedSchema removeTimeAttributeFromResolvedSchema( + ResolvedSchema resolvedSchema) { + return new ResolvedSchema( + resolvedSchema.getColumns().stream() + .map(col -> col.copy(DataTypeUtils.removeTimeAttribute(col.getDataType()))) + .collect(Collectors.toList()), + resolvedSchema.getWatermarkSpecs(), + resolvedSchema.getPrimaryKey().orElse(null)); } /** diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java index 28b7f96d36c..1b7460352e9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.table.utils; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; @@ -105,15 +104,20 @@ class TableSchemaUtilsTest { UniqueConstraint.primaryKey("test-pk", Collections.singletonList("id"))); assertThat(TableSchemaUtils.removeTimeAttributeFromResolvedSchema(schema)) .isEqualTo( - TableSchema.builder() - .field("id", DataTypes.INT().notNull()) - .field("t", DataTypes.TIMESTAMP(3)) - .field("date", DataTypes.DATE(), "TO_DATE(t)") - .add( - TableColumn.metadata( - "metadata-1", DataTypes.INT(), "metadata", false)) - .watermark("t", "t", rowTimeType) - .primaryKey("test-pk", new String[] {"id"}) - .build()); + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("t", DataTypes.TIMESTAMP(3)), + Column.computed( + "date", + new ResolvedExpressionMock( + DataTypes.DATE(), () -> "TO_DATE(t)")), + Column.metadata( + "metadata-1", DataTypes.INT(), "metadata", false)), + Collections.singletonList( + WatermarkSpec.of( + "t", ResolvedExpressionMock.of(rowTimeType, "t"))), + UniqueConstraint.primaryKey( + "test-pk", Collections.singletonList("id")))); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java index b45265e51c1..043e8654884 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java @@ -20,11 +20,11 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; @@ -172,12 +172,19 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable { TableSourceFactory.Context context = new TableSourceFactoryContextImpl( contextResolvedTable.getIdentifier(), - new CatalogTableImpl( - TableSchemaUtils.removeTimeAttributeFromResolvedSchema( - originTable.getResolvedSchema()), - originTable.getPartitionKeys(), - originTable.getOptions(), - originTable.getComment()), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder() + .fromResolvedSchema( + TableSchemaUtils + .removeTimeAttributeFromResolvedSchema( + originTable + .getResolvedSchema())) + .build(), + originTable.getComment(), + originTable.getPartitionKeys(), + originTable.getOptions()), + originTable.getResolvedSchema()), config, contextResolvedTable.isTemporary()); TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java index e19c8af5554..b27caff91c1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java @@ -19,16 +19,17 @@ package org.apache.flink.table.planner.plan; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.TableFactoryUtil; @@ -230,15 +231,22 @@ public class FlinkCalciteCatalogReader extends CalciteCatalogReader { // DataTypeUtils#removeTimeAttribute} ResolvedCatalogTable originTable = schemaTable.getContextResolvedTable().getResolvedTable(); + ResolvedSchema resolvedSchemaWithRemovedTimeAttribute = + TableSchemaUtils.removeTimeAttributeFromResolvedSchema( + originTable.getResolvedSchema()); TableFactoryUtil.findAndCreateTableSource( schemaTable.getContextResolvedTable().getCatalog().orElse(null), schemaTable.getContextResolvedTable().getIdentifier(), - new CatalogTableImpl( - TableSchemaUtils.removeTimeAttributeFromResolvedSchema( - originTable.getResolvedSchema()), - originTable.getPartitionKeys(), - originTable.getOptions(), - originTable.getComment()), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder() + .fromResolvedSchema( + resolvedSchemaWithRemovedTimeAttribute) + .build(), + originTable.getComment(), + originTable.getPartitionKeys(), + originTable.getOptions()), + resolvedSchemaWithRemovedTimeAttribute), new Configuration(), schemaTable.isTemporary()); // success, then we will use the legacy factories diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala index 29535ea7a13..474487f9d39 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala @@ -18,10 +18,10 @@ package org.apache.flink.table.planner.plan.schema import org.apache.flink.configuration.ReadableConfig -import org.apache.flink.table.api.{TableException, ValidationException} +import org.apache.flink.table.api.{Schema, TableException, ValidationException} import org.apache.flink.table.api.TableColumn.ComputedColumn import org.apache.flink.table.api.config.TableConfigOptions -import org.apache.flink.table.catalog.{CatalogTable, CatalogTableImpl} +import org.apache.flink.table.catalog.{CatalogTable, ResolvedCatalogTable} import org.apache.flink.table.factories.TableFactoryUtil import org.apache.flink.table.planner.JMap import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory} @@ -174,15 +174,22 @@ class LegacyCatalogSourceTable[T]( catalogTable } val identifier = schemaTable.getContextResolvedTable.getIdentifier + val resolvedSchemaWithRemovedTimeAttribute = + TableSchemaUtils.removeTimeAttributeFromResolvedSchema( + schemaTable.getContextResolvedTable.getResolvedSchema) val tableSource = TableFactoryUtil.findAndCreateTableSource( schemaTable.getContextResolvedTable.getCatalog.orElse(null), identifier, - new CatalogTableImpl( - TableSchemaUtils.removeTimeAttributeFromResolvedSchema( - schemaTable.getContextResolvedTable.getResolvedSchema), - tableToFind.getPartitionKeys, - tableToFind.getOptions, - tableToFind.getComment), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder + .fromResolvedSchema(resolvedSchemaWithRemovedTimeAttribute) + .build(), + tableToFind.getComment, + tableToFind.getPartitionKeys, + tableToFind.getOptions + ), + resolvedSchemaWithRemovedTimeAttribute), conf, schemaTable.isTemporary ) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java index ccc6b8b50db..68aa5390e8b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java @@ -20,14 +20,16 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.utils.TableTestUtil; -import org.apache.flink.table.types.DataType; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; @@ -36,6 +38,8 @@ import org.apache.calcite.util.ImmutableBitSet; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -59,22 +63,24 @@ public class CatalogConstraintTest { @Test public void testWithPrimaryKey() throws Exception { - TableSchema tableSchema = - TableSchema.builder() - .fields( - new String[] {"a", "b", "c"}, - new DataType[] { - DataTypes.STRING(), - DataTypes.BIGINT().notNull(), - DataTypes.INT() - }) - .primaryKey("b") + final Schema tableSchema = + Schema.newBuilder() + .fromResolvedSchema( + new ResolvedSchema( + Arrays.asList( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.BIGINT().notNull()), + Column.physical("c", DataTypes.INT())), + Collections.emptyList(), + UniqueConstraint.primaryKey( + "primary_constraint", + Collections.singletonList("b")))) .build(); Map<String, String> properties = buildCatalogTableProperties(tableSchema); catalog.createTable( new ObjectPath(databaseName, "T1"), - new CatalogTableImpl(tableSchema, properties, ""), + CatalogTable.of(tableSchema, "", Collections.emptyList(), properties), false); RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1")); @@ -85,19 +91,20 @@ public class CatalogConstraintTest { @Test public void testWithoutPrimaryKey() throws Exception { - TableSchema tableSchema = - TableSchema.builder() - .fields( - new String[] {"a", "b", "c"}, - new DataType[] { - DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.INT() - }) + + final Schema tableSchema = + Schema.newBuilder() + .fromResolvedSchema( + ResolvedSchema.of( + Column.physical("a", DataTypes.BIGINT()), + Column.physical("b", DataTypes.STRING()), + Column.physical("c", DataTypes.INT()))) .build(); Map<String, String> properties = buildCatalogTableProperties(tableSchema); catalog.createTable( new ObjectPath(databaseName, "T1"), - new CatalogTableImpl(tableSchema, properties, ""), + CatalogTable.of(tableSchema, "", Collections.emptyList(), properties), false); RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1")); @@ -106,7 +113,7 @@ public class CatalogConstraintTest { assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of()); } - private Map<String, String> buildCatalogTableProperties(TableSchema tableSchema) { + private Map<String, String> buildCatalogTableProperties(Schema tableSchema) { Map<String, String> properties = new HashMap<>(); properties.put("connector.type", "filesystem"); properties.put("connector.property-version", "1"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java index 2b5af2b03b3..8036c88fe1e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java @@ -20,15 +20,17 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TablePartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; @@ -47,7 +49,6 @@ import org.apache.flink.table.planner.plan.stats.ValueInterval$; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.table.planner.utils.TestPartitionableSourceFactory; import org.apache.flink.table.planner.utils.TestTableSource; -import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.DateTimeUtils; import org.apache.calcite.rel.RelNode; @@ -57,6 +58,7 @@ import org.junit.Test; import java.math.BigDecimal; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -66,20 +68,17 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for Catalog Statistics. */ public class CatalogStatisticsTest { - private String databaseName = "default_database"; - - private TableSchema tableSchema = - TableSchema.builder() - .fields( - new String[] {"b1", "l2", "s3", "d4", "dd5"}, - new DataType[] { - DataTypes.BOOLEAN(), - DataTypes.BIGINT(), - DataTypes.STRING(), - DataTypes.DATE(), - DataTypes.DOUBLE() - }) - .build(); + private final String databaseName = "default_database"; + + private final ResolvedSchema resolvedSchema = + ResolvedSchema.physical( + Arrays.asList("b1", "l2", "s3", "d4", "dd5"), + Arrays.asList( + DataTypes.BOOLEAN(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.DATE(), + DataTypes.DOUBLE())); private TableEnvironment tEnv; private Catalog catalog; @@ -94,6 +93,7 @@ public class CatalogStatisticsTest { @Test public void testGetStatsFromCatalogForConnectorCatalogTable() throws Exception { + final TableSchema tableSchema = TableSchema.fromResolvedSchema(resolvedSchema); catalog.createTable( new ObjectPath(databaseName, "T1"), ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true), @@ -121,13 +121,14 @@ public class CatalogStatisticsTest { properties.put("format.property-version", "1"); properties.put("format.field-delimiter", ";"); + final Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(); catalog.createTable( new ObjectPath(databaseName, "T1"), - new CatalogTableImpl(tableSchema, properties, ""), + CatalogTable.of(schema, "", Collections.emptyList(), properties), false); catalog.createTable( new ObjectPath(databaseName, "T2"), - new CatalogTableImpl(tableSchema, properties, ""), + CatalogTable.of(schema, "", Collections.emptyList(), properties), false); alterTableStatistics(catalog, "T1"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java index 72e6b6d40cc..68efa48899e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java @@ -22,16 +22,17 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; @@ -102,16 +103,19 @@ public class SqlNodeToOperationConversionTestBase { final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1"); final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2"); - final TableSchema tableSchema = - TableSchema.builder() - .field("a", DataTypes.BIGINT()) - .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE)) - .field("c", DataTypes.INT()) - .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE)) + final Schema tableSchema = + Schema.newBuilder() + .fromResolvedSchema( + ResolvedSchema.of( + Column.physical("a", DataTypes.BIGINT()), + Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)), + Column.physical("c", DataTypes.INT()), + Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE)))) .build(); Map<String, String> options = new HashMap<>(); options.put("connector", "COLLECTION"); - final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, ""); + final CatalogTable catalogTable = + CatalogTable.of(tableSchema, "", Collections.emptyList(), options); catalog.createTable(path1, catalogTable, true); catalog.createTable(path2, catalogTable, true); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 202013403c9..bd6d2a84e1a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -2802,7 +2802,11 @@ class TableEnvironmentTest { table: CatalogBaseTable): CatalogBaseTable = { numTempTable += 1 if (table.isInstanceOf[CatalogTable]) { - new CatalogTableImpl(table.getSchema, table.getOptions, tableComment) + CatalogTable.of( + table.getUnresolvedSchema, + tableComment, + Collections.emptyList(), + table.getOptions) } else { val view = table.asInstanceOf[CatalogView] CatalogView.of( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala index 86e90330f39..206ae9a30f5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala @@ -17,7 +17,9 @@ */ package org.apache.flink.table.planner.plan.rules.logical -import org.apache.flink.table.api.{DataTypes, TableSchema} +import org.apache.flink.table.api.{DataTypes, Schema} +import org.apache.flink.table.catalog.{Column, ResolvedSchema} +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock import org.apache.flink.table.planner.expressions.utils.Func1 import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableConfigUtils, TableTestBase, TestPartitionableSourceFactory} @@ -58,21 +60,25 @@ class PushPartitionIntoLegacyTableSourceScanRuleTest( .build() ) - val tableSchema = TableSchema - .builder() - .field("id", DataTypes.INT()) - .field("name", DataTypes.STRING()) - .field("part1", DataTypes.STRING()) - .field("part2", DataTypes.INT()) + val tableSchema = Schema + .newBuilder() + .fromResolvedSchema(ResolvedSchema.of( + Column.physical("id", DataTypes.INT()), + Column.physical("name", DataTypes.STRING()), + Column.physical("part1", DataTypes.STRING()), + Column.physical("part2", DataTypes.INT()) + )) .build() - val tableSchema2 = TableSchema - .builder() - .field("id", DataTypes.INT()) - .field("name", DataTypes.STRING()) - .field("part1", DataTypes.STRING()) - .field("part2", DataTypes.INT()) - .field("virtualField", DataTypes.INT(), "`part2` + 1") + val tableSchema2 = Schema + .newBuilder() + .fromResolvedSchema(ResolvedSchema.of( + Column.physical("id", DataTypes.INT()), + Column.physical("name", DataTypes.STRING()), + Column.physical("part1", DataTypes.STRING()), + Column.physical("part2", DataTypes.INT()), + Column.computed("virtualField", ResolvedExpressionMock.of(DataTypes.INT(), "`part2` + 1")) + )) .build() TestPartitionableSourceFactory.createTemporaryTable( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala index 13cf6ab7483..a16b7c6d540 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala @@ -24,9 +24,9 @@ import org.apache.flink.configuration.{BatchExecutionOptions, Configuration} import org.apache.flink.connector.file.table.FileSystemConnectorOptions import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} import org.apache.flink.streaming.api.functions.sink.RichSinkFunction -import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, ValidationException} +import org.apache.flink.table.api.{Schema, TableEnvironment, TableException, TableSchema, ValidationException} import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.catalog.{CatalogTableImpl, ObjectPath} +import org.apache.flink.table.catalog.{CatalogTable, ObjectPath, ResolvedSchema} import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE import org.apache.flink.table.descriptors.DescriptorProperties import org.apache.flink.table.descriptors.Schema.SCHEMA @@ -37,6 +37,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink} import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType} +import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.utils.LegacyRowResource import org.apache.flink.types.Row @@ -318,11 +319,19 @@ object PartitionableSinkITCase { properties.putString("partition-column." + i, part) } - val table = new CatalogTableImpl( - new TableSchema(Array("a", "b", "c"), rowType.getFieldTypes), + val table = CatalogTable.of( + Schema + .newBuilder() + .fromResolvedSchema( + ResolvedSchema.physical( + Array("a", "b", "c"), + TypeConversions.fromLegacyInfoToDataType(rowType.getFieldTypes) + ) + ) + .build(), + "", util.Arrays.asList[String](partitionColumns: _*), - properties.asMap(), - "" + properties.asMap() ) tEnv .getCatalog(tEnv.getCurrentCatalog) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala index 459788d55e0..dcd3f200a43 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala @@ -26,10 +26,9 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.io.InputSplit import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema} import org.apache.flink.table.api.internal.TableEnvironmentInternal -import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, CatalogTableImpl, ObjectPath} +import org.apache.flink.table.catalog._ import org.apache.flink.table.descriptors._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR, CONNECTOR_TYPE} import org.apache.flink.table.expressions.{CallExpression, Expression, FieldReferenceExpression, ValueLiteralExpression} @@ -42,7 +41,7 @@ import org.apache.flink.table.planner.plan.hint.OptionsHintTest.IS_BOUNDED import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo -import org.apache.flink.table.sinks.{CsvAppendTableSinkFactory, CsvBatchTableSinkFactory, StreamTableSink, TableSink} +import org.apache.flink.table.sinks.{CsvBatchTableSinkFactory, StreamTableSink, TableSink} import org.apache.flink.table.sources._ import org.apache.flink.table.sources.tsextractors.ExistingField import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks} @@ -1143,12 +1142,14 @@ class TestPartitionableSourceFactory extends TableSourceFactory[Row] { } object TestPartitionableSourceFactory { - private val tableSchema: TableSchema = TableSchema - .builder() - .field("id", DataTypes.INT()) - .field("name", DataTypes.STRING()) - .field("part1", DataTypes.STRING()) - .field("part2", DataTypes.INT()) + private val tableSchema: org.apache.flink.table.api.Schema = org.apache.flink.table.api.Schema + .newBuilder() + .fromResolvedSchema(ResolvedSchema.of( + Column.physical("id", DataTypes.INT()), + Column.physical("name", DataTypes.STRING()), + Column.physical("part1", DataTypes.STRING()), + Column.physical("part2", DataTypes.INT()) + )) .build() /** For java invoking. */ @@ -1160,7 +1161,7 @@ object TestPartitionableSourceFactory { tEnv: TableEnvironment, tableName: String, isBounded: Boolean, - tableSchema: TableSchema = tableSchema, + tableSchema: org.apache.flink.table.api.Schema = tableSchema, remainingPartitions: JList[JMap[String, String]] = null, sourceFetchPartitions: Boolean = false): Unit = { val properties = new DescriptorProperties() @@ -1177,11 +1178,11 @@ object TestPartitionableSourceFactory { } } - val table = new CatalogTableImpl( + val table = CatalogTable.of( tableSchema, + "", util.Arrays.asList[String]("part1", "part2"), - properties.asMap(), - "" + properties.asMap() ) val catalog = tEnv.getCatalog(tEnv.getCurrentCatalog).get() val path = new ObjectPath(tEnv.getCurrentDatabase, tableName)