This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 115a145  Add Spark 3 SQL tests (#1156)
115a145 is described below

commit 115a1450c0e62edd835d0dd77a6513ff3717479c
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 8 16:03:50 2020 -0700

    Add Spark 3 SQL tests (#1156)
---
 .../org/apache/iceberg/BaseMetastoreCatalog.java   |  12 +-
 .../java/org/apache/iceberg/TableMetadata.java     |   4 +-
 .../test/java/org/apache/iceberg/TestTables.java   |   2 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  18 +-
 .../iceberg/hive/HiveCreateReplaceTableTest.java   |   4 +-
 .../org/apache/iceberg/spark/SparkTestBase.java    |  39 ++-
 .../apache/iceberg/spark/source/SimpleRecord.java  |   2 +-
 .../apache/iceberg/spark/SparkCatalogTestBase.java |  12 +
 .../apache/iceberg/spark/sql/TestAlterTable.java   | 216 ++++++++++++++
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 224 ++++++++++++++
 .../iceberg/spark/sql/TestCreateTableAsSelect.java | 322 +++++++++++++++++++++
 .../apache/iceberg/spark/sql/TestDeleteFrom.java   |  95 ++++++
 .../apache/iceberg/spark/sql/TestNamespaceSQL.java |  16 +-
 .../iceberg/spark/sql/TestPartitionedWrites.java   | 162 +++++++++++
 .../org/apache/iceberg/spark/sql/TestSelect.java   | 109 +++++++
 .../iceberg/spark/sql/TestUnpartitionedWrites.java | 155 ++++++++++
 16 files changed, 1372 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java 
b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9c3c7a9..6afc352 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -110,9 +110,17 @@ public abstract class BaseMetastoreCatalog implements 
Catalog {
       throw new NoSuchTableException("No such table: " + identifier);
     }
 
-    String baseLocation = location != null ? location : 
defaultWarehouseLocation(identifier);
     Map<String, String> tableProperties = properties != null ? properties : 
Maps.newHashMap();
-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, 
baseLocation, tableProperties);
+
+    TableMetadata metadata;
+    if (ops.current() != null) {
+      String baseLocation = location != null ? location : 
ops.current().location();
+      metadata = ops.current().buildReplacement(schema, spec, baseLocation, 
tableProperties);
+    } else {
+      String baseLocation = location != null ? location : 
defaultWarehouseLocation(identifier);
+      metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, 
tableProperties);
+    }
+
     if (orCreate) {
       return 
Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, 
metadata);
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java 
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 8501aa9..b7fb7cb 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -566,7 +566,7 @@ public class TableMetadata implements Serializable {
 
   // The caller is responsible to pass a updatedPartitionSpec with correct 
partition field IDs
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec 
updatedPartitionSpec,
-                                 Map<String, String> updatedProperties) {
+                                        String newLocation, Map<String, 
String> updatedProperties) {
     ValidationException.check(formatVersion > 1 || 
PartitionSpec.hasSequentialIds(updatedPartitionSpec),
         "Spec does not use sequential IDs that are required in v1: %s", 
updatedPartitionSpec);
 
@@ -602,7 +602,7 @@ public class TableMetadata implements Serializable {
     newProperties.putAll(this.properties);
     newProperties.putAll(updatedProperties);
 
-    return new TableMetadata(null, formatVersion, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, newLocation,
         lastSequenceNumber, System.currentTimeMillis(), 
nextLastColumnId.get(), freshSchema,
         specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of(), addPreviousFile(file, 
lastUpdatedMillis, newProperties));
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java 
b/core/src/test/java/org/apache/iceberg/TestTables.java
index 8961820..b0057cd 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -78,7 +78,7 @@ public class TestTables {
 
     TableMetadata metadata;
     if (current != null) {
-      metadata = current.buildReplacement(schema, spec, properties);
+      metadata = current.buildReplacement(schema, spec, current.location(), 
properties);
       return Transactions.replaceTableTransaction(name, ops, metadata);
     } else {
       metadata = newTableMetadata(schema, spec, temp.toString(), properties);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java 
b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 3fa0499..e7ef9bf 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -153,10 +153,12 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements Closeable, Supp
   }
 
   @Override
-  public void renameTable(TableIdentifier from, TableIdentifier to) {
+  public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
     if (!isValidIdentifier(from)) {
       throw new NoSuchTableException("Invalid identifier: %s", from);
     }
+
+    TableIdentifier to = removeCatalogName(originalTo);
     Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: 
%s", to);
 
     String toDatabase = to.namespace().level(0);
@@ -347,6 +349,20 @@ public class HiveCatalog extends BaseMetastoreCatalog 
implements Closeable, Supp
     return tableIdentifier.namespace().levels().length == 1;
   }
 
+  private TableIdentifier removeCatalogName(TableIdentifier to) {
+    if (isValidIdentifier(to)) {
+      return to;
+    }
+
+    // check if the identifier includes the catalog name and remove it
+    if (to.namespace().levels().length == 2 && 
name().equalsIgnoreCase(to.namespace().level(0))) {
+      return TableIdentifier.of(Namespace.of(to.namespace().level(1)), 
to.name());
+    }
+
+    // return the original unmodified
+    return to;
+  }
+
   private boolean isValidateNamespace(Namespace namespace) {
     return namespace.levels().length == 1;
   }
diff --git 
a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java 
b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
index b714605..93ba58d 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
@@ -155,7 +155,7 @@ public class HiveCreateReplaceTableTest extends 
HiveMetastoreTest {
     txn.commitTransaction();
 
     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should match", 
PartitionSpec.unpartitioned(), table.spec());
+    Assert.assertEquals("Partition spec should be unpartitioned", 0, 
table.spec().fields().size());
   }
 
   @Test
@@ -233,7 +233,7 @@ public class HiveCreateReplaceTableTest extends 
HiveMetastoreTest {
     txn.commitTransaction();
 
     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should match", 
PartitionSpec.unpartitioned(), table.spec());
+    Assert.assertEquals("Partition spec should be unpartitioned", 0, 
table.spec().fields().size());
   }
 
   @Test
diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java 
b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index 3cf279f..e66f3a4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -26,15 +26,20 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
 public class SparkTestBase {
 
+  protected static final Object ANY = new Object();
+
   private static TestHiveMetastore metastore = null;
   private static HiveConf hiveConf = null;
   protected static SparkSession spark = null;
@@ -48,6 +53,7 @@ public class SparkTestBase {
 
     SparkTestBase.spark = SparkSession.builder()
         .master("local[2]")
+        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
         .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
         .enableHiveSupport()
         .getOrCreate();
@@ -65,7 +71,7 @@ public class SparkTestBase {
     SparkTestBase.spark = null;
   }
 
-  protected List<String[]> sql(String query, Object... args) {
+  protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
     if (rows.size() < 1) {
       return ImmutableList.of();
@@ -73,11 +79,38 @@ public class SparkTestBase {
 
     return rows.stream()
         .map(row -> IntStream.range(0, row.size())
-            .mapToObj(pos -> row.isNullAt(pos) ? null : 
row.get(pos).toString())
-            .toArray(String[]::new)
+            .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos))
+            .toArray(Object[]::new)
         ).collect(Collectors.toList());
   }
 
+  protected Object scalarSql(String query, Object... args) {
+    List<Object[]> rows = sql(query, args);
+    Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
+    Object[] row = Iterables.getOnlyElement(rows);
+    Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
+    return row[0];
+  }
+
+  protected Object[] row(Object... values) {
+    return values;
+  }
+
+  protected void assertEquals(String context, List<Object[]> expectedRows, 
List<Object[]> actualRows) {
+    Assert.assertEquals(context + ": number of results should match", 
expectedRows.size(), actualRows.size());
+    for (int row = 0; row < expectedRows.size(); row += 1) {
+      Object[] expected = expectedRows.get(row);
+      Object[] actual = actualRows.get(row);
+      Assert.assertEquals("Number of columns should match", expected.length, 
actual.length);
+      for (int col = 0; col < actualRows.get(row).length; col += 1) {
+        if (expected[col] != ANY) {
+          Assert.assertEquals(context + ": row " + row + " col " + col + " 
contents should match",
+              expected[col], actual[col]);
+        }
+      }
+    }
+  }
+
   protected static String dbPath(String dbName) {
     return metastore.getDatabasePath(dbName);
   }
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java 
b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
index 76cd5ca..c8b7a31 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
@@ -28,7 +28,7 @@ public class SimpleRecord {
   public SimpleRecord() {
   }
 
-  SimpleRecord(Integer id, String data) {
+  public SimpleRecord(Integer id, String data) {
     this.id = id;
     this.data = data;
   }
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java 
b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
index 4013cfc..5b8b2b3 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
@@ -23,7 +23,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
@@ -76,6 +78,8 @@ public abstract class SparkCatalogTestBase extends 
SparkTestBase {
   protected final String catalogName;
   protected final Catalog validationCatalog;
   protected final SupportsNamespaces validationNamespaceCatalog;
+  protected final TableIdentifier tableIdent = 
TableIdentifier.of(Namespace.of("default"), "table");
+  protected final String tableName;
 
   public SparkCatalogTestBase(String catalogName, String implementation, 
Map<String, String> config) {
     this.catalogName = catalogName;
@@ -90,5 +94,13 @@ public abstract class SparkCatalogTestBase extends 
SparkTestBase {
     if (config.get("type").equalsIgnoreCase("hadoop")) {
       spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", 
"file:" + warehouse);
     }
+
+    this.tableName = (catalogName.equals("spark_catalog") ? "" : catalogName + 
".") + "default.table";
+
+    sql("CREATE NAMESPACE IF NOT EXISTS default");
+  }
+
+  protected String tableName(String name) {
+    return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + 
"default." + name;
   }
 }
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java
new file mode 100644
index 0000000..27ff6e1
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAlterTable extends SparkCatalogTestBase {
+  private final TableIdentifier renamedIdent = 
TableIdentifier.of(Namespace.of("default"), "table2");
+
+  public TestAlterTable(String catalogName, String implementation, Map<String, 
String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s2", tableName);
+  }
+
+  @Test
+  public void testAddColumn() {
+    sql("ALTER TABLE %s ADD COLUMN point struct<x: double NOT NULL, y: double 
NOT NULL> AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    sql("ALTER TABLE %s ADD COLUMN point.z double COMMENT 'May be null' 
FIRST", tableName);
+
+    Types.StructType expectedSchema2 = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.optional(6, "z", Types.DoubleType.get(), "May be 
null"),
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema2, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testDropColumn() {
+    sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testRenameColumn() {
+    sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "row_id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnComment() {
+    sql("ALTER TABLE %s ALTER COLUMN id COMMENT 'Record id'", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get(), "Record id"),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnType() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count TYPE bigint", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()),
+        NestedField.optional(3, "count", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnDropNotNull() {
+    sql("ALTER TABLE %s ALTER COLUMN id DROP NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnSetNotNull() {
+    // no-op changes are allowed
+    sql("ALTER TABLE %s ALTER COLUMN id SET NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    AssertHelpers.assertThrows("Should reject adding NOT NULL constraint to an 
optional column",
+        AnalysisException.class, "Cannot change nullable column to 
non-nullable: data",
+        () -> sql("ALTER TABLE %s ALTER COLUMN data SET NOT NULL", tableName));
+  }
+
+  @Test
+  public void testAlterColumnPositionAfter() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnPositionFirst() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count FIRST", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, 
validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testTableRename() {
+    Assume.assumeFalse("Hadoop catalog does not support rename", 
validationCatalog instanceof HadoopCatalog);
+
+    Assert.assertTrue("Initial name should exist", 
validationCatalog.tableExists(tableIdent));
+    Assert.assertFalse("New name should not exist", 
validationCatalog.tableExists(renamedIdent));
+
+    sql("ALTER TABLE %s RENAME TO %s2", tableName, tableName);
+
+    Assert.assertFalse("Initial name should not exist", 
validationCatalog.tableExists(tableIdent));
+    Assert.assertTrue("New name should exist", 
validationCatalog.tableExists(renamedIdent));
+  }
+
+  @Test
+  public void testSetTableProperties() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName);
+
+    Assert.assertEquals("Should have the new table property",
+        "value", 
validationCatalog.loadTable(tableIdent).properties().get("prop"));
+
+    sql("ALTER TABLE %s UNSET TBLPROPERTIES ('prop')", tableName);
+
+    Assert.assertNull("Should not have the removed table property",
+        validationCatalog.loadTable(tableIdent).properties().get("prop"));
+  }
+}
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
new file mode 100644
index 0000000..9d8711c
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestCreateTable extends SparkCatalogTestBase {
+  public TestCreateTable(String catalogName, String implementation, 
Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void dropTestTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testCreateTable() {
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", 
tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableUsingParquet() {
+    Assume.assumeTrue(
+        "Not working with session catalog because Spark will not use v2 for a 
Parquet table",
+        !"spark_catalog".equals(catalogName));
+
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING parquet", 
tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertEquals("Should not have default format parquet",
+        "parquet",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+
+    AssertHelpers.assertThrows("Should reject unsupported format names",
+        IllegalArgumentException.class, "Unsupported format in USING: 
crocodile",
+        () -> sql("CREATE TABLE %s.default.fail (id BIGINT NOT NULL, data 
STRING) USING crocodile", catalogName));
+  }
+
+  @Test
+  public void testCreateTablePartitionedBy() {
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, created_at TIMESTAMP, category STRING, data 
STRING) " +
+        "USING iceberg " +
+        "PARTITIONED BY (category, bucket(8, id), days(created_at))", 
tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "created_at", Types.TimestampType.withZone()),
+        NestedField.optional(3, "category", Types.StringType.get()),
+        NestedField.optional(4, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(new 
Schema(expectedSchema.fields()))
+        .identity("category")
+        .bucket("id", 8)
+        .day("created_at")
+        .build();
+    Assert.assertEquals("Should be partitioned correctly", expectedSpec, 
table.spec());
+
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableColumnComments() {
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL COMMENT 'Unique identifier', data STRING COMMENT 
'Data value') " +
+        "USING iceberg",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get(), "Unique 
identifier"),
+        NestedField.optional(2, "data", Types.StringType.get(), "Data value"));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableComment() {
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "COMMENT 'Table doc'",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals("Should have the table comment set in properties",
+        "Table doc", table.properties().get(TableCatalog.PROP_COMMENT));
+  }
+
+  @Test
+  public void testCreateTableLocation() throws Exception {
+    Assume.assumeTrue(
+        "Cannot set custom locations for Hadoop catalog tables",
+        !(validationCatalog instanceof HadoopCatalog));
+
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    String location = "file:" + tableLocation.toString();
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "LOCATION '%s'",
+        tableName, location);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals("Should have a custom table location",
+        location, table.location());
+  }
+
+  @Test
+  public void testCreateTableProperties() {
+    Assert.assertFalse("Table should not already exist", 
validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "TBLPROPERTIES (p1=2, p2='x')",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, 
table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, 
table.spec().fields().size());
+    Assert.assertEquals("Should have property p1", "2", 
table.properties().get("p1"));
+    Assert.assertEquals("Should have property p2", "x", 
table.properties().get("p2"));
+  }
+}
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
new file mode 100644
index 0000000..2bde262
--- /dev/null
+++ 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.when;
+
+public class TestCreateTableAsSelect extends SparkCatalogTestBase {
+
+  private final String sourceName;
+
+  public TestCreateTableAsSelect(String catalogName, String implementation, 
Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.sourceName = tableName("source");
+
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) " +
+        "USING iceberg PARTITIONED BY (truncate(id, 3))", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testUnpartitionedCTAS() {
+    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, 
sourceName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be an unpartitioned table",
+        0, ctasTable.spec().fields().size());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testPartitionedCTAS() {
+    sql("CREATE TABLE %s USING iceberg PARTITIONED BY (id) AS SELECT * FROM %s 
ORDER BY id", tableName, sourceName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("id")
+        .build();
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by id",
+        expectedSpec, ctasTable.spec());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testRTAS() {
+    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, 
sourceName);
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END 
AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    // spark_catalog does not use an atomic replace, so the table history and 
old spec is dropped
+    // the other catalogs do use atomic replace, so the spec id is incremented
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    int specId = isAtomic ? 1 : 0;
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(specId)
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than 
the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' 
END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testCreateRTAS() {
+    sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END 
AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' 
END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' 
ELSE 'odd' END AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    // spark_catalog does not use an atomic replace, so the table history is 
dropped
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(0) // the spec is identical and should be reused
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than 
the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' 
ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testDataFrameV2Create() throws Exception {
+    spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be an unpartitioned table",
+        0, ctasTable.spec().fields().size());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Replace() throws Exception {
+    spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    spark.table(sourceName)
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), 
lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .replace();
+
+    // spark_catalog does not use an atomic replace, so the table history and 
old spec is dropped
+    // the other catalogs do use atomic replace, so the spec id is incremented
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    int specId = isAtomic ? 1 : 0;
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(specId)
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than 
the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' 
END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testDataFrameV2CreateOrReplace() {
+    spark.table(sourceName)
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), 
lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .createOrReplace();
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' 
END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    spark.table(sourceName)
+        .select(col("id").multiply(lit(2)).as("id"), col("data"))
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), 
lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .createOrReplace();
+
+    // spark_catalog does not use an atomic replace, so the table history is 
dropped
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(0) // the spec is identical and should be reused
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than 
the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' 
ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+}
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
new file mode 100644
index 0000000..3387d76
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+  public TestDeleteFrom(String catalogName, String implementation, Map<String, 
String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDeleteFromUnpartitionedTable() {
+    // set the shuffle partitions to 1 to force the write to use a single task 
and produce 1 file
+    String originalParallelism = 
spark.conf().get("spark.sql.shuffle.partitions");
+    spark.conf().set("spark.sql.shuffle.partitions", "1");
+    try {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+      sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", 
tableName);
+
+      assertEquals("Should have expected rows",
+          ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      AssertHelpers.assertThrows("Should not delete when not all rows of a 
file match the filter",
+          IllegalArgumentException.class, "Failed to cleanly delete data 
files",
+          () -> sql("DELETE FROM %s WHERE id < 2", tableName));
+
+      sql("DELETE FROM %s WHERE id < 4", tableName);
+
+      Assert.assertEquals("Should have no rows after successful delete",
+          0L, scalarSql("SELECT count(1) FROM %s", tableName));
+
+    } finally {
+      spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+    }
+  }
+
+  @Test
+  public void testDeleteFromPartitionedTable() {
+    // set the shuffle partitions to 1 to force the write to use a single task 
and produce 1 file per partition
+    String originalParallelism = 
spark.conf().get("spark.sql.shuffle.partitions");
+    spark.conf().set("spark.sql.shuffle.partitions", "1");
+    try {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg " +
+          "PARTITIONED BY (truncate(id, 2))", tableName);
+      sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", 
tableName);
+
+      assertEquals("Should have 3 rows in 2 partitions",
+          ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      AssertHelpers.assertThrows("Should not delete when not all rows of a 
file match the filter",
+          IllegalArgumentException.class, "Failed to cleanly delete data 
files",
+          () -> sql("DELETE FROM %s WHERE id > 2", tableName));
+
+      sql("DELETE FROM %s WHERE id < 2", tableName);
+
+      assertEquals("Should have two rows in the second partition",
+          ImmutableList.of(row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    } finally {
+      spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+    }
+  }
+}
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
index 1b11845..d1eac31 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
@@ -69,7 +69,7 @@ public class TestNamespaceSQL extends SparkCatalogTestBase {
 
     sql("USE %s", catalogName);
 
-    String[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
+    Object[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
     Assert.assertEquals("Should use the current catalog", current[0], 
catalogName);
     Assert.assertEquals("Should use the configured default namespace", 
current[1], "default");
   }
@@ -114,12 +114,12 @@ public class TestNamespaceSQL extends 
SparkCatalogTestBase {
 
     Assert.assertTrue("Namespace should exist", 
validationNamespaceCatalog.namespaceExists(NS));
 
-    List<String[]> rows = sql("SHOW TABLES IN %s", fullNamespace);
+    List<Object[]> rows = sql("SHOW TABLES IN %s", fullNamespace);
     Assert.assertEquals("Should not list any tables", 0, rows.size());
 
     sql("CREATE TABLE %s.table (id bigint) USING iceberg", fullNamespace);
 
-    String[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s", 
fullNamespace));
+    Object[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s", 
fullNamespace));
     Assert.assertEquals("Namespace should match", "db", row[0]);
     Assert.assertEquals("Table name should match", "table", row[1]);
   }
@@ -132,21 +132,21 @@ public class TestNamespaceSQL extends 
SparkCatalogTestBase {
 
     Assert.assertTrue("Namespace should exist", 
validationNamespaceCatalog.namespaceExists(NS));
 
-    List<String[]> namespaces = sql("SHOW NAMESPACES IN %s", catalogName);
+    List<Object[]> namespaces = sql("SHOW NAMESPACES IN %s", catalogName);
 
     if (isHadoopCatalog) {
       Assert.assertEquals("Should have 1 namespace", 1, namespaces.size());
-      Set<String> namespaceNames = namespaces.stream().map(arr -> 
arr[0]).collect(Collectors.toSet());
+      Set<String> namespaceNames = namespaces.stream().map(arr -> 
arr[0].toString()).collect(Collectors.toSet());
       Assert.assertEquals("Should have only db namespace", 
ImmutableSet.of("db"), namespaceNames);
     } else {
       Assert.assertEquals("Should have 2 namespaces", 2, namespaces.size());
-      Set<String> namespaceNames = namespaces.stream().map(arr -> 
arr[0]).collect(Collectors.toSet());
+      Set<String> namespaceNames = namespaces.stream().map(arr -> 
arr[0].toString()).collect(Collectors.toSet());
       Assert.assertEquals("Should have default and db namespaces", 
ImmutableSet.of("default", "db"), namespaceNames);
     }
 
-    List<String[]> nestedNamespaces = sql("SHOW NAMESPACES IN %s", 
fullNamespace);
+    List<Object[]> nestedNamespaces = sql("SHOW NAMESPACES IN %s", 
fullNamespace);
 
-    Set<String> nestedNames = nestedNamespaces.stream().map(arr -> 
arr[0]).collect(Collectors.toSet());
+    Set<String> nestedNames = nestedNamespaces.stream().map(arr -> 
arr[0].toString()).collect(Collectors.toSet());
     Assert.assertEquals("Should not have nested namespaces", 
ImmutableSet.of(), nestedNames);
   }
 
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
new file mode 100644
index 0000000..8dcc6a0
--- /dev/null
+++ 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.functions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestPartitionedWrites extends SparkCatalogTestBase {
+  public TestPartitionedWrites(String catalogName, String implementation, 
Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTables() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY 
(truncate(id, 3))", tableName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testInsertAppend() {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    Assert.assertEquals("Should have 5 rows after insert", 5L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Ignore // broken because of SPARK-32168, which should be fixed in 3.0.1
+  public void testInsertOverwrite() {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    // 4 and 5 replace 3 in the partition (id - (id % 3)) = 3
+    sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    Assert.assertEquals("Should have 4 rows after overwrite", 4L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Append() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).append();
+
+    Assert.assertEquals("Should have 5 rows after insert", 5L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwritePartitions();
+
+    Assert.assertEquals("Should have 4 rows after overwrite", 4L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Overwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwrite(functions.col("id").$less(3));
+
+    Assert.assertEquals("Should have 3 rows after overwrite", 3L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
new file mode 100644
index 0000000..51da073
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSelect extends SparkCatalogTestBase {
+  private int scanEventCount = 0;
+  private ScanEvent lastScanEvent = null;
+
+  public TestSelect(String catalogName, String implementation, Map<String, 
String> config) {
+    super(catalogName, implementation, config);
+
+    // register a scan event listener to validate pushdown
+    Listeners.register(event -> {
+      scanEventCount += 1;
+      lastScanEvent = event;
+    }, ScanEvent.class);
+  }
+
+  @Before
+  public void createTables() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+
+    this.scanEventCount = 0;
+    this.lastScanEvent = null;
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testSelect() {
+    List<Object[]> expected = ImmutableList.of(row(1L, "a"), row(2L, "b"), 
row(3L, "c"));
+
+    assertEquals("Should return all expected rows", expected, sql("SELECT * 
FROM %s", tableName));
+  }
+
+  @Test
+  public void testProjection() {
+    List<Object[]> expected = ImmutableList.of(row(1L), row(2L), row(3L));
+
+    assertEquals("Should return all expected rows", expected, sql("SELECT id 
FROM %s", tableName));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should not push down a filter", 
Expressions.alwaysTrue(), lastScanEvent.filter());
+    Assert.assertEquals("Should project only the id column",
+        
validationCatalog.loadTable(tableIdent).schema().select("id").asStruct(),
+        lastScanEvent.projection().asStruct());
+  }
+
+  @Test
+  public void testExpressionPushdown() {
+    List<Object[]> expected = ImmutableList.of(row("b"));
+
+    assertEquals("Should return all expected rows", expected, sql("SELECT data 
FROM %s WHERE id = 2", tableName));
+
+    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    Assert.assertEquals("Should not push down a filter",
+        "(id IS NOT NULL AND id = 2)",
+        Spark3Util.describe(lastScanEvent.filter()));
+    Assert.assertEquals("Should project only the id column",
+        validationCatalog.loadTable(tableIdent).schema().asStruct(),
+        lastScanEvent.projection().asStruct());
+  }
+
+  @Test
+  public void testMetadataTables() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support metadata tables",
+        "spark_catalog".equals(catalogName));
+
+    assertEquals("Snapshot metadata table",
+        ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
+        sql("SELECT * FROM %s.snapshots", tableName));
+  }
+}
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
new file mode 100644
index 0000000..988df10
--- /dev/null
+++ 
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.functions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestUnpartitionedWrites extends SparkCatalogTestBase {
+  public TestUnpartitionedWrites(String catalogName, String implementation, 
Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTables() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testInsertAppend() {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    Assert.assertEquals("Should have 5 rows after insert", 5L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testInsertOverwrite() {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    Assert.assertEquals("Should have 2 rows after overwrite", 2L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Append() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).append();
+
+    Assert.assertEquals("Should have 5 rows after insert", 5L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwritePartitions();
+
+    Assert.assertEquals("Should have 2 rows after overwrite", 2L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Overwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) 
FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwrite(functions.col("id").$less$eq(3));
+
+    Assert.assertEquals("Should have 2 rows after overwrite", 2L, 
scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * 
FROM %s ORDER BY id", tableName));
+  }
+}

Reply via email to