This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cfb213040a0 [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner (#22263)
cfb213040a0 is described below

commit cfb213040a08021da3a954a1c0e7f94f22a80f1e
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Thu Mar 30 23:40:21 2023 +0200

    [FLINK-31604][table] Reduce usage of CatalogTableImpl in table-planner (#22263)
---
 .../apache/flink/table/utils/TableSchemaUtils.java | 22 ++++-----
 .../flink/table/utils/TableSchemaUtilsTest.java    | 26 ++++++-----
 .../table/planner/catalog/CatalogSchemaTable.java  | 21 ++++++---
 .../planner/plan/FlinkCalciteCatalogReader.java    | 22 ++++++---
 .../plan/schema/LegacyCatalogSourceTable.scala     | 23 ++++++----
 .../planner/catalog/CatalogConstraintTest.java     | 53 ++++++++++++----------
 .../planner/catalog/CatalogStatisticsTest.java     | 37 +++++++--------
 .../SqlNodeToOperationConversionTestBase.java      | 22 +++++----
 .../flink/table/api/TableEnvironmentTest.scala     |  6 ++-
 ...artitionIntoLegacyTableSourceScanRuleTest.scala | 34 ++++++++------
 .../batch/sql/PartitionableSinkITCase.scala        | 21 ++++++---
 .../table/planner/utils/testTableSourceSinks.scala | 27 +++++------
 12 files changed, 184 insertions(+), 130 deletions(-)
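
Reading note (not part of the patch): the call-site changes below all follow the same shape. The deprecated CatalogTableImpl constructor is replaced by CatalogTable.of(...) wrapped in a ResolvedCatalogTable, built from a ResolvedSchema with time attributes stripped. A minimal Java sketch of that pattern, assuming a ResolvedCatalogTable named originTable is in scope (the class and method names here are illustrative, not from the patch):

    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    class CatalogTableMigrationSketch {
        // Rebuilds a catalog table without time attributes, mirroring the call sites
        // touched in this commit (CatalogSchemaTable, FlinkCalciteCatalogReader, ...).
        static ResolvedCatalogTable withoutTimeAttributes(ResolvedCatalogTable originTable) {
            ResolvedSchema resolvedSchema =
                    TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
                            originTable.getResolvedSchema());
            return new ResolvedCatalogTable(
                    CatalogTable.of(
                            Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
                            originTable.getComment(),
                            originTable.getPartitionKeys(),
                            originTable.getOptions()),
                    resolvedSchema);
        }
    }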

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 90aa855e17d..8a8fc556523 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -118,19 +118,15 @@ public class TableSchemaUtils {
         }
     }
 
-    /** Removes time attributes from the {@link ResolvedSchema} and build a {@link TableSchema}. */
-    public static TableSchema removeTimeAttributeFromResolvedSchema(ResolvedSchema resolvedSchema) {
-        return TableSchema.fromResolvedSchema(
-                new ResolvedSchema(
-                        resolvedSchema.getColumns().stream()
-                                .map(
-                                        col ->
-                                                col.copy(
-                                                        DataTypeUtils.removeTimeAttribute(
-                                                                col.getDataType())))
-                                .collect(Collectors.toList()),
-                        resolvedSchema.getWatermarkSpecs(),
-                        resolvedSchema.getPrimaryKey().orElse(null)));
+    /** Removes time attributes from the {@link ResolvedSchema}. */
+    public static ResolvedSchema removeTimeAttributeFromResolvedSchema(
+            ResolvedSchema resolvedSchema) {
+        return new ResolvedSchema(
+                resolvedSchema.getColumns().stream()
+                        .map(col -> col.copy(DataTypeUtils.removeTimeAttribute(col.getDataType())))
+                        .collect(Collectors.toList()),
+                resolvedSchema.getWatermarkSpecs(),
+                resolvedSchema.getPrimaryKey().orElse(null));
     }
 
     /**
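
Note on the helper change above (illustrative, not part of the patch): removeTimeAttributeFromResolvedSchema now returns a ResolvedSchema instead of a TableSchema. A hedged sketch of how a caller that still needs the legacy TableSchema could adapt; the class and method names below are hypothetical:

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    class RemoveTimeAttributeCallerSketch {
        // The helper keeps columns, watermark specs, and the primary key; it only
        // strips time attributes from the column data types.
        static TableSchema legacySchemaWithoutTime(ResolvedSchema resolvedSchema) {
            ResolvedSchema withoutTime =
                    TableSchemaUtils.removeTimeAttributeFromResolvedSchema(resolvedSchema);
            // Convert explicitly where the deprecated TableSchema is still required.
            return TableSchema.fromResolvedSchema(withoutTime);
        }
    }
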
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index 28b7f96d36c..1b7460352e9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -105,15 +104,20 @@ class TableSchemaUtilsTest {
                         UniqueConstraint.primaryKey("test-pk", Collections.singletonList("id")));
         assertThat(TableSchemaUtils.removeTimeAttributeFromResolvedSchema(schema))
                 .isEqualTo(
-                        TableSchema.builder()
-                                .field("id", DataTypes.INT().notNull())
-                                .field("t", DataTypes.TIMESTAMP(3))
-                                .field("date", DataTypes.DATE(), "TO_DATE(t)")
-                                .add(
-                                        TableColumn.metadata(
-                                                "metadata-1", DataTypes.INT(), "metadata", false))
-                                .watermark("t", "t", rowTimeType)
-                                .primaryKey("test-pk", new String[] {"id"})
-                                .build());
+                        new ResolvedSchema(
+                                Arrays.asList(
+                                        Column.physical("id", DataTypes.INT().notNull()),
+                                        Column.physical("t", DataTypes.TIMESTAMP(3)),
+                                        Column.computed(
+                                                "date",
+                                                new ResolvedExpressionMock(
+                                                        DataTypes.DATE(), () -> "TO_DATE(t)")),
+                                        Column.metadata(
+                                                "metadata-1", DataTypes.INT(), "metadata", false)),
+                                Collections.singletonList(
+                                        WatermarkSpec.of(
+                                                "t", ResolvedExpressionMock.of(rowTimeType, "t"))),
+                                UniqueConstraint.primaryKey(
+                                        "test-pk", Collections.singletonList("id"))));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index b45265e51c1..043e8654884 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -172,12 +172,19 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
                 TableSourceFactory.Context context =
                         new TableSourceFactoryContextImpl(
                                 contextResolvedTable.getIdentifier(),
-                                new CatalogTableImpl(
-                                        TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
-                                                originTable.getResolvedSchema()),
-                                        originTable.getPartitionKeys(),
-                                        originTable.getOptions(),
-                                        originTable.getComment()),
+                                new ResolvedCatalogTable(
+                                        CatalogTable.of(
+                                                Schema.newBuilder()
+                                                        .fromResolvedSchema(
+                                                                TableSchemaUtils
+                                                                        .removeTimeAttributeFromResolvedSchema(
+                                                                                originTable
+                                                                                        .getResolvedSchema()))
+                                                        .build(),
+                                                originTable.getComment(),
+                                                originTable.getPartitionKeys(),
+                                                originTable.getOptions()),
+                                        originTable.getResolvedSchema()),
                                 config,
                                 contextResolvedTable.isTemporary());
                 TableSource<?> source = TableFactoryUtil.findAndCreateTableSource(context);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
index e19c8af5554..b27caff91c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
@@ -19,16 +19,17 @@
 package org.apache.flink.table.planner.plan;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -230,15 +231,22 @@ public class FlinkCalciteCatalogReader extends CalciteCatalogReader {
                 // DataTypeUtils#removeTimeAttribute}
                 ResolvedCatalogTable originTable =
                         schemaTable.getContextResolvedTable().getResolvedTable();
+                ResolvedSchema resolvedSchemaWithRemovedTimeAttribute =
+                        TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
+                                originTable.getResolvedSchema());
                 TableFactoryUtil.findAndCreateTableSource(
                         schemaTable.getContextResolvedTable().getCatalog().orElse(null),
                         schemaTable.getContextResolvedTable().getIdentifier(),
-                        new CatalogTableImpl(
-                                TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
-                                        originTable.getResolvedSchema()),
-                                originTable.getPartitionKeys(),
-                                originTable.getOptions(),
-                                originTable.getComment()),
+                        new ResolvedCatalogTable(
+                                CatalogTable.of(
+                                        Schema.newBuilder()
+                                                .fromResolvedSchema(
+                                                        resolvedSchemaWithRemovedTimeAttribute)
+                                                .build(),
+                                        originTable.getComment(),
+                                        originTable.getPartitionKeys(),
+                                        originTable.getOptions()),
+                                resolvedSchemaWithRemovedTimeAttribute),
                         new Configuration(),
                         schemaTable.isTemporary());
                 // success, then we will use the legacy factories
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
index 29535ea7a13..474487f9d39 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
@@ -18,10 +18,10 @@
 package org.apache.flink.table.planner.plan.schema
 
 import org.apache.flink.configuration.ReadableConfig
-import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.{Schema, TableException, ValidationException}
 import org.apache.flink.table.api.TableColumn.ComputedColumn
 import org.apache.flink.table.api.config.TableConfigOptions
-import org.apache.flink.table.catalog.{CatalogTable, CatalogTableImpl}
+import org.apache.flink.table.catalog.{CatalogTable, ResolvedCatalogTable}
 import org.apache.flink.table.factories.TableFactoryUtil
 import org.apache.flink.table.planner.JMap
 import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
@@ -174,15 +174,22 @@ class LegacyCatalogSourceTable[T](
       catalogTable
     }
     val identifier = schemaTable.getContextResolvedTable.getIdentifier
+    val resolvedSchemaWithRemovedTimeAttribute =
+      TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
+        schemaTable.getContextResolvedTable.getResolvedSchema)
     val tableSource = TableFactoryUtil.findAndCreateTableSource(
       schemaTable.getContextResolvedTable.getCatalog.orElse(null),
       identifier,
-      new CatalogTableImpl(
-        TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
-          schemaTable.getContextResolvedTable.getResolvedSchema),
-        tableToFind.getPartitionKeys,
-        tableToFind.getOptions,
-        tableToFind.getComment),
+      new ResolvedCatalogTable(
+        CatalogTable.of(
+          Schema.newBuilder
+            .fromResolvedSchema(resolvedSchemaWithRemovedTimeAttribute)
+            .build(),
+          tableToFind.getComment,
+          tableToFind.getPartitionKeys,
+          tableToFind.getOptions
+        ),
+        resolvedSchemaWithRemovedTimeAttribute),
       conf,
       schemaTable.isTemporary
     )
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
index ccc6b8b50db..68aa5390e8b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
@@ -20,14 +20,16 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
 import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.table.types.DataType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
@@ -36,6 +38,8 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -59,22 +63,24 @@ public class CatalogConstraintTest {
 
     @Test
     public void testWithPrimaryKey() throws Exception {
-        TableSchema tableSchema =
-                TableSchema.builder()
-                        .fields(
-                                new String[] {"a", "b", "c"},
-                                new DataType[] {
-                                    DataTypes.STRING(),
-                                    DataTypes.BIGINT().notNull(),
-                                    DataTypes.INT()
-                                })
-                        .primaryKey("b")
+        final Schema tableSchema =
+                Schema.newBuilder()
+                        .fromResolvedSchema(
+                                new ResolvedSchema(
+                                        Arrays.asList(
+                                                Column.physical("a", DataTypes.STRING()),
+                                                Column.physical("b", DataTypes.BIGINT().notNull()),
+                                                Column.physical("c", DataTypes.INT())),
+                                        Collections.emptyList(),
+                                        UniqueConstraint.primaryKey(
+                                                "primary_constraint",
+                                                Collections.singletonList("b"))))
                         .build();
         Map<String, String> properties = buildCatalogTableProperties(tableSchema);
 
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
-                new CatalogTableImpl(tableSchema, properties, ""),
+                CatalogTable.of(tableSchema, "", Collections.emptyList(), properties),
                 false);
 
         RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1"));
@@ -85,19 +91,20 @@ public class CatalogConstraintTest {
 
     @Test
     public void testWithoutPrimaryKey() throws Exception {
-        TableSchema tableSchema =
-                TableSchema.builder()
-                        .fields(
-                                new String[] {"a", "b", "c"},
-                                new DataType[] {
-                                    DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.INT()
-                                })
+
+        final Schema tableSchema =
+                Schema.newBuilder()
+                        .fromResolvedSchema(
+                                ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.BIGINT()),
+                                        Column.physical("b", DataTypes.STRING()),
+                                        Column.physical("c", DataTypes.INT())))
                         .build();
         Map<String, String> properties = buildCatalogTableProperties(tableSchema);
 
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
-                new CatalogTableImpl(tableSchema, properties, ""),
+                CatalogTable.of(tableSchema, "", Collections.emptyList(), properties),
                 false);
 
         RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1"));
@@ -106,7 +113,7 @@ public class CatalogConstraintTest {
         assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of());
     }
 
-    private Map<String, String> buildCatalogTableProperties(TableSchema tableSchema) {
+    private Map<String, String> buildCatalogTableProperties(Schema tableSchema) {
         Map<String, String> properties = new HashMap<>();
         properties.put("connector.type", "filesystem");
         properties.put("connector.property-version", "1");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
index 2b5af2b03b3..8036c88fe1e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
@@ -20,15 +20,17 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -47,7 +49,6 @@ import org.apache.flink.table.planner.plan.stats.ValueInterval$;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 import org.apache.flink.table.planner.utils.TestPartitionableSourceFactory;
 import org.apache.flink.table.planner.utils.TestTableSource;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.DateTimeUtils;
 
 import org.apache.calcite.rel.RelNode;
@@ -57,6 +58,7 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -66,20 +68,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for Catalog Statistics. */
 public class CatalogStatisticsTest {
 
-    private String databaseName = "default_database";
-
-    private TableSchema tableSchema =
-            TableSchema.builder()
-                    .fields(
-                            new String[] {"b1", "l2", "s3", "d4", "dd5"},
-                            new DataType[] {
-                                DataTypes.BOOLEAN(),
-                                DataTypes.BIGINT(),
-                                DataTypes.STRING(),
-                                DataTypes.DATE(),
-                                DataTypes.DOUBLE()
-                            })
-                    .build();
+    private final String databaseName = "default_database";
+
+    private final ResolvedSchema resolvedSchema =
+            ResolvedSchema.physical(
+                    Arrays.asList("b1", "l2", "s3", "d4", "dd5"),
+                    Arrays.asList(
+                            DataTypes.BOOLEAN(),
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING(),
+                            DataTypes.DATE(),
+                            DataTypes.DOUBLE()));
 
     private TableEnvironment tEnv;
     private Catalog catalog;
@@ -94,6 +93,7 @@ public class CatalogStatisticsTest {
 
     @Test
     public void testGetStatsFromCatalogForConnectorCatalogTable() throws Exception {
+        final TableSchema tableSchema = TableSchema.fromResolvedSchema(resolvedSchema);
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
                 ConnectorCatalogTable.source(new TestTableSource(true, tableSchema), true),
@@ -121,13 +121,14 @@ public class CatalogStatisticsTest {
         properties.put("format.property-version", "1");
         properties.put("format.field-delimiter", ";");
 
+        final Schema schema = Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
-                new CatalogTableImpl(tableSchema, properties, ""),
+                CatalogTable.of(schema, "", Collections.emptyList(), properties),
                 false);
         catalog.createTable(
                 new ObjectPath(databaseName, "T2"),
-                new CatalogTableImpl(tableSchema, properties, ""),
+                CatalogTable.of(schema, "", Collections.emptyList(), properties),
                 false);
 
         alterTableStatistics(catalog, "T1");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index 72e6b6d40cc..68efa48899e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -22,16 +22,17 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -102,16 +103,19 @@ public class SqlNodeToOperationConversionTestBase {
 
         final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
         final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        final TableSchema tableSchema =
-                TableSchema.builder()
-                        .field("a", DataTypes.BIGINT())
-                        .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .field("c", DataTypes.INT())
-                        .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+        final Schema tableSchema =
+                Schema.newBuilder()
+                        .fromResolvedSchema(
+                                ResolvedSchema.of(
+                                        Column.physical("a", DataTypes.BIGINT()),
+                                        Column.physical("b", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                        Column.physical("c", DataTypes.INT()),
+                                        Column.physical("d", DataTypes.VARCHAR(Integer.MAX_VALUE))))
                         .build();
         Map<String, String> options = new HashMap<>();
         options.put("connector", "COLLECTION");
-        final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
+        final CatalogTable catalogTable =
+                CatalogTable.of(tableSchema, "", Collections.emptyList(), options);
         catalog.createTable(path1, catalogTable, true);
         catalog.createTable(path2, catalogTable, true);
     }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 202013403c9..bd6d2a84e1a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -2802,7 +2802,11 @@ class TableEnvironmentTest {
         table: CatalogBaseTable): CatalogBaseTable = {
       numTempTable += 1
       if (table.isInstanceOf[CatalogTable]) {
-        new CatalogTableImpl(table.getSchema, table.getOptions, tableComment)
+        CatalogTable.of(
+          table.getUnresolvedSchema,
+          tableComment,
+          Collections.emptyList(),
+          table.getOptions)
       } else {
         val view = table.asInstanceOf[CatalogView]
         CatalogView.of(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala
index 86e90330f39..206ae9a30f5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRuleTest.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.rules.logical
 
-import org.apache.flink.table.api.{DataTypes, TableSchema}
+import org.apache.flink.table.api.{DataTypes, Schema}
+import org.apache.flink.table.catalog.{Column, ResolvedSchema}
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock
 import org.apache.flink.table.planner.expressions.utils.Func1
 import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
 import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableConfigUtils, TableTestBase, TestPartitionableSourceFactory}
@@ -58,21 +60,25 @@ class PushPartitionIntoLegacyTableSourceScanRuleTest(
         .build()
     )
 
-    val tableSchema = TableSchema
-      .builder()
-      .field("id", DataTypes.INT())
-      .field("name", DataTypes.STRING())
-      .field("part1", DataTypes.STRING())
-      .field("part2", DataTypes.INT())
+    val tableSchema = Schema
+      .newBuilder()
+      .fromResolvedSchema(ResolvedSchema.of(
+        Column.physical("id", DataTypes.INT()),
+        Column.physical("name", DataTypes.STRING()),
+        Column.physical("part1", DataTypes.STRING()),
+        Column.physical("part2", DataTypes.INT())
+      ))
       .build()
 
-    val tableSchema2 = TableSchema
-      .builder()
-      .field("id", DataTypes.INT())
-      .field("name", DataTypes.STRING())
-      .field("part1", DataTypes.STRING())
-      .field("part2", DataTypes.INT())
-      .field("virtualField", DataTypes.INT(), "`part2` + 1")
+    val tableSchema2 = Schema
+      .newBuilder()
+      .fromResolvedSchema(ResolvedSchema.of(
+        Column.physical("id", DataTypes.INT()),
+        Column.physical("name", DataTypes.STRING()),
+        Column.physical("part1", DataTypes.STRING()),
+        Column.physical("part2", DataTypes.INT()),
+        Column.computed("virtualField", ResolvedExpressionMock.of(DataTypes.INT(), "`part2` + 1"))
+      ))
       .build()
 
     TestPartitionableSourceFactory.createTemporaryTable(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
index 13cf6ab7483..a16b7c6d540 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
@@ -24,9 +24,9 @@ import org.apache.flink.configuration.{BatchExecutionOptions, Configuration}
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, ValidationException}
+import org.apache.flink.table.api.{Schema, TableEnvironment, TableException, TableSchema, ValidationException}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.catalog.{CatalogTableImpl, ObjectPath}
+import org.apache.flink.table.catalog.{CatalogTable, ObjectPath, ResolvedSchema}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
 import org.apache.flink.table.descriptors.DescriptorProperties
 import org.apache.flink.table.descriptors.Schema.SCHEMA
@@ -37,6 +37,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.sinks.{PartitionableTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.utils.LegacyRowResource
 import org.apache.flink.types.Row
 
@@ -318,11 +319,19 @@ object PartitionableSinkITCase {
         properties.putString("partition-column." + i, part)
     }
 
-    val table = new CatalogTableImpl(
-      new TableSchema(Array("a", "b", "c"), rowType.getFieldTypes),
+    val table = CatalogTable.of(
+      Schema
+        .newBuilder()
+        .fromResolvedSchema(
+          ResolvedSchema.physical(
+            Array("a", "b", "c"),
+            TypeConversions.fromLegacyInfoToDataType(rowType.getFieldTypes)
+          )
+        )
+        .build(),
+      "",
       util.Arrays.asList[String](partitionColumns: _*),
-      properties.asMap(),
-      ""
+      properties.asMap()
     )
     tEnv
       .getCatalog(tEnv.getCurrentCatalog)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index 459788d55e0..dcd3f200a43 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -26,10 +26,9 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api
 import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema}
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
-import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, CatalogTableImpl, ObjectPath}
+import org.apache.flink.table.catalog._
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR, CONNECTOR_TYPE}
 import org.apache.flink.table.expressions.{CallExpression, Expression, FieldReferenceExpression, ValueLiteralExpression}
@@ -42,7 +41,7 @@ import org.apache.flink.table.planner.plan.hint.OptionsHintTest.IS_BOUNDED
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.sinks.{CsvAppendTableSinkFactory, CsvBatchTableSinkFactory, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{CsvBatchTableSinkFactory, StreamTableSink, TableSink}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -1143,12 +1142,14 @@ class TestPartitionableSourceFactory extends TableSourceFactory[Row] {
 }
 
 object TestPartitionableSourceFactory {
-  private val tableSchema: TableSchema = TableSchema
-    .builder()
-    .field("id", DataTypes.INT())
-    .field("name", DataTypes.STRING())
-    .field("part1", DataTypes.STRING())
-    .field("part2", DataTypes.INT())
+  private val tableSchema: org.apache.flink.table.api.Schema = org.apache.flink.table.api.Schema
+    .newBuilder()
+    .fromResolvedSchema(ResolvedSchema.of(
+      Column.physical("id", DataTypes.INT()),
+      Column.physical("name", DataTypes.STRING()),
+      Column.physical("part1", DataTypes.STRING()),
+      Column.physical("part2", DataTypes.INT())
+    ))
     .build()
 
   /** For java invoking. */
@@ -1160,7 +1161,7 @@ object TestPartitionableSourceFactory {
       tEnv: TableEnvironment,
       tableName: String,
       isBounded: Boolean,
-      tableSchema: TableSchema = tableSchema,
+      tableSchema: org.apache.flink.table.api.Schema = tableSchema,
       remainingPartitions: JList[JMap[String, String]] = null,
       sourceFetchPartitions: Boolean = false): Unit = {
     val properties = new DescriptorProperties()
@@ -1177,11 +1178,11 @@ object TestPartitionableSourceFactory {
       }
     }
 
-    val table = new CatalogTableImpl(
+    val table = CatalogTable.of(
       tableSchema,
+      "",
       util.Arrays.asList[String]("part1", "part2"),
-      properties.asMap(),
-      ""
+      properties.asMap()
     )
     val catalog = tEnv.getCatalog(tEnv.getCurrentCatalog).get()
     val path = new ObjectPath(tEnv.getCurrentDatabase, tableName)

