This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 303ee3f  [SPARK-24252][SQL] Add TableCatalog API
303ee3f is described below

commit 303ee3fce0d320a330d6b80662b13f6daeeb9c16
Author: Ryan Blue <b...@apache.org>
AuthorDate: Wed May 8 10:31:06 2019 +0800

    [SPARK-24252][SQL] Add TableCatalog API
    
    ## What changes were proposed in this pull request?
    
    This adds the TableCatalog API proposed in the [Table Metadata API SPIP](https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d).
    
    For `TableCatalog` to use `Table`, it needed to be moved into the catalyst module where the v2 catalog API is located. This also required moving `TableCapability`. Most of the files touched by this PR are import changes needed by this move.
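    
    A rough sketch of how the new API is intended to be used (the catalog name `test` and the in-memory `TestTableCatalog` from this PR's test sources are illustrative only; any `TableCatalog` implementation registered under `spark.sql.catalog.<name>` works the same way):
    
    ```scala
    import java.util.Collections
    
    import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableChange}
    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
    import org.apache.spark.sql.catalog.v2.expressions.Transform
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    
    // Register a TableCatalog implementation under the catalog name "test".
    // (TestTableCatalog is the in-memory test catalog added by this PR.)
    val conf = new SQLConf
    conf.setConfString("spark.sql.catalog.test",
      "org.apache.spark.sql.catalog.v2.TestTableCatalog")
    
    // Load the plugin and view it as a TableCatalog via CatalogV2Implicits.
    val catalog = Catalogs.load("test", conf).asTableCatalog
    
    val ident = Identifier.of(Array("ns"), "events")
    val schema = new StructType().add("id", IntegerType).add("data", StringType)
    
    // Create a table, apply a change, and drop it again.
    catalog.createTable(
      ident, schema, Array.empty[Transform], Collections.emptyMap[String, String])
    catalog.alterTable(ident, TableChange.setProperty("owner", "spark"))
    catalog.dropTable(ident)
    ```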
    
    ## How was this patch tested?
    
    This adds a test implementation and contract tests.
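    
    The contract tests in `TableCatalogSuite` (added below) all follow the same shape; a condensed, illustrative example using that suite's `newCatalog()`, `schema`, and `emptyProps` helpers:
    
    ```scala
    test("createTable and dropTable round trip") {
      val catalog = newCatalog()
      val ident = Identifier.of(Array("ns"), "t")
    
      assert(!catalog.tableExists(ident))
      catalog.createTable(ident, schema, Array.empty, emptyProps)
      assert(catalog.tableExists(ident))
    
      // dropTable returns true only when a table was actually deleted
      assert(catalog.dropTable(ident))
      assert(!catalog.tableExists(ident))
    }
    ```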
    
    Closes #24246 from rdblue/SPARK-24252-add-table-catalog-api.
    
    Authored-by: Ryan Blue <b...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalog/v2/IdentifierImpl.java       |  25 +
 .../apache/spark/sql/catalog/v2/TableCatalog.java  | 137 +++++
 .../apache/spark/sql/catalog/v2/TableChange.java   | 366 ++++++++++++
 .../sql/catalog/v2/expressions/Expressions.java    |  18 +-
 .../org/apache/spark/sql/sources/v2/Table.java     |  21 +
 .../spark/sql/sources/v2/TableCapability.java      |   6 +-
 .../spark/sql/catalog/v2/CatalogV2Implicits.scala  |  98 +++
 .../sql/catalog/v2/expressions/expressions.scala   |  45 +-
 .../catalyst/analysis/AlreadyExistException.scala  |  23 +-
 .../catalyst/analysis/NoSuchItemException.scala    |  22 +-
 .../spark/sql/catalog/v2/TableCatalogSuite.scala   | 657 +++++++++++++++++++++
 .../spark/sql/catalog/v2/TestTableCatalog.scala    | 220 +++++++
 .../datasources/DataSourceResolution.scala         |   2 +-
 .../sql/execution/datasources/v2/FileTable.scala   |  11 +-
 .../sources/v2/JavaPartitionAwareDataSource.java   |   7 +
 15 files changed, 1594 insertions(+), 64 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
index 8874faa..cd13143 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.catalog.v2;
 
+import com.google.common.base.Preconditions;
 import org.apache.spark.annotation.Experimental;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 /**
  *  An {@link Identifier} implementation.
  */
@@ -29,6 +33,8 @@ class IdentifierImpl implements Identifier {
   private String name;
 
   IdentifierImpl(String[] namespace, String name) {
+    Preconditions.checkNotNull(namespace, "Identifier namespace cannot be 
null");
+    Preconditions.checkNotNull(name, "Identifier name cannot be null");
     this.namespace = namespace;
     this.name = name;
   }
@@ -42,4 +48,23 @@ class IdentifierImpl implements Identifier {
   public String name() {
     return name;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    IdentifierImpl that = (IdentifierImpl) o;
+    return Arrays.equals(namespace, that.namespace) && name.equals(that.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(Arrays.hashCode(namespace), name);
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
new file mode 100644
index 0000000..681629d
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Map;
+
+/**
+ * Catalog methods for working with Tables.
+ * <p>
+ * TableCatalog implementations may be case sensitive or case insensitive. 
Spark will pass
+ * {@link Identifier table identifiers} without modification. Field names 
passed to
+ * {@link #alterTable(Identifier, TableChange...)} will be normalized to match 
the case used in the
+ * table schema when updating, renaming, or dropping existing columns when 
catalyst analysis is case
+ * insensitive.
+ */
+public interface TableCatalog extends CatalogPlugin {
+  /**
+   * List the tables in a namespace from the catalog.
+   * <p>
+   * If the catalog supports views, this must return identifiers for only 
tables and not views.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of Identifiers for tables
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional).
+   */
+  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
+
+  /**
+   * Load table metadata by {@link Identifier identifier} from the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and 
not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @return the table's metadata
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   */
+  Table loadTable(Identifier ident) throws NoSuchTableException;
+
+  /**
+   * Invalidate cached table metadata for an {@link Identifier identifier}.
+   * <p>
+   * If the table is already loaded or cached, drop cached data. If the table 
does not exist or is
+   * not cached, do nothing. Calling this method should not query remote 
services.
+   *
+   * @param ident a table identifier
+   */
+  default void invalidateTable(Identifier ident) {
+  }
+
+  /**
+   * Test whether a table exists using an {@link Identifier identifier} from 
the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and 
not a table, this
+   * must return false.
+   *
+   * @param ident a table identifier
+   * @return true if the table exists, false otherwise
+   */
+  default boolean tableExists(Identifier ident) {
+    try {
+      return loadTable(ident) != null;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Create a table in the catalog.
+   *
+   * @param ident a table identifier
+   * @param schema the schema of the new table, as a struct type
+   * @param partitions transforms to use for partitioning data in the table
+   * @param properties a string map of table properties
+   * @return metadata for the new table
+   * @throws TableAlreadyExistsException If a table or view already exists for 
the identifier
+   * @throws UnsupportedOperationException If a requested partition transform 
is not supported
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  Table createTable(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+
+  /**
+   * Apply a set of {@link TableChange changes} to a table in the catalog.
+   * <p>
+   * Implementations may reject the requested changes. If any change is 
rejected, none of the
+   * changes should be applied to the table.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and 
not a table, this
+   * must throw {@link NoSuchTableException}.
+   *
+   * @param ident a table identifier
+   * @param changes changes to apply to the table
+   * @return updated metadata for the table
+   * @throws NoSuchTableException If the table doesn't exist or is a view
+   * @throws IllegalArgumentException If any change is rejected by the 
implementation.
+   */
+  Table alterTable(
+      Identifier ident,
+      TableChange... changes) throws NoSuchTableException;
+
+  /**
+   * Drop a table in the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and 
not a table, this
+   * must not drop the view and must return false.
+   *
+   * @param ident a table identifier
+   * @return true if a table was deleted, false if no table exists for the 
identifier
+   */
+  boolean dropTable(Identifier ident);
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
new file mode 100644
index 0000000..9b87e67
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These are 
passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * <pre>
+ *   import TableChange._
+ *   val catalog = Catalogs.load(name)
+ *   catalog.asTableCatalog.alterTable(ident,
+ *       addColumn("x", IntegerType),
+ *       renameColumn("a", "b"),
+ *       deleteColumn("c")
+ *     )
+ * </pre>
+ */
+public interface TableChange {
+
+  /**
+   * Create a TableChange for setting a table property.
+   * <p>
+   * If the property already exists, it will be replaced with the new value.
+   *
+   * @param property the property name
+   * @param value the new property value
+   * @return a TableChange for the addition
+   */
+  static TableChange setProperty(String property, String value) {
+    return new SetProperty(property, value);
+  }
+
+  /**
+   * Create a TableChange for removing a table property.
+   * <p>
+   * If the property does not exist, the change will succeed.
+   *
+   * @param property the property name
+   * @return a TableChange for the removal
+   */
+  static TableChange removeProperty(String property) {
+    return new RemoveProperty(property);
+  }
+
+  /**
+   * Create a TableChange for adding an optional column.
+   * <p>
+   * If the field already exists, the change will result in an {@link 
IllegalArgumentException}.
+   * If the new field is nested and its parent does not exist or is not a 
struct, the change will
+   * result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the new column
+   * @param dataType the new column's data type
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String[] fieldNames, DataType dataType) {
+    return new AddColumn(fieldNames, dataType, true, null);
+  }
+
+  /**
+   * Create a TableChange for adding a column.
+   * <p>
+   * If the field already exists, the change will result in an {@link 
IllegalArgumentException}.
+   * If the new field is nested and its parent does not exist or is not a 
struct, the change will
+   * result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the new column
+   * @param dataType the new column's data type
+   * @param isNullable whether the new column can contain null
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(String[] fieldNames, DataType dataType, boolean 
isNullable) {
+    return new AddColumn(fieldNames, dataType, isNullable, null);
+  }
+
+  /**
+   * Create a TableChange for adding a column.
+   * <p>
+   * If the field already exists, the change will result in an {@link 
IllegalArgumentException}.
+   * If the new field is nested and its parent does not exist or is not a 
struct, the change will
+   * result in an {@link IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the new column
+   * @param dataType the new column's data type
+   * @param isNullable whether the new column can contain null
+   * @param comment the new field's comment string
+   * @return a TableChange for the addition
+   */
+  static TableChange addColumn(
+      String[] fieldNames,
+      DataType dataType,
+      boolean isNullable,
+      String comment) {
+    return new AddColumn(fieldNames, dataType, isNullable, comment);
+  }
+
+  /**
+   * Create a TableChange for renaming a field.
+   * <p>
+   * The name is used to find the field to rename. The new name will replace 
the leaf field name.
+   * For example, renameColumn(["a", "b", "c"], "x") should produce column 
a.b.x.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames the current field names
+   * @param newName the new name
+   * @return a TableChange for the rename
+   */
+  static TableChange renameColumn(String[] fieldNames, String newName) {
+    return new RenameColumn(fieldNames, newName);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field that is nullable.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newDataType the new data type
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnType(String[] fieldNames, DataType 
newDataType) {
+    return new UpdateColumnType(fieldNames, newDataType, true);
+  }
+
+  /**
+   * Create a TableChange for updating the type of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newDataType the new data type
+   * @param isNullable whether the column can contain null
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnType(
+      String[] fieldNames,
+      DataType newDataType,
+      boolean isNullable) {
+    return new UpdateColumnType(fieldNames, newDataType, isNullable);
+  }
+
+  /**
+   * Create a TableChange for updating the comment of a field.
+   * <p>
+   * The name is used to find the field to update.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newComment the new comment
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnComment(String[] fieldNames, String 
newComment) {
+    return new UpdateColumnComment(fieldNames, newComment);
+  }
+
+  /**
+   * Create a TableChange for deleting a field.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to delete
+   * @return a TableChange for the delete
+   */
+  static TableChange deleteColumn(String[] fieldNames) {
+    return new DeleteColumn(fieldNames);
+  }
+
+  /**
+   * A TableChange to set a table property.
+   * <p>
+   * If the property already exists, it must be replaced with the new value.
+   */
+  final class SetProperty implements TableChange {
+    private final String property;
+    private final String value;
+
+    private SetProperty(String property, String value) {
+      this.property = property;
+      this.value = value;
+    }
+
+    public String property() {
+      return property;
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  /**
+   * A TableChange to remove a table property.
+   * <p>
+   * If the property does not exist, the change should succeed.
+   */
+  final class RemoveProperty implements TableChange {
+    private final String property;
+
+    private RemoveProperty(String property) {
+      this.property = property;
+    }
+
+    public String property() {
+      return property;
+    }
+  }
+
+  /**
+   * A TableChange to add a field.
+   * <p>
+   * If the field already exists, the change must result in an {@link 
IllegalArgumentException}.
+   * If the new field is nested and its parent does not exist or is not a 
struct, the change must
+   * result in an {@link IllegalArgumentException}.
+   */
+  final class AddColumn implements TableChange {
+    private final String[] fieldNames;
+    private final DataType dataType;
+    private final boolean isNullable;
+    private final String comment;
+
+    private AddColumn(String[] fieldNames, DataType dataType, boolean 
isNullable, String comment) {
+      this.fieldNames = fieldNames;
+      this.dataType = dataType;
+      this.isNullable = isNullable;
+      this.comment = comment;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public DataType dataType() {
+      return dataType;
+    }
+
+    public boolean isNullable() {
+      return isNullable;
+    }
+
+    public String comment() {
+      return comment;
+    }
+  }
+
+  /**
+   * A TableChange to rename a field.
+   * <p>
+   * The name is used to find the field to rename. The new name will replace 
the leaf field name.
+   * For example, renameColumn("a.b.c", "x") should produce column a.b.x.
+   * <p>
+   * If the field does not exist, the change must result in an {@link 
IllegalArgumentException}.
+   */
+  final class RenameColumn implements TableChange {
+    private final String[] fieldNames;
+    private final String newName;
+
+    private RenameColumn(String[] fieldNames, String newName) {
+      this.fieldNames = fieldNames;
+      this.newName = newName;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public String newName() {
+      return newName;
+    }
+  }
+
+  /**
+   * A TableChange to update the type of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change must result in an {@link 
IllegalArgumentException}.
+   */
+  final class UpdateColumnType implements TableChange {
+    private final String[] fieldNames;
+    private final DataType newDataType;
+    private final boolean isNullable;
+
+    private UpdateColumnType(String[] fieldNames, DataType newDataType, 
boolean isNullable) {
+      this.fieldNames = fieldNames;
+      this.newDataType = newDataType;
+      this.isNullable = isNullable;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public DataType newDataType() {
+      return newDataType;
+    }
+
+    public boolean isNullable() {
+      return isNullable;
+    }
+  }
+
+  /**
+   * A TableChange to update the comment of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change must result in an {@link 
IllegalArgumentException}.
+   */
+  final class UpdateColumnComment implements TableChange {
+    private final String[] fieldNames;
+    private final String newComment;
+
+    private UpdateColumnComment(String[] fieldNames, String newComment) {
+      this.fieldNames = fieldNames;
+      this.newComment = newComment;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public String newComment() {
+      return newComment;
+    }
+  }
+
+  /**
+   * A TableChange to delete a field.
+   * <p>
+   * If the field does not exist, the change must result in an {@link 
IllegalArgumentException}.
+   */
+  final class DeleteColumn implements TableChange {
+    private final String[] fieldNames;
+
+    private DeleteColumn(String[] fieldNames) {
+      this.fieldNames = fieldNames;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+  }
+
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
index 009e89b..7b264e7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/expressions/Expressions.java
@@ -40,7 +40,7 @@ public class Expressions {
    * @param args expression arguments to the transform
    * @return a logical transform
    */
-  public Transform apply(String name, Expression... args) {
+  public static Transform apply(String name, Expression... args) {
     return LogicalExpressions.apply(name,
         JavaConverters.asScalaBuffer(Arrays.asList(args)).toSeq());
   }
@@ -51,7 +51,7 @@ public class Expressions {
    * @param name a column name
    * @return a named reference for the column
    */
-  public NamedReference column(String name) {
+  public static NamedReference column(String name) {
     return LogicalExpressions.reference(name);
   }
 
@@ -65,7 +65,7 @@ public class Expressions {
    * @param <T> the JVM type of the value
    * @return a literal expression for the value
    */
-  public <T> Literal<T> literal(T value) {
+  public static <T> Literal<T> literal(T value) {
     return LogicalExpressions.literal(value);
   }
 
@@ -81,7 +81,7 @@ public class Expressions {
    * @param columns input columns for the bucket transform
    * @return a logical bucket transform with name "bucket"
    */
-  public Transform bucket(int numBuckets, String... columns) {
+  public static Transform bucket(int numBuckets, String... columns) {
     return LogicalExpressions.bucket(numBuckets,
         JavaConverters.asScalaBuffer(Arrays.asList(columns)).toSeq());
   }
@@ -96,7 +96,7 @@ public class Expressions {
    * @param column an input column
    * @return a logical identity transform with name "identity"
    */
-  public Transform identity(String column) {
+  public static Transform identity(String column) {
     return LogicalExpressions.identity(column);
   }
 
@@ -110,7 +110,7 @@ public class Expressions {
    * @param column an input timestamp or date column
    * @return a logical yearly transform with name "years"
    */
-  public Transform years(String column) {
+  public static Transform years(String column) {
     return LogicalExpressions.years(column);
   }
 
@@ -125,7 +125,7 @@ public class Expressions {
    * @param column an input timestamp or date column
    * @return a logical monthly transform with name "months"
    */
-  public Transform months(String column) {
+  public static Transform months(String column) {
     return LogicalExpressions.months(column);
   }
 
@@ -140,7 +140,7 @@ public class Expressions {
    * @param column an input timestamp or date column
    * @return a logical daily transform with name "days"
    */
-  public Transform days(String column) {
+  public static Transform days(String column) {
     return LogicalExpressions.days(column);
   }
 
@@ -155,7 +155,7 @@ public class Expressions {
    * @param column an input timestamp column
    * @return a logical hourly transform with name "hours"
    */
-  public Transform hours(String column) {
+  public static Transform hours(String column) {
     return LogicalExpressions.hours(column);
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
similarity index 72%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
index 78f979a..482d3c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/Table.java
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.sources.v2;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -29,6 +32,10 @@ import java.util.Set;
  * <p>
  * This interface can mixin the following interfaces to support different 
operations, like
  * {@code SupportsRead}.
+ * <p>
+ * The default implementation of {@link #partitioning()} returns an empty 
array of partitions, and
+ * the default implementation of {@link #properties()} returns an empty map. 
These should be
+ * overridden by implementations that support partitioning and table 
properties.
  */
 @Evolving
 public interface Table {
@@ -46,6 +53,20 @@ public interface Table {
   StructType schema();
 
   /**
+   * Returns the physical partitioning of this table.
+   */
+  default Transform[] partitioning() {
+    return new Transform[0];
+  }
+
+  /**
+   * Returns the string map of table properties.
+   */
+  default Map<String, String> properties() {
+    return Collections.emptyMap();
+  }
+
+  /**
    * Returns the set of capabilities for this table.
    */
   Set<TableCapability> capabilities();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
similarity index 93%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
index 4640c61..7fff09f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -66,7 +66,7 @@ public enum TableCapability {
    * <p>
    * Truncating a table removes all existing rows.
    * <p>
-   * See {@link org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
+   * See {@code org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
    */
   TRUNCATE,
 
@@ -74,7 +74,7 @@ public enum TableCapability {
    * Signals that the table can replace existing data that matches a filter 
with appended data in
    * a write operation.
    * <p>
-   * See {@link org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
+   * See {@code org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
    */
   OVERWRITE_BY_FILTER,
 
@@ -82,7 +82,7 @@ public enum TableCapability {
    * Signals that the table can dynamically replace existing data partitions 
with appended data in
    * a write operation.
    * <p>
-   * See {@link 
org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
+   * See {@code 
org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
    */
   OVERWRITE_DYNAMIC
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
new file mode 100644
index 0000000..f512cd5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, 
IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Conversion helpers for working with v2 [[CatalogPlugin]].
+ */
+object CatalogV2Implicits {
+  implicit class PartitionTypeHelper(partitionType: StructType) {
+    def asTransforms: Array[Transform] = 
partitionType.names.map(LogicalExpressions.identity)
+  }
+
+  implicit class BucketSpecHelper(spec: BucketSpec) {
+    def asTransform: BucketTransform = {
+      if (spec.sortColumnNames.nonEmpty) {
+        throw new AnalysisException(
+          s"Cannot convert bucketing with sort columns to a transform: $spec")
+      }
+
+      LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
+    }
+  }
+
+  implicit class TransformHelper(transforms: Seq[Transform]) {
+    def asPartitionColumns: Seq[String] = {
+      val (idTransforms, nonIdTransforms) = 
transforms.partition(_.isInstanceOf[IdentityTransform])
+
+      if (nonIdTransforms.nonEmpty) {
+        throw new AnalysisException("Transforms cannot be converted to 
partition columns: " +
+            nonIdTransforms.map(_.describe).mkString(", "))
+      }
+
+      idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map 
{ ref =>
+        val parts = ref.fieldNames
+        if (parts.size > 1) {
+          throw new AnalysisException(s"Cannot partition by nested column: 
$ref")
+        } else {
+          parts(0)
+        }
+      }
+    }
+  }
+
+  implicit class CatalogHelper(plugin: CatalogPlugin) {
+    def asTableCatalog: TableCatalog = plugin match {
+      case tableCatalog: TableCatalog =>
+        tableCatalog
+      case _ =>
+        throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a 
TableCatalog")
+    }
+  }
+
+  implicit class NamespaceHelper(namespace: Array[String]) {
+    def quoted: String = namespace.map(quote).mkString(".")
+  }
+
+  implicit class IdentifierHelper(ident: Identifier) {
+    def quoted: String = {
+      if (ident.namespace.nonEmpty) {
+        ident.namespace.map(quote).mkString(".") + "." + quote(ident.name)
+      } else {
+        quote(ident.name)
+      }
+    }
+  }
+
+  implicit class MultipartIdentifierHelper(namespace: Seq[String]) {
+    def quoted: String = namespace.map(quote).mkString(".")
+  }
+
+  private def quote(part: String): String = {
+    if (part.contains(".") || part.contains("`")) {
+      s"`${part.replace("`", "``")}`"
+    } else {
+      part
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
index 813d882..2d4d6e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/expressions/expressions.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.catalog.v2.expressions
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
@@ -35,38 +33,6 @@ private[sql] object LogicalExpressions {
   // because this is only used for field names, the SQL conf passed in does 
not matter.
   private lazy val parser = new CatalystSqlParser(SQLConf.get)
 
-  def fromPartitionColumns(columns: String*): Array[IdentityTransform] =
-    columns.map(identity).toArray
-
-  def fromBucketSpec(spec: BucketSpec): BucketTransform = {
-    if (spec.sortColumnNames.nonEmpty) {
-      throw new AnalysisException(
-        s"Cannot convert bucketing with sort columns to a transform: $spec")
-    }
-
-    bucket(spec.numBuckets, spec.bucketColumnNames: _*)
-  }
-
-  implicit class TransformHelper(transforms: Seq[Transform]) {
-    def asPartitionColumns: Seq[String] = {
-      val (idTransforms, nonIdTransforms) = 
transforms.partition(_.isInstanceOf[IdentityTransform])
-
-      if (nonIdTransforms.nonEmpty) {
-        throw new AnalysisException("Transforms cannot be converted to 
partition columns: " +
-            nonIdTransforms.map(_.describe).mkString(", "))
-      }
-
-      idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map 
{ ref =>
-        val parts = ref.fieldNames
-        if (parts.size > 1) {
-          throw new AnalysisException(s"Cannot partition by nested column: 
$ref")
-        } else {
-          parts(0)
-        }
-      }
-    }
-  }
-
   def literal[T](value: T): LiteralValue[T] = {
     val internalLit = catalyst.expressions.Literal(value)
     literal(value, internalLit.dataType)
@@ -183,17 +149,10 @@ private[sql] final case class LiteralValue[T](value: T, 
dataType: DataType) exte
 }
 
 private[sql] final case class FieldReference(parts: Seq[String]) extends 
NamedReference {
+  import 
org.apache.spark.sql.catalog.v2.CatalogV2Implicits.MultipartIdentifierHelper
   override def fieldNames: Array[String] = parts.toArray
-  override def describe: String = parts.map(quote).mkString(".")
+  override def describe: String = parts.quoted
   override def toString: String = describe
-
-  private def quote(part: String): String = {
-    if (part.contains(".") || part.contains("`")) {
-      s"`${part.replace("`", "``")}`"
-    } else {
-      part
-    }
-  }
 }
 
 private[sql] object FieldReference {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index 6d587ab..f5e9a14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+import org.apache.spark.sql.catalog.v2.Identifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 /**
@@ -25,13 +27,26 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
  * as an [[org.apache.spark.sql.AnalysisException]] with the correct position 
information.
  */
 class DatabaseAlreadyExistsException(db: String)
-  extends AnalysisException(s"Database '$db' already exists")
+  extends NamespaceAlreadyExistsException(s"Database '$db' already exists")
 
-class TableAlreadyExistsException(db: String, table: String)
-  extends AnalysisException(s"Table or view '$table' already exists in 
database '$db'")
+class NamespaceAlreadyExistsException(message: String) extends 
AnalysisException(message) {
+  def this(namespace: Array[String]) = {
+    this(s"Namespace '${namespace.quoted}' already exists")
+  }
+}
+
+class TableAlreadyExistsException(message: String) extends 
AnalysisException(message) {
+  def this(db: String, table: String) = {
+    this(s"Table or view '$table' already exists in database '$db'")
+  }
+
+  def this(tableIdent: Identifier) = {
+    this(s"Table ${tableIdent.quoted} already exists")
+  }
+}
 
 class TempTableAlreadyExistsException(table: String)
-  extends AnalysisException(s"Temporary view '$table' already exists")
+  extends TableAlreadyExistsException(s"Temporary view '$table' already 
exists")
 
 class PartitionAlreadyExistsException(db: String, table: String, spec: 
TablePartitionSpec)
   extends AnalysisException(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 8bf6f69..7ac8ae6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+import org.apache.spark.sql.catalog.v2.Identifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 
@@ -25,10 +27,24 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
  * Thrown by a catalog when an item cannot be found. The analyzer will rethrow 
the exception
  * as an [[org.apache.spark.sql.AnalysisException]] with the correct position 
information.
  */
-class NoSuchDatabaseException(val db: String) extends 
AnalysisException(s"Database '$db' not found")
+class NoSuchDatabaseException(
+    val db: String) extends NoSuchNamespaceException(s"Database '$db' not 
found")
 
-class NoSuchTableException(db: String, table: String)
-  extends AnalysisException(s"Table or view '$table' not found in database 
'$db'")
+class NoSuchNamespaceException(message: String) extends 
AnalysisException(message) {
+  def this(namespace: Array[String]) = {
+    this(s"Namespace '${namespace.quoted}' not found")
+  }
+}
+
+class NoSuchTableException(message: String) extends AnalysisException(message) 
{
+  def this(db: String, table: String) = {
+    this(s"Table or view '$table' not found in database '$db'")
+  }
+
+  def this(tableIdent: Identifier) = {
+    this(s"Table ${tableIdent.quoted} not found")
+  }
+}
 
 class NoSuchPartitionException(
     db: String,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
new file mode 100644
index 0000000..9c1b9a3
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TableCatalogSuite extends SparkFunSuite {
+  import CatalogV2Implicits._
+
+  private val emptyProps: util.Map[String, String] = 
Collections.emptyMap[String, String]
+  private val schema: StructType = new StructType()
+      .add("id", IntegerType)
+      .add("data", StringType)
+
+  private def newCatalog(): TableCatalog = {
+    val newCatalog = new TestTableCatalog
+    newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
+    newCatalog
+  }
+
+  private val testIdent = Identifier.of(Array("`", "."), "test_table")
+
+  test("Catalogs can load the catalog") {
+    val catalog = newCatalog()
+
+    val conf = new SQLConf
+    conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName)
+
+    val loaded = Catalogs.load("test", conf)
+    assert(loaded.getClass == catalog.getClass)
+  }
+
+  test("listTables") {
+    val catalog = newCatalog()
+    val ident1 = Identifier.of(Array("ns"), "test_table_1")
+    val ident2 = Identifier.of(Array("ns"), "test_table_2")
+    val ident3 = Identifier.of(Array("ns2"), "test_table_1")
+
+    assert(catalog.listTables(Array("ns")).isEmpty)
+
+    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
+    assert(catalog.listTables(Array("ns2")).isEmpty)
+
+    catalog.createTable(ident3, schema, Array.empty, emptyProps)
+    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
+    assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+    catalog.dropTable(ident1)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident2))
+
+    catalog.dropTable(ident2)
+
+    assert(catalog.listTables(Array("ns")).isEmpty)
+    assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+  }
+
+  test("createTable") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("`", ".", "test_table"))
+    assert(table.schema == schema)
+    assert(table.properties.asScala == Map())
+
+    assert(catalog.tableExists(testIdent))
+  }
+
+  test("createTable: with properties") {
+    val catalog = newCatalog()
+
+    val properties = new util.HashMap[String, String]()
+    properties.put("property", "value")
+
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("`", ".", "test_table"))
+    assert(table.schema == schema)
+    assert(table.properties == properties)
+
+    assert(catalog.tableExists(testIdent))
+  }
+
+  test("createTable: table already exists") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val exc = intercept[TableAlreadyExistsException] {
+      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    }
+
+    assert(exc.message.contains(table.name()))
+    assert(exc.message.contains("already exists"))
+
+    assert(catalog.tableExists(testIdent))
+  }
+
+  test("tableExists") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+
+    catalog.dropTable(testIdent)
+
+    assert(!catalog.tableExists(testIdent))
+  }
+
+  test("loadTable") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val loaded = catalog.loadTable(testIdent)
+
+    assert(table.name == loaded.name)
+    assert(table.schema == loaded.schema)
+    assert(table.properties == loaded.properties)
+  }
+
+  test("loadTable: table does not exist") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchTableException] {
+      catalog.loadTable(testIdent)
+    }
+
+    assert(exc.message.contains(testIdent.quoted))
+    assert(exc.message.contains("not found"))
+  }
+
+  test("invalidateTable") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.invalidateTable(testIdent)
+
+    val loaded = catalog.loadTable(testIdent)
+
+    assert(table.name == loaded.name)
+    assert(table.schema == loaded.schema)
+    assert(table.properties == loaded.properties)
+  }
+
+  test("invalidateTable: table does not exist") {
+    val catalog = newCatalog()
+
+    assert(catalog.tableExists(testIdent) === false)
+
+    catalog.invalidateTable(testIdent)
+  }
+
+  test("alterTable: add property") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.properties.asScala == Map())
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.setProperty("prop-1", "1"))
+    assert(updated.properties.asScala == Map("prop-1" -> "1"))
+
+    val loaded = catalog.loadTable(testIdent)
+    assert(loaded.properties.asScala == Map("prop-1" -> "1"))
+
+    assert(table.properties.asScala == Map())
+  }
+
+  test("alterTable: add property to existing") {
+    val catalog = newCatalog()
+
+    val properties = new util.HashMap[String, String]()
+    properties.put("prop-1", "1")
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+    assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.setProperty("prop-2", "2"))
+    assert(updated.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+    val loaded = catalog.loadTable(testIdent)
+    assert(loaded.properties.asScala == Map("prop-1" -> "1", "prop-2" -> "2"))
+
+    assert(table.properties.asScala == Map("prop-1" -> "1"))
+  }
+
+  test("alterTable: remove existing property") {
+    val catalog = newCatalog()
+
+    val properties = new util.HashMap[String, String]()
+    properties.put("prop-1", "1")
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+    assert(table.properties.asScala == Map("prop-1" -> "1"))
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.removeProperty("prop-1"))
+    assert(updated.properties.asScala == Map())
+
+    val loaded = catalog.loadTable(testIdent)
+    assert(loaded.properties.asScala == Map())
+
+    assert(table.properties.asScala == Map("prop-1" -> "1"))
+  }
+
+  test("alterTable: remove missing property") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.properties.asScala == Map())
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.removeProperty("prop-1"))
+    assert(updated.properties.asScala == Map())
+
+    val loaded = catalog.loadTable(testIdent)
+    assert(loaded.properties.asScala == Map())
+
+    assert(table.properties.asScala == Map())
+  }
+
+  test("alterTable: add top-level column") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.addColumn(Array("ts"), TimestampType))
+
+    assert(updated.schema == schema.add("ts", TimestampType))
+  }
+
+  test("alterTable: add required column") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.addColumn(Array("ts"), TimestampType, false))
+
+    assert(updated.schema == schema.add("ts", TimestampType, nullable = false))
+  }
+
+  test("alterTable: add column with comment") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.addColumn(Array("ts"), TimestampType, false, "comment text"))
+
+    val field = StructField("ts", TimestampType, nullable = 
false).withComment("comment text")
+    assert(updated.schema == schema.add(field))
+  }
+
+  test("alterTable: add nested column") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, 
emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.addColumn(Array("point", "z"), DoubleType))
+
+    val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType))
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: add column to primitive field fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent, TableChange.addColumn(Array("data", "ts"), 
TimestampType))
+    }
+
+    assert(exc.getMessage.contains("Not a struct"))
+    assert(exc.getMessage.contains("data"))
+
+    // the table has not changed
+    assert(catalog.loadTable(testIdent).schema == schema)
+  }
+
+  test("alterTable: add field to missing column fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent,
+        TableChange.addColumn(Array("missing_col", "new_field"), StringType))
+    }
+
+    assert(exc.getMessage.contains("missing_col"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: update column data type") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.updateColumnType(Array("id"), LongType))
+
+    val expectedSchema = new StructType().add("id", LongType).add("data", 
StringType)
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: update column data type and nullability") {
+    val catalog = newCatalog()
+
+    val originalSchema = new StructType()
+        .add("id", IntegerType, nullable = false)
+        .add("data", StringType)
+    val table = catalog.createTable(testIdent, originalSchema, Array.empty, 
emptyProps)
+
+    assert(table.schema == originalSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.updateColumnType(Array("id"), LongType, true))
+
+    val expectedSchema = new StructType().add("id", LongType).add("data", 
StringType)
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: update optional column to required fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), 
LongType, false))
+    }
+
+    assert(exc.getMessage.contains("Cannot change optional column to 
required"))
+    assert(exc.getMessage.contains("id"))
+  }
+
+  test("alterTable: update missing column fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent,
+        TableChange.updateColumnType(Array("missing_col"), LongType))
+    }
+
+    assert(exc.getMessage.contains("missing_col"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: add comment") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.updateColumnComment(Array("id"), "comment text"))
+
+    val expectedSchema = new StructType()
+        .add("id", IntegerType, nullable = true, "comment text")
+        .add("data", StringType)
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: replace comment") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), 
"comment text"))
+
+    val expectedSchema = new StructType()
+        .add("id", IntegerType, nullable = true, "replacement comment")
+        .add("data", StringType)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.updateColumnComment(Array("id"), "replacement comment"))
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: add comment to missing column fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent,
+        TableChange.updateColumnComment(Array("missing_col"), "comment"))
+    }
+
+    assert(exc.getMessage.contains("missing_col"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: rename top-level column") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent, 
TableChange.renameColumn(Array("id"), "some_id"))
+
+    val expectedSchema = new StructType().add("some_id", 
IntegerType).add("data", StringType)
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: rename nested column") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, 
emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.renameColumn(Array("point", "x"), "first"))
+
+    val newPointStruct = new StructType().add("first", DoubleType).add("y", 
DoubleType)
+    val expectedSchema = schema.add("point", newPointStruct)
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: rename struct column") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, 
emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.renameColumn(Array("point"), "p"))
+
+    val newPointStruct = new StructType().add("x", DoubleType).add("y", 
DoubleType)
+    val expectedSchema = schema.add("p", newPointStruct)
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: rename missing column fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent,
+        TableChange.renameColumn(Array("missing_col"), "new_name"))
+    }
+
+    assert(exc.getMessage.contains("missing_col"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: multiple changes") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.renameColumn(Array("point", "x"), "first"),
+      TableChange.renameColumn(Array("point", "y"), "second"))
+
+    val newPointStruct = new StructType().add("first", 
DoubleType).add("second", DoubleType)
+    val expectedSchema = schema.add("point", newPointStruct)
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: delete top-level column") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.deleteColumn(Array("id")))
+
+    val expectedSchema = new StructType().add("data", StringType)
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: delete nested column") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val updated = catalog.alterTable(testIdent,
+      TableChange.deleteColumn(Array("point", "y")))
+
+    val newPointStruct = new StructType().add("x", DoubleType)
+    val expectedSchema = schema.add("point", newPointStruct)
+
+    assert(updated.schema == expectedSchema)
+  }
+
+  test("alterTable: delete missing column fails") {
+    val catalog = newCatalog()
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(table.schema == schema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col")))
+    }
+
+    assert(exc.getMessage.contains("missing_col"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: delete missing nested column fails") {
+    val catalog = newCatalog()
+
+    val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
+    val tableSchema = schema.add("point", pointStruct)
+
+    val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps)
+
+    assert(table.schema == tableSchema)
+
+    val exc = intercept[IllegalArgumentException] {
+      catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z")))
+    }
+
+    assert(exc.getMessage.contains("z"))
+    assert(exc.getMessage.contains("Cannot find"))
+  }
+
+  test("alterTable: table does not exist") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchTableException] {
+      catalog.alterTable(testIdent, TableChange.setProperty("prop", "val"))
+    }
+
+    assert(exc.message.contains(testIdent.quoted))
+    assert(exc.message.contains("not found"))
+  }
+
+  test("dropTable") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+
+    val wasDropped = catalog.dropTable(testIdent)
+
+    assert(wasDropped)
+    assert(!catalog.tableExists(testIdent))
+  }
+
+  test("dropTable: table does not exist") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+
+    val wasDropped = catalog.dropTable(testIdent)
+
+    assert(!wasDropped)
+    assert(!catalog.tableExists(testIdent))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
new file mode 100644
index 0000000..7a0b014
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.sources.v2.{Table, TableCapability}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TestTableCatalog extends TableCatalog {
+  import CatalogV2Implicits._
+
+  private val tables: util.Map[Identifier, Table] = new ConcurrentHashMap[Identifier, Table]()
+  private var _name: Option[String] = None
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    _name = Some(name)
+  }
+
+  override def name: String = _name.get
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    tables.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray
+  }
+
+  override def loadTable(ident: Identifier): Table = {
+    Option(tables.get(ident)) match {
+      case Some(table) =>
+        table
+      case _ =>
+        throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+
+    if (tables.containsKey(ident)) {
+      throw new TableAlreadyExistsException(ident)
+    }
+
+    if (partitions.nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"Catalog $name: Partitioned tables are not supported")
+    }
+
+    val table = InMemoryTable(ident.quoted, schema, properties)
+
+    tables.put(ident, table)
+
+    table
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    val table = loadTable(ident)
+    val properties = TestTableCatalog.applyPropertiesChanges(table.properties, changes)
+    val schema = TestTableCatalog.applySchemaChanges(table.schema, changes)
+    val newTable = InMemoryTable(table.name, schema, properties)
+
+    tables.put(ident, newTable)
+
+    newTable
+  }
+
+  override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+}
+
+private object TestTableCatalog {
+  /**
+   * Apply properties changes to a map and return the result.
+   */
+  def applyPropertiesChanges(
+      properties: util.Map[String, String],
+      changes: Seq[TableChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+      // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply schema changes to a schema and return the result.
+   */
+  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+    changes.foldLeft(schema) { (schema, change) =>
+      change match {
+        case add: AddColumn =>
+          add.fieldNames match {
+            case Array(name) =>
+              val newField = StructField(name, add.dataType, nullable = add.isNullable)
+              Option(add.comment) match {
+                case Some(comment) =>
+                  schema.add(newField.withComment(comment))
+                case _ =>
+                  schema.add(newField)
+              }
+
+            case names =>
+              replace(schema, names.init, parent => parent.dataType match {
+                case parentType: StructType =>
+                  val field = StructField(names.last, add.dataType, nullable = add.isNullable)
+                  val newParentType = Option(add.comment) match {
+                    case Some(comment) =>
+                      parentType.add(field.withComment(comment))
+                    case None =>
+                      parentType.add(field)
+                  }
+
+                  Some(StructField(parent.name, newParentType, parent.nullable, parent.metadata))
+
+                case _ =>
+                  throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
+              })
+          }
+
+        case rename: RenameColumn =>
+          replace(schema, rename.fieldNames, field =>
+            Some(StructField(rename.newName, field.dataType, field.nullable, field.metadata)))
+
+        case update: UpdateColumnType =>
+          replace(schema, update.fieldNames, field => {
+            if (!update.isNullable && field.nullable) {
+              throw new IllegalArgumentException(
+                s"Cannot change optional column to required: $field.name")
+            }
+            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+          })
+
+        case update: UpdateColumnComment =>
+          replace(schema, update.fieldNames, field =>
+            Some(field.withComment(update.newComment)))
+
+        case delete: DeleteColumn =>
+          replace(schema, delete.fieldNames, _ => None)
+
+        case _ =>
+          // ignore non-schema changes
+          schema
+      }
+    }
+  }
+
+  private def replace(
+      struct: StructType,
+      path: Seq[String],
+      update: StructField => Option[StructField]): StructType = {
+
+    val pos = struct.getFieldIndex(path.head)
+        .getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${path.head}"))
+    val field = struct.fields(pos)
+    val replacement: Option[StructField] = if (path.tail.isEmpty) {
+      update(field)
+    } else {
+      field.dataType match {
+        case nestedStruct: StructType =>
+          val updatedType: StructType = replace(nestedStruct, path.tail, update)
+          Some(StructField(field.name, updatedType, field.nullable, field.metadata))
+        case _ =>
+          throw new IllegalArgumentException(s"Not a struct: ${path.head}")
+      }
+    }
+
+    val newFields = struct.fields.zipWithIndex.flatMap {
+      case (_, index) if pos == index =>
+        replacement
+      case (other, _) =>
+        Some(other)
+    }
+
+    new StructType(newFields)
+  }
+}
+
+case class InMemoryTable(
+    name: String,
+    schema: StructType,
+    override val properties: util.Map[String, String]) extends Table {
+  override def partitioning: Array[Transform] = Array.empty
+  override def capabilities: util.Set[TableCapability] = InMemoryTable.CAPABILITIES
+}
+
+object InMemoryTable {
+  val CAPABILITIES: util.Set[TableCapability] = Set.empty[TableCapability].asJava
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index f16138e..03d0c30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.StructType
 
 case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport  {
-  import org.apache.spark.sql.catalog.v2.expressions.LogicalExpressions.TransformHelper
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.TransformHelper
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTableStatement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index c0c57b8..3b0cde5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -16,11 +16,14 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.sources.v2.TableCapability._
@@ -35,6 +38,8 @@ abstract class FileTable(
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsRead with SupportsWrite {
 
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
   lazy val fileIndex: PartitioningAwareFileIndex = {
     val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
     // Hadoop Configurations are case sensitive.
@@ -82,7 +87,11 @@ abstract class FileTable(
     StructType(fields)
   }
 
-  override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
+  override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms
+
+  override def properties: util.Map[String, String] = options.asCaseSensitiveMap
+
+  override def capabilities: java.util.Set[TableCapability] = FileTable.CAPABILITIES
 
   /**
   * When possible, this method should return the schema of the given `files`. When the format
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index dfbea92..391af5a 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -20,6 +20,8 @@ package test.org.apache.spark.sql.sources.v2;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.spark.sql.catalog.v2.expressions.Expressions;
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.Table;
@@ -57,6 +59,11 @@ public class JavaPartitionAwareDataSource implements TableProvider {
   public Table getTable(CaseInsensitiveStringMap options) {
     return new JavaSimpleBatchTable() {
       @Override
+      public Transform[] partitioning() {
+        return new Transform[] { Expressions.identity("i") };
+      }
+
+      @Override
       public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         return new MyScanBuilder();
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
