This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0c24adfda59 [SPARK-42398][SQL] Refine default column value DS v2 interface
0c24adfda59 is described below

commit 0c24adfda5945f78aa19539c62d21a7efc265719
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Feb 20 16:30:50 2023 +0800

    [SPARK-42398][SQL] Refine default column value DS v2 interface
    
    ### What changes were proposed in this pull request?
    
    The current default value DS v2 API is a bit inconsistent. The
    `createTable` API only takes a `StructType`, so implementations must know
    the special metadata key of the default value in order to access it. The
    `TableChange` API, in contrast, carries the default value as an individual
    field.
    
    This PR adds a new `Column` interface, which holds both the current
    default (as a SQL string) and the exist default (as a v2 literal). The
    `createTable` API now takes `Column`s. This avoids the need for a special
    metadata key and is more extensible when adding further special columns
    such as generated columns. It is also type-safe and guarantees that the
    exist default is a literal. Implementations remain free to decide how to
    encode and store default values. Note: backward compatibility is taken
    care of.
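    
    For illustration only (not part of this commit), here is a minimal Scala
    sketch of the column definitions a catalog now receives through the new
    `createTable(Identifier, Column[], Transform[], Map)` overload; the table
    and column names are hypothetical:
    
        import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
        import org.apache.spark.sql.connector.expressions.LiteralValue
        import org.apache.spark.sql.types.IntegerType
    
        // Roughly what Spark builds for: CREATE TABLE t (id INT, score INT DEFAULT 40 + 2)
        val columns: Array[Column] = Array(
          Column.create("id", IntegerType),
          Column.create(
            "score",
            IntegerType,
            true,                              // nullable
            null,                              // no comment
            new ColumnDefaultValue(
              "40 + 2",                        // current default: the original SQL text
              LiteralValue(42, IntegerType)),  // exist default: the pre-evaluated v2 literal
            null))                             // no extra column metadata (JSON)
    
        // The catalog receives these columns directly and no longer needs to read
        // the special default-value metadata keys from StructField.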
    
    ### Why are the changes needed?
    
    A better DS v2 API for default column values.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #40049 from cloud-fan/table2.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 70a098c83da4cff2bdc8d15a5a8b513a32564dbc)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |   8 +-
 .../spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala    |   3 +-
 .../apache/spark/sql/connector/catalog/Column.java |  90 ++++++++++++++++++
 .../sql/connector/catalog/ColumnDefaultValue.java  |  84 +++++++++++++++++
 .../sql/connector/catalog/StagingTableCatalog.java |  67 ++++++++++++--
 .../apache/spark/sql/connector/catalog/Table.java  |  11 +++
 .../spark/sql/connector/catalog/TableCatalog.java  |  23 ++++-
 .../spark/sql/connector/catalog/TableChange.java   |  20 ++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   4 +-
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  |   2 +-
 .../sql/catalyst/plans/logical/statements.scala    |  13 +++
 .../plans/logical/v2AlterTableCommands.scala       |   4 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  30 ++++--
 .../sql/connector/catalog/CatalogV2Implicits.scala |   7 ++
 .../sql/connector/catalog/CatalogV2Util.scala      |  89 +++++++++++++++++-
 .../connector/write/RowLevelOperationTable.scala   |   3 +-
 .../datasources/v2/DataSourceV2Relation.scala      |   3 +-
 .../spark/sql/internal/connector/ColumnImpl.scala  |  30 ++++++
 .../internal/connector/SimpleTableProvider.scala   |   3 +-
 .../spark/sql/connector/catalog/CatalogSuite.scala | 103 +++++++++++----------
 .../sql/connector/catalog/CatalogV2UtilSuite.scala |   4 +-
 .../connector/catalog/InMemoryTableCatalog.scala   |  10 ++
 .../SupportsAtomicPartitionManagementSuite.scala   |   4 +-
 .../catalog/SupportsPartitionManagementSuite.scala |   7 +-
 .../spark/sql/execution/datasources/rules.scala    |   1 -
 .../execution/datasources/v2/CreateTableExec.scala |   7 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   8 +-
 .../datasources/v2/FileDataSourceV2.scala          |   3 +-
 .../datasources/v2/ReplaceTableExec.scala          |  13 ++-
 .../datasources/v2/ShowCreateTableExec.scala       |   3 +-
 .../datasources/v2/V2SessionCatalog.scala          |  11 ++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  22 +++--
 .../sources/TextSocketSourceProvider.scala         |   5 +-
 .../spark/sql/streaming/DataStreamReader.scala     |   3 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala |   3 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  20 ++--
 .../sql/connector/DeleteFromTableSuiteBase.scala   |   4 +-
 .../sql/connector/TestV2SessionCatalogBase.scala   |  11 ++-
 .../WriteDistributionAndOrderingSuite.scala        |   2 +-
 .../execution/command/PlanResolutionSuite.scala    |  56 ++++++-----
 .../datasources/InMemoryTableMetricSuite.scala     |   3 +-
 .../datasources/v2/V2SessionCatalogSuite.scala     |  96 +++++++++----------
 .../org/apache/spark/sql/hive/InsertSuite.scala    |   6 +-
 43 files changed, 670 insertions(+), 229 deletions(-)

diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 18f656748ac..841017ae6c0 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -30,6 +30,7 @@ import 
org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, 
InMemoryCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with 
SharedSparkSession {
 
   protected val inputFilePath: Path = baseResourcePath.resolve("queries")
   protected val goldenFilePath: Path = 
baseResourcePath.resolve("explain-results")
+  private val emptyProps: util.Map[String, String] = 
util.Collections.emptyMap()
 
   private val analyzer = {
     val inMemoryCatalog = new InMemoryCatalog
     inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty())
-    inMemoryCatalog.createNamespace(Array("tempdb"), 
util.Collections.emptyMap())
+    inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
     inMemoryCatalog.createTable(
       Identifier.of(Array("tempdb"), "myTable"),
       new StructType().add("id", "long"),
-      Array.empty,
-      util.Collections.emptyMap())
+      Array.empty[Transform],
+      emptyProps)
 
     val catalogManager = new CatalogManager(
       inMemoryCatalog,
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index b2500a2dbf2..d3f17187a37 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends 
SharedSparkSession with DockerInte
       // Drop non empty namespace without cascade
       catalog.createNamespace(Array("foo"), commentMap.asJava)
       assert(catalog.namespaceExists(Array("foo")) === true)
-      catalog.createTable(ident1, schema, Array.empty, emptyProps)
+      catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
       if (supportsDropSchemaRestrict) {
         intercept[NonEmptyNamespaceException] {
           catalog.dropNamespace(Array("foo"), cascade = false)
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
new file mode 100644
index 00000000000..d2c8f25e739
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.internal.connector.ColumnImpl;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * An interface representing a column of a {@link Table}. It defines basic 
properties of a column,
+ * such as name and data type, as well as some advanced ones like default 
column value.
+ * <p>
+ * Data Sources do not need to implement it. They should consume it in APIs 
like
+ * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, 
and report it in
+ * {@link Table#columns()} by calling the static {@code create} functions of 
this interface to
+ * create it.
+ */
+@Evolving
+public interface Column {
+
+  static Column create(String name, DataType dataType) {
+    return create(name, dataType, true);
+  }
+
+  static Column create(String name, DataType dataType, boolean nullable) {
+    return create(name, dataType, nullable, null, null, null);
+  }
+
+  static Column create(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      String comment,
+      ColumnDefaultValue defaultValue,
+      String metadataInJSON) {
+    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, 
metadataInJSON);
+  }
+
+  /**
+   * Returns the name of this table column.
+   */
+  String name();
+
+  /**
+   * Returns the data type of this table column.
+   */
+  DataType dataType();
+
+  /**
+   * Returns true if this column may produce null values.
+   */
+  boolean nullable();
+
+  /**
+   * Returns the comment of this table column. Null means no comment.
+   */
+  @Nullable
+  String comment();
+
+  /**
+   * Returns the default value of this table column. Null means no default 
value.
+   */
+  @Nullable
+  ColumnDefaultValue defaultValue();
+
+  /**
+   * Returns the column metadata in JSON format.
+   */
+  @Nullable
+  String metadataInJSON();
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
new file mode 100644
index 00000000000..b8e75c11c81
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Literal;
+
+/**
+ * A class representing the default value of a column. It contains both the 
SQL string and literal
+ * value of the user-specified default value expression. The SQL string should 
be re-evaluated for
+ * each table writing command, which may produce different values if the 
default value expression is
+ * something like {@code CURRENT_DATE()}. The literal value is used to 
back-fill existing data if
+ * new columns with default value are added. Note: the back-fill can be lazy. 
The data sources can
+ * remember the column default value and let the reader fill the column value 
when reading existing
+ * data that do not have these new columns.
+ */
+@Evolving
+public class ColumnDefaultValue {
+  private String sql;
+  private Literal<?> value;
+
+  public ColumnDefaultValue(String sql, Literal<?> value) {
+    this.sql = sql;
+    this.value = value;
+  }
+
+  /**
+   * Returns the SQL string (Spark SQL dialect) of the default value 
expression. This is the
+   * original string contents of the SQL expression specified at the time the 
column was created in
+   * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for
+   * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string 
literal "40 + 2" (without
+   * quotation marks).
+   */
+  @Nonnull
+  public String getSql() {
+    return sql;
+  }
+
+  /**
+   * Returns the default value literal. This is the literal value 
corresponding to
+   * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this 
returns a literal
+   * integer with a value of 42.
+   */
+  @Nonnull
+  public Literal<?> getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ColumnDefaultValue)) return false;
+    ColumnDefaultValue that = (ColumnDefaultValue) o;
+    return sql.equals(that.sql) && value.equals(that.value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sql, value);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnDefaultValue{sql='" + sql + "\', value=" + value + '}';
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index 35455a0ed99..4337a7c6152 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -54,6 +54,19 @@ import org.apache.spark.sql.types.StructType;
 @Evolving
 public interface StagingTableCatalog extends TableCatalog {
 
+  /**
+   * Stage the creation of a table, preparing it to be committed into the 
metastore.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
+  StagedTable stageCreate(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+
   /**
    * Stage the creation of a table, preparing it to be committed into the 
metastore.
    * <p>
@@ -64,7 +77,7 @@ public interface StagingTableCatalog extends TableCatalog {
    * committed, an exception should be thrown by {@link 
StagedTable#commitStagedChanges()}.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
@@ -72,11 +85,26 @@ public interface StagingTableCatalog extends TableCatalog {
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    */
-  StagedTable stageCreate(
+  default StagedTable stageCreate(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+    return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), 
partitions, properties);
+  }
+
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the 
metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #stageReplace(Identifier, Column[], Transform[], Map)} instead.
+   */
+  StagedTable stageReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException, 
NoSuchTableException;
 
   /**
    * Stage the replacement of a table, preparing it to be committed into the 
metastore when the
@@ -97,7 +125,7 @@ public interface StagingTableCatalog extends TableCatalog {
    * operation.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
@@ -105,11 +133,27 @@ public interface StagingTableCatalog extends TableCatalog 
{
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    * @throws NoSuchTableException If the table does not exist
    */
-  StagedTable stageReplace(
+  default StagedTable stageReplace(
+      Identifier ident,
+      Column[] columns,
+      Transform[] partitions,
+      Map<String, String> properties) throws NoSuchNamespaceException, 
NoSuchTableException {
+    return stageReplace(
+      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, 
properties);
+  }
+
+  /**
+   * Stage the creation or replacement of a table, preparing it to be 
committed into the metastore
+   * when the returned table's {@link StagedTable#commitStagedChanges()} is 
called.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} 
instead.
+   */
+  StagedTable stageCreateOrReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, 
NoSuchTableException;
+      Map<String, String> properties) throws NoSuchNamespaceException;
 
   /**
    * Stage the creation or replacement of a table, preparing it to be 
committed into the metastore
@@ -129,16 +173,19 @@ public interface StagingTableCatalog extends TableCatalog 
{
    * the staged changes are committed but the table doesn't exist at commit 
time.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    */
-  StagedTable stageCreateOrReplace(
+  default StagedTable stageCreateOrReplace(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException {
+    return stageCreateOrReplace(
+      ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, 
properties);
+  }
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index 8f7a8740483..b9753a08aba 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -51,9 +51,20 @@ public interface Table {
   /**
    * Returns the schema of this table. If the table is not readable and 
doesn't have a schema, an
    * empty schema can be returned here.
+   * <p>
+   * This is deprecated. Please override {@link #columns} instead.
    */
+  @Deprecated
   StructType schema();
 
+  /**
+   * Returns the columns of this table. If the table is not readable and 
doesn't have a schema, an
+   * empty array can be returned here.
+   */
+  default Column[] columns() {
+    return CatalogV2Util.structTypeToV2Columns(schema());
+  }
+
   /**
    * Returns the physical partitioning of this table.
    */
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index b04c7e55138..82622d65205 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -159,11 +159,24 @@ public interface TableCatalog extends CatalogPlugin {
     }
   }
 
+  /**
+   * Create a table in the catalog.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #createTable(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
+  Table createTable(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+
   /**
    * Create a table in the catalog.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table.
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
@@ -171,11 +184,13 @@ public interface TableCatalog extends CatalogPlugin {
    * @throws UnsupportedOperationException If a requested partition transform 
is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
    */
-  Table createTable(
+  default Table createTable(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+    return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), 
partitions, properties);
+  }
 
   /**
    * Apply a set of {@link TableChange changes} to a table in the catalog.
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index cf735ed9452..609cfab2d56 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -140,7 +140,7 @@ public interface TableChange {
       boolean isNullable,
       String comment,
       ColumnPosition position,
-      String defaultValue) {
+      ColumnDefaultValue defaultValue) {
     return new AddColumn(fieldNames, dataType, isNullable, comment, position, 
defaultValue);
   }
 
@@ -228,7 +228,7 @@ public interface TableChange {
    * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
    *
    * @param fieldNames field names of the column to update
-   * @param newDefaultValue the new default value
+   * @param newDefaultValue the new default value SQL string (Spark SQL 
dialect).
    * @return a TableChange for the update
    */
   static TableChange updateColumnDefaultValue(String[] fieldNames, String 
newDefaultValue) {
@@ -383,7 +383,9 @@ public interface TableChange {
   }
 
   /**
-   * A TableChange to add a field.
+   * A TableChange to add a field. The implementation may need to back-fill 
all the existing data
+   * to add this new column, or remember the column default value specified 
here and let the reader
+   * fill the column value when reading existing data that do not have this 
new column.
    * <p>
    * If the field already exists, the change must result in an {@link 
IllegalArgumentException}.
    * If the new field is nested and its parent does not exist or is not a 
struct, the change must
@@ -395,7 +397,7 @@ public interface TableChange {
     private final boolean isNullable;
     private final String comment;
     private final ColumnPosition position;
-    private final String defaultValue;
+    private final ColumnDefaultValue defaultValue;
 
     private AddColumn(
         String[] fieldNames,
@@ -403,7 +405,7 @@ public interface TableChange {
         boolean isNullable,
         String comment,
         ColumnPosition position,
-        String defaultValue) {
+        ColumnDefaultValue defaultValue) {
       this.fieldNames = fieldNames;
       this.dataType = dataType;
       this.isNullable = isNullable;
@@ -436,7 +438,7 @@ public interface TableChange {
     }
 
     @Nullable
-    public String defaultValue() { return defaultValue; }
+    public ColumnDefaultValue defaultValue() { return defaultValue; }
 
     @Override
     public boolean equals(Object o) {
@@ -691,6 +693,12 @@ public interface TableChange {
       return fieldNames;
     }
 
+    /**
+     * Returns the column default value SQL string (Spark SQL dialect). The 
default value literal
+     * is not provided as updating column default values does not need to 
back-fill existing data.
+     * Null means dropping the column default value.
+     */
+    @Nullable
     public String newDefaultValue() { return newDefaultValue; }
 
     @Override
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 46cc0b0fbf0..d7cc34d6f15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1249,7 +1249,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
             }
             SubqueryAlias(
               catalog.name +: ident.asMultipartIdentifier,
-              StreamingRelationV2(None, table.name, table, options, 
table.schema.toAttributes,
+              StreamingRelationV2(None, table.name, table, options, 
table.columns.toAttributes,
                 Some(catalog), Some(ident), v1Fallback))
           } else {
             SubqueryAlias(
@@ -3722,7 +3722,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           colsToAdd(resolvedParentName) = fieldsAdded :+ col.colName
           resolvedPosition
         }
-        val schema = r.table.schema
+        val schema = r.table.columns.asSchema
         val resolvedCols = cols.map { col =>
           col.path match {
             case Some(parent: UnresolvedFieldName) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index e6be5c23955..2d26e281607 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -181,7 +181,7 @@ object ResolvedTable {
       catalog: TableCatalog,
       identifier: Identifier,
       table: Table): ResolvedTable = {
-    val schema = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema)
+    val schema = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
     ResolvedTable(catalog, identifier, table, schema.toAttributes)
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index af70f07bc87..9c639a4bce6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
+import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.DataType
 
@@ -135,6 +138,16 @@ case class QualifiedColType(
   def name: Seq[String] = path.map(_.name).getOrElse(Nil) :+ colName
 
   def resolved: Boolean = path.forall(_.resolved) && 
position.forall(_.resolved)
+
+  def getV2Default: ColumnDefaultValue = {
+    default.map { sql =>
+      val e = ResolveDefaultColumns.analyze(colName, dataType, sql, "ALTER 
TABLE")
+      assert(e.resolved && e.foldable,
+        "The existence default value must be a simple SQL string that is 
resolved and foldable, " +
+          "but got: " + sql)
+      new ColumnDefaultValue(sql, LiteralValue(e.eval(), dataType))
+    }.orNull
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 94f2a570663..eb9d45f06ec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -120,7 +120,7 @@ case class AddColumns(
         col.nullable,
         col.comment.orNull,
         col.position.map(_.position).orNull,
-        col.default.orNull)
+        col.getV2Default)
     }
   }
 
@@ -156,7 +156,7 @@ case class ReplaceColumns(
         col.nullable,
         col.comment.orNull,
         null,
-        col.default.orNull)
+        col.getV2Default)
     }
     deleteChanges ++ addChanges
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 667c0988d0c..be7d74b0782 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -151,16 +151,28 @@ object ResolveDefaultColumns {
       field: StructField,
       statementType: String,
       metadataKey: String = CURRENT_DEFAULT_COLUMN_METADATA_KEY): Expression = 
{
+    analyze(field.name, field.dataType, field.metadata.getString(metadataKey), 
statementType)
+  }
+
+  /**
+   * Parses and analyzes the DEFAULT column SQL string, returning an error 
upon failure.
+   *
+   * @return Result of the analysis and constant-folding operation.
+   */
+  def analyze(
+      colName: String,
+      dataType: DataType,
+      defaultSQL: String,
+      statementType: String): Expression = {
     // Parse the expression.
-    val colText: String = field.metadata.getString(metadataKey)
     lazy val parser = new CatalystSqlParser()
     val parsed: Expression = try {
-      parser.parseExpression(colText)
+      parser.parseExpression(defaultSQL)
     } catch {
       case ex: ParseException =>
         throw new AnalysisException(
           s"Failed to execute $statementType command because the destination 
table column " +
-            s"${field.name} has a DEFAULT value of $colText which fails to 
parse as a valid " +
+            s"$colName has a DEFAULT value of $defaultSQL which fails to parse 
as a valid " +
             s"expression: ${ex.getMessage}")
     }
     // Check invariants before moving on to analysis.
@@ -170,28 +182,28 @@ object ResolveDefaultColumns {
     // Analyze the parse result.
     val plan = try {
       val analyzer: Analyzer = DefaultColumnAnalyzer
-      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, 
field.name)()), OneRowRelation()))
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, colName)()), 
OneRowRelation()))
       analyzer.checkAnalysis(analyzed)
       ConstantFolding(analyzed)
     } catch {
       case ex: AnalysisException =>
         throw new AnalysisException(
           s"Failed to execute $statementType command because the destination 
table column " +
-            s"${field.name} has a DEFAULT value of $colText which fails to 
resolve as a valid " +
+            s"$colName has a DEFAULT value of $defaultSQL which fails to 
resolve as a valid " +
             s"expression: ${ex.getMessage}")
     }
     val analyzed: Expression = plan.collectFirst {
       case Project(Seq(a: Alias), OneRowRelation()) => a.child
     }.get
     // Perform implicit coercion from the provided expression type to the 
required column type.
-    if (field.dataType == analyzed.dataType) {
+    if (dataType == analyzed.dataType) {
       analyzed
-    } else if (Cast.canUpCast(analyzed.dataType, field.dataType)) {
-      Cast(analyzed, field.dataType)
+    } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
+      Cast(analyzed, dataType)
     } else {
       throw new AnalysisException(
         s"Failed to execute $statementType command because the destination 
table column " +
-          s"${field.name} has a DEFAULT value with type ${field.dataType}, but 
the " +
+          s"$colName has a DEFAULT value with type $dataType, but the " +
           s"statement provided a value of incompatible type 
${analyzed.dataType}")
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 0c9282f9675..12858887bb5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -21,10 +21,12 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, IdentityTransform, LogicalExpressions, Transform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Conversion helpers for working with v2 [[CatalogPlugin]].
@@ -183,6 +185,11 @@ private[sql] object CatalogV2Implicits {
     }
   }
 
+  implicit class ColumnsHelper(columns: Array[Column]) {
+    def asSchema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)
+    def toAttributes: Seq[AttributeReference] = asSchema.toAttributes
+  }
+
   def parseColumnPath(name: String): Seq[String] = {
     CatalystSqlParser.parseMultipartIdentifier(name)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 72c557c8d77..9b481356fa6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -23,12 +23,14 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, 
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
+import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, 
MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -142,8 +144,7 @@ private[sql] object CatalogV2Util {
           add.fieldNames match {
             case Array(name) =>
               val field = StructField(name, add.dataType, nullable = 
add.isNullable)
-              val fieldWithDefault: StructField =
-                
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+              val fieldWithDefault: StructField = 
encodeDefaultValue(add.defaultValue(), field)
               val fieldWithComment: StructField =
                 
Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
               addField(schema, fieldWithComment, add.position(), 
tableProvider, statementType, true)
@@ -151,8 +152,7 @@ private[sql] object CatalogV2Util {
               replace(schema, names.init, parent => parent.dataType match {
                 case parentType: StructType =>
                   val field = StructField(names.last, add.dataType, nullable = 
add.isNullable)
-                  val fieldWithDefault: StructField =
-                    
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+                  val fieldWithDefault: StructField = 
encodeDefaultValue(add.defaultValue(), field)
                   val fieldWithComment: StructField =
                     Option(add.comment).map(fieldWithDefault.withComment)
                       .getOrElse(fieldWithDefault)
@@ -431,4 +431,83 @@ private[sql] object CatalogV2Util {
       .getOrElse(catalogManager.v2SessionCatalog)
       .asTableCatalog
   }
+
+  /**
+   * Converts DS v2 columns to StructType, which encodes column comment and 
default value to
+   * StructField metadata. This is mainly used to define the schema of v2 
scan, w.r.t. the columns
+   * of the v2 table.
+   */
+  def v2ColumnsToStructType(columns: Array[Column]): StructType = {
+    StructType(columns.map(v2ColumnToStructField))
+  }
+
+  private def v2ColumnToStructField(col: Column): StructField = {
+    val metadata = 
Option(col.metadataInJSON()).map(Metadata.fromJson).getOrElse(Metadata.empty)
+    var f = StructField(col.name(), col.dataType(), col.nullable(), metadata)
+    Option(col.comment()).foreach { comment =>
+      f = f.withComment(comment)
+    }
+    Option(col.defaultValue()).foreach { default =>
+      f = encodeDefaultValue(default, f)
+    }
+    f
+  }
+
+  // For built-in file sources, we encode the default value in StructField 
metadata. An analyzer
+  // rule will check the special metadata and change the DML input plan to 
fill the default value.
+  private def encodeDefaultValue(defaultValue: ColumnDefaultValue, f: 
StructField): StructField = {
+    Option(defaultValue).map { default =>
+      // The "exist default" is used to back-fill the existing data when new 
columns are added, and
+      // should be a fixed value which was evaluated at the definition time. 
For example, if the
+      // default value is `current_date()`, the "exist default" should be the 
value of
+      // `current_date()` when the column was defined/altered, instead of when 
back-fill happens.
+      // Note: the back-fill here is a logical concept. The data source can 
keep the existing
+      //       data unchanged and let the data reader return "exist 
default" for missing
+      //       columns.
+      val existingDefault = Literal(default.getValue.value(), 
default.getValue.dataType()).sql
+      
f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql)
+    }.getOrElse(f)
+  }
+
+  /**
+   * Converts a StructType to DS v2 columns, which decodes the StructField 
metadata to v2 column
+   * comment and default value. This is mainly used to generate DS v2 columns 
from table schema in
+   * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable 
and related APIs.
+   */
+  def structTypeToV2Columns(schema: StructType): Array[Column] = {
+    schema.fields.map(structFieldToV2Column)
+  }
+
+  private def structFieldToV2Column(f: StructField): Column = {
+    def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata): 
Column = {
+      val metadataJSON = if (metadata == Metadata.empty) {
+        null
+      } else {
+        metadata.json
+      }
+      Column.create(
+        f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, 
metadataJSON)
+    }
+    if (f.getCurrentDefaultValue().isDefined && 
f.getExistenceDefaultValue().isDefined) {
+      val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      assert(e.resolved && e.foldable,
+        "The existence default value must be a simple SQL string that is 
resolved and foldable, " +
+          "but got: " + f.getExistenceDefaultValue().get)
+      val defaultValue = new ColumnDefaultValue(
+        f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType))
+      val cleanedMetadata = new MetadataBuilder()
+        .withMetadata(f.metadata)
+        .remove("comment")
+        .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+        .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+        .build()
+      createV2Column(defaultValue, cleanedMetadata)
+    } else {
+      val cleanedMetadata = new MetadataBuilder()
+        .withMetadata(f.metadata)
+        .remove("comment")
+        .build()
+      createV2Column(null, cleanedMetadata)
+    }
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
index d1f7ba000c6..07acacd9a35 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.write
 
 import java.util
 
-import org.apache.spark.sql.connector.catalog.{SupportsRead, 
SupportsRowLevelOperations, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{Column, SupportsRead, 
SupportsRowLevelOperations, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -38,6 +38,7 @@ private[sql] case class RowLevelOperationTable(
 
   override def name: String = table.name
   override def schema: StructType = table.schema
+  override def columns: Array[Column] = table.columns()
   override def capabilities: util.Set[TableCapability] = table.capabilities
   override def toString: String = table.toString
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 51ef3dda817..c170b7ae672 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -189,9 +189,10 @@ object DataSourceV2Relation {
       catalog: Option[CatalogPlugin],
       identifier: Option[Identifier],
       options: CaseInsensitiveStringMap): DataSourceV2Relation = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     // The v2 source may return schema containing char/varchar type. We 
replace char/varchar
     // with "annotated" string type here as the query engine doesn't support 
char/varchar yet.
-    val schema = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema)
+    val schema = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
     DataSourceV2Relation(table, schema.toAttributes, catalog, identifier, 
options)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
new file mode 100644
index 00000000000..5ab3f83eeae
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
+import org.apache.spark.sql.types.DataType
+
+// The standard concrete implementation of data source V2 column.
+case class ColumnImpl(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    comment: String,
+    defaultValue: ColumnDefaultValue,
+    metadataInJSON: String) extends Column
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
index 7bfe1df1117..f8b237195fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala
@@ -37,7 +37,8 @@ trait SimpleTableProvider extends TableProvider {
   }
 
   override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
-    getOrLoadTable(options).schema()
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    getOrLoadTable(options).columns.asSchema
   }
 
   override def getTable(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 032b04bb887..6be50f36c84 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, 
ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.LogicalExpressions
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions, 
Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -37,6 +37,7 @@ class CatalogSuite extends SparkFunSuite {
   import CatalogV2Implicits._
 
   private val emptyProps: util.Map[String, String] = 
Collections.emptyMap[String, String]
+  private val emptyTrans: Array[Transform] = Array.empty
   private val schema: StructType = new StructType()
       .add("id", IntegerType)
       .add("data", StringType)
@@ -74,13 +75,13 @@ class CatalogSuite extends SparkFunSuite {
 
     intercept[NoSuchNamespaceException](catalog.listTables(Array("ns")))
 
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, emptyTrans, emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
     intercept[NoSuchNamespaceException](catalog.listTables(Array("ns2")))
 
-    catalog.createTable(ident3, schema, Array.empty, emptyProps)
-    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+    catalog.createTable(ident3, schema, emptyTrans, emptyProps)
+    catalog.createTable(ident2, schema, emptyTrans, emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
     assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -100,7 +101,7 @@ class CatalogSuite extends SparkFunSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("test", "`", ".", "test_table"))
@@ -118,7 +119,7 @@ class CatalogSuite extends SparkFunSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("test", "`", ".", "test_table"))
@@ -133,10 +134,10 @@ class CatalogSuite extends SparkFunSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val exc = intercept[TableAlreadyExistsException] {
-      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+      catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     }
 
     checkErrorTableAlreadyExists(exc, testIdentQuoted)
@@ -149,7 +150,7 @@ class CatalogSuite extends SparkFunSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -161,7 +162,7 @@ class CatalogSuite extends SparkFunSuite {
   test("loadTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
@@ -182,7 +183,7 @@ class CatalogSuite extends SparkFunSuite {
   test("invalidateTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     catalog.invalidateTable(testIdent)
 
     val loaded = catalog.loadTable(testIdent)
@@ -203,7 +204,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.properties.asScala == Map())
 
@@ -222,7 +223,7 @@ class CatalogSuite extends SparkFunSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     assert(table.properties.asScala == Map("prop-1" -> "1"))
 
@@ -241,7 +242,7 @@ class CatalogSuite extends SparkFunSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     assert(table.properties.asScala == Map("prop-1" -> "1"))
 
@@ -257,7 +258,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: remove missing property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.properties.asScala == Map())
 
@@ -273,7 +274,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -285,7 +286,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add required column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -298,7 +299,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add column with comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -315,7 +316,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, 
emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, 
emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -330,7 +331,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add column to primitive field fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -348,7 +349,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add field to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -364,7 +365,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: update column data type") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -380,7 +381,7 @@ class CatalogSuite extends SparkFunSuite {
     val originalSchema = new StructType()
         .add("id", IntegerType, nullable = false)
         .add("data", StringType)
-    val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
 
     assert(table.schema == originalSchema)
 
@@ -394,7 +395,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -410,7 +411,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -426,7 +427,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: replace comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -445,7 +446,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: add comment to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -461,7 +462,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: rename top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -478,7 +479,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -497,7 +498,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -513,7 +514,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -532,7 +533,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -549,7 +550,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: delete top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -566,7 +567,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -582,7 +583,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterTable: delete missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -604,7 +605,7 @@ class CatalogSuite extends SparkFunSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -635,7 +636,7 @@ class CatalogSuite extends SparkFunSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -667,7 +668,7 @@ class CatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(testIdent))
     assert(!catalog.tableExists(testIdentNew))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
     catalog.renameTable(testIdent, testIdentNew)
@@ -692,8 +693,8 @@ class CatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(testIdent))
     assert(!catalog.tableExists(testIdentNew))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
-    catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
     assert(catalog.tableExists(testIdentNew))
@@ -719,8 +720,8 @@ class CatalogSuite extends SparkFunSuite {
     val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
     val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
 
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
-    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, emptyTrans, emptyProps)
+    catalog.createTable(ident2, schema, emptyTrans, emptyProps)
 
     assert(catalog.listNamespaces === Array(Array("ns1")))
     assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
@@ -734,8 +735,8 @@ class CatalogSuite extends SparkFunSuite {
     val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
 
     catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
-    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, emptyTrans, emptyProps)
+    catalog.createTable(ident2, schema, emptyTrans, emptyProps)
 
     assert(catalog.listNamespaces === Array(Array("ns1")))
     assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
@@ -756,7 +757,7 @@ class CatalogSuite extends SparkFunSuite {
   test("loadNamespaceMetadata: no metadata, table exists") {
     val catalog = newCatalog()
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val metadata = catalog.loadNamespaceMetadata(testNs)
 
@@ -777,7 +778,7 @@ class CatalogSuite extends SparkFunSuite {
     val catalog = newCatalog()
 
     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val metadata = catalog.loadNamespaceMetadata(testNs)
 
@@ -810,7 +811,7 @@ class CatalogSuite extends SparkFunSuite {
   test("createNamespace: fail if namespace already exists from table") {
     val catalog = newCatalog()
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.namespaceExists(testNs) === true)
     assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
@@ -852,7 +853,7 @@ class CatalogSuite extends SparkFunSuite {
     val catalog = newCatalog()
 
     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.dropNamespace(testNs, cascade = true))
 
@@ -882,7 +883,7 @@ class CatalogSuite extends SparkFunSuite {
   test("alterNamespace: create metadata if missing and table exists") {
     val catalog = newCatalog()
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value"))
 
@@ -902,7 +903,7 @@ class CatalogSuite extends SparkFunSuite {
   test("truncate non-partitioned table") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
       .asInstanceOf[InMemoryTable]
     table.withData(Array(
       new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
@@ -920,7 +921,7 @@ class CatalogSuite extends SparkFunSuite {
       new StructType()
         .add("col0", IntegerType)
         .add("part0", IntegerType),
-      Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
+      Array[Transform](LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))),
       util.Collections.emptyMap[String, String])
     val partTable = table.asInstanceOf[InMemoryPartitionTable]
     val partIdent = InternalRow.apply(0)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
index da5cfab8be3..eda401ceb6b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala
@@ -21,14 +21,14 @@ import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.IntegerType
 
 class CatalogV2UtilSuite extends SparkFunSuite {
   test("Load relation should encode the identifiers for V2Relations") {
     val testCatalog = mock(classOf[TableCatalog])
     val ident = mock(classOf[Identifier])
     val table = mock(classOf[Table])
-    when(table.schema()).thenReturn(new StructType().add("i", "int"))
+    when(table.columns()).thenReturn(Array(Column.create("i", IntegerType)))
     when(testCatalog.loadTable(ident)).thenReturn(table)
     val r = CatalogV2Util.loadRelation(testCatalog, ident)
     assert(r.isDefined)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 06ee588329c..50bea2b8d2f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -84,6 +84,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     invalidatedTables.add(ident)
   }
 
+  // TODO: remove it when no tests calling this deprecated method.
   override def createTable(
       ident: Identifier,
       schema: StructType,
@@ -93,6 +94,15 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       Array.empty, None)
   }
 
+  override def createTable(
+      ident: Identifier,
+      columns: Array[Column],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    val schema = CatalogV2Util.v2ColumnsToStructType(columns)
+    createTable(ident, schema, partitions, properties)
+  }
+
   def createTable(
       ident: Identifier,
       schema: StructType,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index 0590ca721cc..90ed106d8ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -22,7 +22,7 @@ import java.util
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
-import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -41,7 +41,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
         .add("id", IntegerType)
         .add("data", StringType)
         .add("dt", StringType),
-      Array(LogicalExpressions.identity(ref("dt"))),
+      Array[Transform](LogicalExpressions.identity(ref("dt"))),
       util.Collections.emptyMap[String, String])
     newCatalog
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
index ddd08185527..40114d063aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
-import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
+import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -43,7 +43,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
         .add("id", IntegerType)
         .add("data", StringType)
         .add("dt", StringType),
-      Array(LogicalExpressions.identity(ref("dt"))),
+      Array[Transform](LogicalExpressions.identity(ref("dt"))),
       util.Collections.emptyMap[String, String])
     newCatalog
   }
@@ -164,7 +164,8 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
         .add("col0", IntegerType)
         .add("part0", IntegerType)
         .add("part1", StringType),
-      Array(LogicalExpressions.identity(ref("part0")), 
LogicalExpressions.identity(ref("part1"))),
+      Array[Transform](
+        LogicalExpressions.identity(ref("part0")), 
LogicalExpressions.identity(ref("part1"))),
       util.Collections.emptyMap[String, String])
 
     val partTable = table.asInstanceOf[InMemoryPartitionTable]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index dc4fed49c1c..6fd47a534e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -244,7 +244,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     case create: V2CreateTablePlan if create.childrenResolved =>
       val schema = create.tableSchema
       val partitioning = create.partitioning
-      val identifier = create.tableName
       val isCaseSensitive = conf.caseSensitiveAnalysis
       // Check that columns are not duplicated in the schema
       val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
index abc6bc60d96..55057844328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
@@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
 
 case class CreateTableExec(
     catalog: TableCatalog,
     identifier: Identifier,
-    tableSchema: StructType,
+    columns: Array[Column],
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
     ignoreIfExists: Boolean) extends LeafV2CommandExec {
@@ -42,7 +41,7 @@ case class CreateTableExec(
   override protected def run(): Seq[InternalRow] = {
     if (!catalog.tableExists(identifier)) {
       try {
-        catalog.createTable(identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+        catalog.createTable(identifier, columns, partitioning.toArray, tableProperties.asJava)
       } catch {
         case _: TableAlreadyExistsException if ignoreIfExists =>
           logWarning(s"Table ${identifier.quoted} was created concurrently. 
Ignoring.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 757b66e1534..b45de06371c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
@@ -177,7 +178,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE", false)
-      CreateTableExec(catalog.asTableCatalog, ident, newSchema,
+      CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
@@ -200,12 +201,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE", false)
+      val v2Columns = structTypeToV2Columns(newSchema)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(staging, ident, newSchema, parts,
+          AtomicReplaceTableExec(staging, ident, v2Columns, parts,
            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident, newSchema, parts,
+          ReplaceTableExec(catalog.asTableCatalog, ident, v2Columns, parts,
            qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 0bd25064e35..3cb1a74417d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -90,8 +90,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   private var t: Table = null
 
   override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     if (t == null) t = getTable(options)
-    t.schema()
+    t.columns.asSchema
   }
 
   // TODO: implement a light-weight partition inference which only looks at the path of one leaf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index ea221980fed..55d97577d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 case class ReplaceTableExec(
     catalog: TableCatalog,
     ident: Identifier,
-    tableSchema: StructType,
+    columns: Array[Column],
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
     orCreate: Boolean,
@@ -48,7 +47,7 @@ case class ReplaceTableExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava)
+    catalog.createTable(ident, columns, partitioning.toArray, tableProperties.asJava)
     Seq.empty
   }
 
@@ -58,7 +57,7 @@ case class ReplaceTableExec(
 case class AtomicReplaceTableExec(
     catalog: StagingTableCatalog,
     identifier: Identifier,
-    tableSchema: StructType,
+    columns: Array[Column],
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
     orCreate: Boolean,
@@ -73,11 +72,11 @@ case class AtomicReplaceTableExec(
     }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
-        identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+        identifier, columns, partitioning.toArray, tableProperties.asJava)
     } else if (catalog.tableExists(identifier)) {
       try {
         catalog.stageReplace(
-          identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+          identifier, columns, partitioning.toArray, tableProperties.asJava)
       } catch {
         case e: NoSuchTableException =>
           throw QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index ec40ad70b79..5712159ddc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -59,7 +59,8 @@ case class ShowCreateTableExec(
   }
 
   private def showTableDataColumns(table: Table, builder: StringBuilder): Unit = {
-    val columns = CharVarcharUtils.getRawSchema(table.schema(), conf).fields.map(_.toDDL)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    val columns = CharVarcharUtils.getRawSchema(table.columns.asSchema, conf).fields.map(_.toDDL)
     builder ++= concatByMultiLines(columns)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b9afe71d243..461e948b029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
@@ -92,6 +92,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
     catalog.refreshTable(ident.asTableIdentifier)
   }
 
+  override def createTable(
+      ident: Identifier,
+      columns: Array[Column],
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties)
+  }
+
+  // TODO: remove it when no tests calling this deprecated method.
   override def createTable(
       ident: Identifier,
       schema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 490b7082223..c53c603ffaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -86,8 +86,9 @@ case class CreateTableAsSelectExec(
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
 
-    val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
-    val table = catalog.createTable(ident, schema,
+    val columns = CatalogV2Util.structTypeToV2Columns(
+      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+    val table = catalog.createTable(ident, columns,
       partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident)
   }
@@ -125,9 +126,10 @@ case class AtomicCreateTableAsSelectExec(
 
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+    val columns = CatalogV2Util.structTypeToV2Columns(
+      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val stagedTable = catalog.stageCreate(
-      ident, schema, partitioning.toArray, properties.asJava)
+      ident, columns, partitioning.toArray, properties.asJava)
     writeToTable(catalog, stagedTable, writeOptions, ident)
   }
 
@@ -174,9 +176,10 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+    val columns = CatalogV2Util.structTypeToV2Columns(
+      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val table = catalog.createTable(
-      ident, schema, partitioning.toArray, properties.asJava)
+      ident, columns, partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident)
   }
 
@@ -210,18 +213,19 @@ case class AtomicReplaceTableAsSelectExec(
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
   override protected def run(): Seq[InternalRow] = {
-    val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable
+    val columns = CatalogV2Util.structTypeToV2Columns(
+      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
       invalidateCache(catalog, table, ident)
     }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
-        ident, schema, partitioning.toArray, properties.asJava)
+        ident, columns, partitioning.toArray, properties.asJava)
     } else if (catalog.tableExists(ident)) {
       try {
         catalog.stageReplace(
-          ident, schema, partitioning.toArray, properties.asJava)
+          ident, columns, partitioning.toArray, properties.asJava)
       } catch {
         case e: NoSuchTableException =>
           throw QueryCompilationErrors.cannotReplaceMissingTableError(ident, Some(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index d9a3a074ce6..1ab88cd41d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -86,7 +86,10 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
   }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
-    override def readSchema(): StructType = schema()
+    override def readSchema(): StructType = {
+      import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+      columns.asSchema
+    }
 
     override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
       new TextSocketMicroBatchStream(host, port, numPartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index d4621468f84..13f7695947e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -180,11 +180,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
         table match {
           case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
+            import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
             Dataset.ofRows(
               sparkSession,
               StreamingRelationV2(
                 Some(provider), source, table, dsOptions,
-                table.schema.toAttributes, None, None, v1Relation))
+                table.columns.asSchema.toAttributes, None, None, v1Relation))
 
           // fallback to v1
           // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 158e1634d58..3678f29ab49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -178,7 +179,7 @@ class DataSourceV2DataFrameSuite
         testCatalog.createTable(
           Identifier.of(Array(), "table_name"),
           new StructType().add("i", "interval"),
-          Array.empty, Collections.emptyMap[String, String])
+          Array.empty[Transform], Collections.emptyMap[String, String])
         val df = sql(s"select interval 1 millisecond as i")
         val v2Writer = df.writeTo("testcat.table_name")
         val e1 = intercept[AnalysisException](v2Writer.append())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 38bd24356f1..83fe2ba51b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.statsEstimation.StatsEstimationTestBase
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, ResolveDefaultColumns}
-import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.FilterExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{LongType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -585,7 +586,7 @@ class DataSourceV2SQLSuiteV1Filter
     assert(maybeReplacedTable === table, "Table should not have changed.")
   }
 
-  test("ReplaceTable: Erases the table contents and changes the metadata.") {
+  test("ReplaceTable: Erases the table contents and changes the metadata") {
     spark.sql(s"CREATE TABLE testcat.table_name USING $v2Source AS SELECT id, 
data FROM source")
 
     val testCatalog = catalog("testcat").asTableCatalog
@@ -598,14 +599,11 @@ class DataSourceV2SQLSuiteV1Filter
 
       assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
         "Replaced table should have no rows after committing.")
-      assert(replaced.schema().fields.length === 1,
+      assert(replaced.columns.length === 1,
         "Replaced table should have new schema.")
-      val actual = replaced.schema().fields(0)
-      val expected = StructField("id", LongType, nullable = false,
-        new MetadataBuilder().putString(
-          ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1")
-          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
-          .build())
+      val actual = replaced.columns.head
+      val expected = ColumnV2.create("id", LongType, false, null,
+        new ColumnDefaultValue("41 + 1", LiteralValue(42L, LongType)), null)
       assert(actual === expected,
         "Replaced table should have new schema with DEFAULT column metadata.")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 14b951e66db..781e0f96eaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions._
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec}
@@ -564,7 +565,8 @@ abstract class DeleteFromTableSuiteBase
 
   protected def createTable(schemaString: String): Unit = {
     val schema = StructType.fromDDL(schemaString)
-    catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), extraTableProps)
+    catalog.createTable(
+      ident, schema, Array[Transform](identity(reference(Seq("dep")))), extraTableProps)
   }
 
   protected def createAndInitTable(schemaString: String, jsonData: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 0a0aaa80219..46586c622db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
@@ -64,6 +64,15 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
     }
   }
 
+  override def createTable(
+      ident: Identifier,
+      columns: Array[Column],
+      partitions: Array[Transform],
+      properties: java.util.Map[String, String]): Table = {
+    createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties)
+  }
+
+  // TODO: remove it when no tests calling this deprecated method.
   override def createTable(
       ident: Identifier,
       schema: StructType,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index b262e405d4e..f7905daa20a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -974,7 +974,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase
   }
 
   test("continuous mode allows unspecified distribution and empty ordering") {
-    catalog.createTable(ident, schema, Array.empty, emptyProps)
+    catalog.createTable(ident, schema, Array.empty[Transform], emptyProps)
 
     withTempDir { checkpointDir =>
       val inputData = ContinuousMemoryStream[(Long, String)]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 44b4166c07a..2cf4792b8c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -34,17 +34,17 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, VarcharType}
+import org.apache.spark.unsafe.types.UTF8String
 
 class PlanResolutionSuite extends AnalysisTest {
   import CatalystSqlParser._
@@ -54,21 +54,24 @@ class PlanResolutionSuite extends AnalysisTest {
 
   private val table: Table = {
     val t = mock(classOf[SupportsDelete])
-    when(t.schema()).thenReturn(new StructType().add("i", "int").add("s", 
"string"))
+    when(t.columns()).thenReturn(
+      Array(Column.create("i", IntegerType), Column.create("s", StringType)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
 
   private val table1: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(new StructType().add("s", "string").add("i", 
"int"))
+    when(t.columns()).thenReturn(
+      Array(Column.create("s", StringType), Column.create("i", IntegerType)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
 
   private val table2: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(new StructType().add("i", "int").add("x", 
"string"))
+    when(t.columns()).thenReturn(
+      Array(Column.create("i", IntegerType), Column.create("x", StringType)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
@@ -76,53 +79,46 @@ class PlanResolutionSuite extends AnalysisTest {
   private val tableWithAcceptAnySchemaCapability: Table = {
     val t = mock(classOf[Table])
     when(t.name()).thenReturn("v2TableWithAcceptAnySchemaCapability")
-    when(t.schema()).thenReturn(new StructType().add("i", "int"))
+    when(t.columns()).thenReturn(Array(Column.create("i", IntegerType)))
     
when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
     t
   }
 
   private val charVarcharTable: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(new StructType().add("c1", 
"char(5)").add("c2", "varchar(5)"))
+    when(t.columns()).thenReturn(
+      Array(Column.create("c1", CharType(5)), Column.create("c2", 
VarcharType(5))))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
 
   private val defaultValues: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(
-      new StructType()
-        .add("i", BooleanType, true,
-          new MetadataBuilder()
-            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "true")
-            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "true").build())
-        .add("s", IntegerType, true,
-          new MetadataBuilder()
-            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42")
-            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "42").build()))
+    val default1 = new ColumnDefaultValue("true", LiteralValue(true, BooleanType))
+    val default2 = new ColumnDefaultValue("42", LiteralValue(42, IntegerType))
+    when(t.columns()).thenReturn(Array(
+      Column.create("i", BooleanType, true, null, default1, null),
+      Column.create("s", IntegerType, true, null, default2, null)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
 
   private val defaultValues2: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(
-      new StructType()
-        .add("i", StringType)
-        .add("e", StringType, true,
-          new MetadataBuilder()
-            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
-            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'abc'").build()))
+    val default = new ColumnDefaultValue(
+      "'abc'", LiteralValue(UTF8String.fromString("abc"), StringType))
+    when(t.columns()).thenReturn(Array(
+      Column.create("i", StringType),
+      Column.create("e", StringType, true, null, default, null)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
 
   private val tableWithColumnNamedDefault: Table = {
     val t = mock(classOf[Table])
-    when(t.schema()).thenReturn(
-      new StructType()
-        .add("s", StringType)
-        .add("default", StringType))
+    when(t.columns()).thenReturn(Array(
+      Column.create("s", StringType),
+      Column.create("default", StringType)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
     t
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
index d0169bde40f..33e2fc46ccb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -51,7 +52,7 @@ class InMemoryTableMetricSuite
       testCatalog.createTable(
         Identifier.of(Array(), "table_name"),
         new StructType().add("i", "int"),
-        Array.empty, Collections.emptyMap[String, String])
+        Array.empty[Transform], Collections.emptyMap[String, String])
 
       func("testcat.table_name")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 2a441157f9d..8f5996438e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -38,6 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter {
 
   val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+  val emptyTrans: Array[Transform] = Array.empty
   val schema: StructType = new StructType()
       .add("id", IntegerType)
       .add("data", StringType)
@@ -95,13 +97,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(catalog.listTables(Array("ns")).isEmpty)
 
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, emptyTrans, emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
     assert(catalog.listTables(Array("ns2")).isEmpty)
 
-    catalog.createTable(ident3, schema, Array.empty, emptyProps)
-    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+    catalog.createTable(ident3, schema, emptyTrans, emptyProps)
+    catalog.createTable(ident2, schema, emptyTrans, emptyProps)
 
     assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
     assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
@@ -123,7 +125,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -141,7 +143,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
     assert(parsed == Seq("db", "test_table"))
@@ -156,13 +158,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
       .map(part => quoteIdentifier(part)).mkString(".")
 
     val exc = intercept[TableAlreadyExistsException] {
-      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+      catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     }
 
     checkErrorTableAlreadyExists(exc, parsed)
@@ -183,26 +185,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
     catalog.dropTable(testIdent)
 
     // relative path
     properties.put(TableCatalog.PROP_LOCATION, "relative/path")
-    val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
     assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
     catalog.dropTable(testIdent)
 
     // absolute path without scheme
     properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
-    val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table]
+    val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table]
     assert(t3.catalogTable.location.toString === "file:///absolute/path")
     catalog.dropTable(testIdent)
 
     // absolute path with scheme
     properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
-    val t4 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[V1Table]
+    val t4 = catalog.createTable(testIdent, schema, emptyTrans, 
properties).asInstanceOf[V1Table]
     assert(t4.catalogTable.location.toString === "file:/absolute/path")
     catalog.dropTable(testIdent)
   }
@@ -212,7 +214,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -224,7 +226,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("loadTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
@@ -245,7 +247,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("invalidateTable") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     catalog.invalidateTable(testIdent)
 
     val loaded = catalog.loadTable(testIdent)
@@ -266,7 +268,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
@@ -285,7 +287,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
@@ -304,7 +306,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val properties = new util.HashMap[String, String]()
     properties.put("prop-1", "1")
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, properties)
 
     assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1"))
 
@@ -320,7 +322,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: remove missing property") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(filterV2TableProperties(table.properties) == Map())
 
@@ -336,7 +338,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -348,7 +350,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add required column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -361,7 +363,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column with comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -378,7 +380,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -393,7 +395,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add column to primitive field fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -411,7 +413,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add field to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -427,7 +429,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update column data type") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -443,7 +445,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val originalSchema = new StructType()
         .add("id", IntegerType, nullable = false)
         .add("data", StringType)
-    val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps)
 
     assert(table.schema == originalSchema)
 
@@ -457,7 +459,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -473,7 +475,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -489,7 +491,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: replace comment") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -508,7 +510,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: add comment to missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -524,7 +526,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -541,7 +543,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -560,7 +562,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -576,7 +578,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -595,7 +597,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -612,7 +614,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete top-level column") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -629,7 +631,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -645,7 +647,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
   test("alterTable: delete missing column fails") {
     val catalog = newCatalog()
 
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(table.schema == schema)
 
@@ -667,7 +669,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
     val tableSchema = schema.add("point", pointStruct)
 
-    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+    val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps)
 
     assert(table.schema == tableSchema)
 
@@ -698,7 +700,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
 
     // default location
-    val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table]
+    val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table]
     assert(t1.catalogTable.location ===
       spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier))
 
@@ -723,7 +725,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
 
     assert(!catalog.tableExists(testIdent))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -750,7 +752,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
     assert(!catalog.tableExists(testIdentNew))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
     catalog.renameTable(testIdent, testIdentNew)
@@ -775,8 +777,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
     assert(!catalog.tableExists(testIdentNew))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
-    catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
+    catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
     assert(catalog.tableExists(testIdentNew))
@@ -795,7 +797,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(!catalog.tableExists(testIdent))
     assert(!catalog.tableExists(testIdentNewOtherDb))
 
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     assert(catalog.tableExists(testIdent))
 
@@ -982,7 +984,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
     assert(catalog.namespaceExists(testNs) === false)
 
     val exc = intercept[NoSuchDatabaseException] {
-      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+      catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
     }
 
     assert(exc.getMessage.contains(testNs.quoted))
@@ -1016,7 +1018,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
     val catalog = newCatalog()
 
     catalog.createNamespace(testNs, Map("property" -> "value").asJava)
-    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
 
     val exc = intercept[AnalysisException] {
       catalog.dropNamespace(testNs, cascade = false)
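
Every hunk above makes the same substitution: the tests now pass emptyTrans instead of Array.empty as the partitioning argument of createTable. The helper's definition is outside this excerpt, so the following is only a minimal Scala sketch of what it presumably looks like and of the call shape the suite converges on; the exact name, placement, and wiring shown here are assumptions.

    import org.apache.spark.sql.connector.expressions.Transform

    // Assumed shared test helper: an explicitly typed empty list of
    // partition transforms, so every createTable call resolves the same
    // way without relying on inference for a bare Array.empty.
    val emptyTrans: Array[Transform] = Array.empty

    // Typical call shape used throughout the suite (catalog, testIdent,
    // schema and emptyProps come from the surrounding test fixtures):
    //   catalog.createTable(testIdent, schema, emptyTrans, emptyProps)
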
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 13f2a865936..ea1e9a7e048 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -869,7 +869,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
               |SORTED BY (s1)
               |INTO 200 BUCKETS
               |STORED AS PARQUET
-          """.stripMargin
+              |""".stripMargin
           } else {
             """
               |CREATE TABLE test1(
@@ -880,14 +880,14 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
               |CLUSTERED BY (v1)
               |SORTED BY (s1)
               |INTO 200 BUCKETS
-          """.stripMargin
+              |""".stripMargin
           }
 
           val insertString =
             """
               |INSERT INTO test1
               |SELECT * FROM VALUES(1,1,1)
-          """.stripMargin
+              |""".stripMargin
 
           val dropString = "DROP TABLE IF EXISTS test1"
 


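The InsertSuite hunks only change how the triple-quoted SQL strings are terminated: the closing quotes now sit after a final margin character. A small standalone Scala illustration of the difference this makes with stripMargin (the variable names are just for the demo):

    // Without a trailing "|", the last line of the literal is the raw
    // indentation before the closing quotes, so the SQL text ends with a
    // newline followed by spaces.
    val withoutBar =
      """
        |SELECT * FROM VALUES(1,1,1)
      """.stripMargin

    // With "|" on the last line, stripMargin reduces it to an empty line,
    // so the SQL text ends cleanly with a single newline.
    val withBar =
      """
        |SELECT * FROM VALUES(1,1,1)
        |""".stripMargin
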
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
