This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 115a145 Add Spark 3 SQL tests (#1156)
115a145 is described below
commit 115a1450c0e62edd835d0dd77a6513ff3717479c
Author: Ryan Blue <[email protected]>
AuthorDate: Wed Jul 8 16:03:50 2020 -0700
Add Spark 3 SQL tests (#1156)
---
.../org/apache/iceberg/BaseMetastoreCatalog.java | 12 +-
.../java/org/apache/iceberg/TableMetadata.java | 4 +-
.../test/java/org/apache/iceberg/TestTables.java | 2 +-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 18 +-
.../iceberg/hive/HiveCreateReplaceTableTest.java | 4 +-
.../org/apache/iceberg/spark/SparkTestBase.java | 39 ++-
.../apache/iceberg/spark/source/SimpleRecord.java | 2 +-
.../apache/iceberg/spark/SparkCatalogTestBase.java | 12 +
.../apache/iceberg/spark/sql/TestAlterTable.java | 216 ++++++++++++++
.../apache/iceberg/spark/sql/TestCreateTable.java | 224 ++++++++++++++
.../iceberg/spark/sql/TestCreateTableAsSelect.java | 322 +++++++++++++++++++++
.../apache/iceberg/spark/sql/TestDeleteFrom.java | 95 ++++++
.../apache/iceberg/spark/sql/TestNamespaceSQL.java | 16 +-
.../iceberg/spark/sql/TestPartitionedWrites.java | 162 +++++++++++
.../org/apache/iceberg/spark/sql/TestSelect.java | 109 +++++++
.../iceberg/spark/sql/TestUnpartitionedWrites.java | 155 ++++++++++
16 files changed, 1372 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9c3c7a9..6afc352 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -110,9 +110,17 @@ public abstract class BaseMetastoreCatalog implements Catalog {
throw new NoSuchTableException("No such table: " + identifier);
}
- String baseLocation = location != null ? location :
defaultWarehouseLocation(identifier);
Map<String, String> tableProperties = properties != null ? properties :
Maps.newHashMap();
- TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec,
baseLocation, tableProperties);
+
+ TableMetadata metadata;
+ if (ops.current() != null) {
+ String baseLocation = location != null ? location :
ops.current().location();
+ metadata = ops.current().buildReplacement(schema, spec, baseLocation,
tableProperties);
+ } else {
+ String baseLocation = location != null ? location :
defaultWarehouseLocation(identifier);
+ metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation,
tableProperties);
+ }
+
if (orCreate) {
return
Transactions.createOrReplaceTableTransaction(identifier.toString(), ops,
metadata);
} else {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 8501aa9..b7fb7cb 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -566,7 +566,7 @@ public class TableMetadata implements Serializable {
// The caller is responsible to pass a updatedPartitionSpec with correct partition field IDs
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec
updatedPartitionSpec,
- Map<String, String> updatedProperties) {
+ String newLocation, Map<String,
String> updatedProperties) {
ValidationException.check(formatVersion > 1 ||
PartitionSpec.hasSequentialIds(updatedPartitionSpec),
"Spec does not use sequential IDs that are required in v1: %s",
updatedPartitionSpec);
@@ -602,7 +602,7 @@ public class TableMetadata implements Serializable {
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);
- return new TableMetadata(null, formatVersion, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(),
nextLastColumnId.get(), freshSchema,
specId, builder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file,
lastUpdatedMillis, newProperties));
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 8961820..b0057cd 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -78,7 +78,7 @@ public class TestTables {
TableMetadata metadata;
if (current != null) {
- metadata = current.buildReplacement(schema, spec, properties);
+ metadata = current.buildReplacement(schema, spec, current.location(),
properties);
return Transactions.replaceTableTransaction(name, ops, metadata);
} else {
metadata = newTableMetadata(schema, spec, temp.toString(), properties);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 3fa0499..e7ef9bf 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -153,10 +153,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
}
@Override
- public void renameTable(TableIdentifier from, TableIdentifier to) {
+ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
if (!isValidIdentifier(from)) {
throw new NoSuchTableException("Invalid identifier: %s", from);
}
+
+ TableIdentifier to = removeCatalogName(originalTo);
Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
String toDatabase = to.namespace().level(0);
@@ -347,6 +349,20 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, Supp
return tableIdentifier.namespace().levels().length == 1;
}
+ private TableIdentifier removeCatalogName(TableIdentifier to) {
+ if (isValidIdentifier(to)) {
+ return to;
+ }
+
+ // check if the identifier includes the catalog name and remove it
+ if (to.namespace().levels().length == 2 &&
name().equalsIgnoreCase(to.namespace().level(0))) {
+ return TableIdentifier.of(Namespace.of(to.namespace().level(1)),
to.name());
+ }
+
+ // return the original unmodified
+ return to;
+ }
+
private boolean isValidateNamespace(Namespace namespace) {
return namespace.levels().length == 1;
}
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
index b714605..93ba58d 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
@@ -155,7 +155,7 @@ public class HiveCreateReplaceTableTest extends HiveMetastoreTest {
txn.commitTransaction();
Table table = catalog.loadTable(TABLE_IDENTIFIER);
- Assert.assertEquals("Partition spec should match",
PartitionSpec.unpartitioned(), table.spec());
+ Assert.assertEquals("Partition spec should be unpartitioned", 0,
table.spec().fields().size());
}
@Test
@@ -233,7 +233,7 @@ public class HiveCreateReplaceTableTest extends HiveMetastoreTest {
txn.commitTransaction();
Table table = catalog.loadTable(TABLE_IDENTIFIER);
- Assert.assertEquals("Partition spec should match",
PartitionSpec.unpartitioned(), table.spec());
+ Assert.assertEquals("Partition spec should be unpartitioned", 0,
table.spec().fields().size());
}
@Test
diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index 3cf279f..e66f3a4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -26,15 +26,20 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
public class SparkTestBase {
+ protected static final Object ANY = new Object();
+
private static TestHiveMetastore metastore = null;
private static HiveConf hiveConf = null;
protected static SparkSession spark = null;
@@ -48,6 +53,7 @@ public class SparkTestBase {
SparkTestBase.spark = SparkSession.builder()
.master("local[2]")
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname,
hiveConf.get(METASTOREURIS.varname))
.enableHiveSupport()
.getOrCreate();
@@ -65,7 +71,7 @@ public class SparkTestBase {
SparkTestBase.spark = null;
}
- protected List<String[]> sql(String query, Object... args) {
+ protected List<Object[]> sql(String query, Object... args) {
List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
if (rows.size() < 1) {
return ImmutableList.of();
@@ -73,11 +79,38 @@ public class SparkTestBase {
return rows.stream()
.map(row -> IntStream.range(0, row.size())
- .mapToObj(pos -> row.isNullAt(pos) ? null :
row.get(pos).toString())
- .toArray(String[]::new)
+ .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos))
+ .toArray(Object[]::new)
).collect(Collectors.toList());
}
+ protected Object scalarSql(String query, Object... args) {
+ List<Object[]> rows = sql(query, args);
+ Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
+ Object[] row = Iterables.getOnlyElement(rows);
+ Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
+ return row[0];
+ }
+
+ protected Object[] row(Object... values) {
+ return values;
+ }
+
+ protected void assertEquals(String context, List<Object[]> expectedRows,
List<Object[]> actualRows) {
+ Assert.assertEquals(context + ": number of results should match",
expectedRows.size(), actualRows.size());
+ for (int row = 0; row < expectedRows.size(); row += 1) {
+ Object[] expected = expectedRows.get(row);
+ Object[] actual = actualRows.get(row);
+ Assert.assertEquals("Number of columns should match", expected.length,
actual.length);
+ for (int col = 0; col < actualRows.get(row).length; col += 1) {
+ if (expected[col] != ANY) {
+ Assert.assertEquals(context + ": row " + row + " col " + col + " contents should match",
+ expected[col], actual[col]);
+ }
+ }
+ }
+ }
+
protected static String dbPath(String dbName) {
return metastore.getDatabasePath(dbName);
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
index 76cd5ca..c8b7a31 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
@@ -28,7 +28,7 @@ public class SimpleRecord {
public SimpleRecord() {
}
- SimpleRecord(Integer id, String data) {
+ public SimpleRecord(Integer id, String data) {
this.id = id;
this.data = data;
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
index 4013cfc..5b8b2b3 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
@@ -23,7 +23,9 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
@@ -76,6 +78,8 @@ public abstract class SparkCatalogTestBase extends SparkTestBase {
protected final String catalogName;
protected final Catalog validationCatalog;
protected final SupportsNamespaces validationNamespaceCatalog;
+ protected final TableIdentifier tableIdent =
TableIdentifier.of(Namespace.of("default"), "table");
+ protected final String tableName;
public SparkCatalogTestBase(String catalogName, String implementation,
Map<String, String> config) {
this.catalogName = catalogName;
@@ -90,5 +94,13 @@ public abstract class SparkCatalogTestBase extends SparkTestBase {
if (config.get("type").equalsIgnoreCase("hadoop")) {
spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse",
"file:" + warehouse);
}
+
+ this.tableName = (catalogName.equals("spark_catalog") ? "" : catalogName +
".") + "default.table";
+
+ sql("CREATE NAMESPACE IF NOT EXISTS default");
+ }
+
+ protected String tableName(String name) {
+ return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") +
"default." + name;
}
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java
new file mode 100644
index 0000000..27ff6e1
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAlterTable extends SparkCatalogTestBase {
+ private final TableIdentifier renamedIdent =
TableIdentifier.of(Namespace.of("default"), "table2");
+
+ public TestAlterTable(String catalogName, String implementation, Map<String,
String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void createTable() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ }
+
+ @After
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s2", tableName);
+ }
+
+ @Test
+ public void testAddColumn() {
+ sql("ALTER TABLE %s ADD COLUMN point struct<x: double NOT NULL, y: double NOT NULL> AFTER id", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(3, "point", Types.StructType.of(
+ NestedField.required(4, "x", Types.DoubleType.get()),
+ NestedField.required(5, "y", Types.DoubleType.get())
+ )),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+ sql("ALTER TABLE %s ADD COLUMN point.z double COMMENT 'May be null' FIRST", tableName);
+
+ Types.StructType expectedSchema2 = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(3, "point", Types.StructType.of(
+ NestedField.optional(6, "z", Types.DoubleType.get(), "May be null"),
+ NestedField.required(4, "x", Types.DoubleType.get()),
+ NestedField.required(5, "y", Types.DoubleType.get())
+ )),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema2,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testDropColumn() {
+ sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testRenameColumn() {
+ sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "row_id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testAlterColumnComment() {
+ sql("ALTER TABLE %s ALTER COLUMN id COMMENT 'Record id'", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get(), "Record id"),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testAlterColumnType() {
+ sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+ sql("ALTER TABLE %s ALTER COLUMN count TYPE bigint", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()),
+ NestedField.optional(3, "count", Types.LongType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testAlterColumnDropNotNull() {
+ sql("ALTER TABLE %s ALTER COLUMN id DROP NOT NULL", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.optional(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testAlterColumnSetNotNull() {
+ // no-op changes are allowed
+ sql("ALTER TABLE %s ALTER COLUMN id SET NOT NULL", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+ AssertHelpers.assertThrows("Should reject adding NOT NULL constraint to an optional column",
+ AnalysisException.class, "Cannot change nullable column to non-nullable: data",
+ () -> sql("ALTER TABLE %s ALTER COLUMN data SET NOT NULL", tableName));
+ }
+
+ @Test
+ public void testAlterColumnPositionAfter() {
+ sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+ sql("ALTER TABLE %s ALTER COLUMN count AFTER id", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(3, "count", Types.IntegerType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testAlterColumnPositionFirst() {
+ sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+ sql("ALTER TABLE %s ALTER COLUMN count FIRST", tableName);
+
+ Types.StructType expectedSchema = Types.StructType.of(
+ NestedField.optional(3, "count", Types.IntegerType.get()),
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+
+ Assert.assertEquals("Schema should match expected",
+ expectedSchema,
validationCatalog.loadTable(tableIdent).schema().asStruct());
+ }
+
+ @Test
+ public void testTableRename() {
+ Assume.assumeFalse("Hadoop catalog does not support rename",
validationCatalog instanceof HadoopCatalog);
+
+ Assert.assertTrue("Initial name should exist",
validationCatalog.tableExists(tableIdent));
+ Assert.assertFalse("New name should not exist",
validationCatalog.tableExists(renamedIdent));
+
+ sql("ALTER TABLE %s RENAME TO %s2", tableName, tableName);
+
+ Assert.assertFalse("Initial name should not exist",
validationCatalog.tableExists(tableIdent));
+ Assert.assertTrue("New name should exist",
validationCatalog.tableExists(renamedIdent));
+ }
+
+ @Test
+ public void testSetTableProperties() {
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName);
+
+ Assert.assertEquals("Should have the new table property",
+ "value",
validationCatalog.loadTable(tableIdent).properties().get("prop"));
+
+ sql("ALTER TABLE %s UNSET TBLPROPERTIES ('prop')", tableName);
+
+ Assert.assertNull("Should not have the removed table property",
+ validationCatalog.loadTable(tableIdent).properties().get("prop"));
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
new file mode 100644
index 0000000..9d8711c
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestCreateTable extends SparkCatalogTestBase {
+ public TestCreateTable(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void dropTestTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testCreateTable() {
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg",
tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertNull("Should not have the default format set",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+ }
+
+ @Test
+ public void testCreateTableUsingParquet() {
+ Assume.assumeTrue(
+ "Not working with session catalog because Spark will not use v2 for a Parquet table",
+ !"spark_catalog".equals(catalogName));
+
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING parquet",
tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertEquals("Should not have default format parquet",
+ "parquet",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+
+ AssertHelpers.assertThrows("Should reject unsupported format names",
+ IllegalArgumentException.class, "Unsupported format in USING: crocodile",
+ () -> sql("CREATE TABLE %s.default.fail (id BIGINT NOT NULL, data STRING) USING crocodile", catalogName));
+ }
+
+ @Test
+ public void testCreateTablePartitionedBy() {
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s " +
+ "(id BIGINT NOT NULL, created_at TIMESTAMP, category STRING, data STRING) " +
+ "USING iceberg " +
+ "PARTITIONED BY (category, bucket(8, id), days(created_at))",
tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "created_at", Types.TimestampType.withZone()),
+ NestedField.optional(3, "category", Types.StringType.get()),
+ NestedField.optional(4, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(new
Schema(expectedSchema.fields()))
+ .identity("category")
+ .bucket("id", 8)
+ .day("created_at")
+ .build();
+ Assert.assertEquals("Should be partitioned correctly", expectedSpec,
table.spec());
+
+ Assert.assertNull("Should not have the default format set",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+ }
+
+ @Test
+ public void testCreateTableColumnComments() {
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s " +
+ "(id BIGINT NOT NULL COMMENT 'Unique identifier', data STRING COMMENT 'Data value') " +
+ "USING iceberg",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get(), "Unique identifier"),
+ NestedField.optional(2, "data", Types.StringType.get(), "Data value"));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertNull("Should not have the default format set",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+ }
+
+ @Test
+ public void testCreateTableComment() {
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s " +
+ "(id BIGINT NOT NULL, data STRING) " +
+ "USING iceberg " +
+ "COMMENT 'Table doc'",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertNull("Should not have the default format set",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+ Assert.assertEquals("Should have the table comment set in properties",
+ "Table doc", table.properties().get(TableCatalog.PROP_COMMENT));
+ }
+
+ @Test
+ public void testCreateTableLocation() throws Exception {
+ Assume.assumeTrue(
+ "Cannot set custom locations for Hadoop catalog tables",
+ !(validationCatalog instanceof HadoopCatalog));
+
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ File tableLocation = temp.newFolder();
+ Assert.assertTrue(tableLocation.delete());
+
+ String location = "file:" + tableLocation.toString();
+
+ sql("CREATE TABLE %s " +
+ "(id BIGINT NOT NULL, data STRING) " +
+ "USING iceberg " +
+ "LOCATION '%s'",
+ tableName, location);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertNull("Should not have the default format set",
+ table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+ Assert.assertEquals("Should have a custom table location",
+ location, table.location());
+ }
+
+ @Test
+ public void testCreateTableProperties() {
+ Assert.assertFalse("Table should not already exist",
validationCatalog.tableExists(tableIdent));
+
+ sql("CREATE TABLE %s " +
+ "(id BIGINT NOT NULL, data STRING) " +
+ "USING iceberg " +
+ "TBLPROPERTIES (p1=2, p2='x')",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertNotNull("Should load the new table", table);
+
+ StructType expectedSchema = StructType.of(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.optional(2, "data", Types.StringType.get()));
+ Assert.assertEquals("Should have the expected schema", expectedSchema,
table.schema().asStruct());
+ Assert.assertEquals("Should not be partitioned", 0,
table.spec().fields().size());
+ Assert.assertEquals("Should have property p1", "2",
table.properties().get("p1"));
+ Assert.assertEquals("Should have property p2", "x",
table.properties().get("p2"));
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
new file mode 100644
index 0000000..2bde262
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.when;
+
+public class TestCreateTableAsSelect extends SparkCatalogTestBase {
+
+ private final String sourceName;
+
+ /**
+ * Forwards the catalog settings to the base class, resolves the name of the
+ * shared source table, creates that table if it does not exist and inserts
+ * three rows into it.
+ *
+ * NOTE(review): the INSERT is unconditional while the CREATE uses IF NOT
+ * EXISTS, and nothing in this class drops the source table. If the test
+ * runner builds a new instance per test, the source table gains three more
+ * rows each time - confirm this is intended.
+ */
+ public TestCreateTableAsSelect(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ this.sourceName = tableName("source");
+
+ sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) " +
+ "USING iceberg PARTITIONED BY (truncate(id, 3))", sourceName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+ }
+
+ /**
+ * Drops the table written by the test, if it exists, after each test. The
+ * source table named by sourceName is not dropped here.
+ */
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ /**
+ * Runs CREATE TABLE ... AS SELECT from the source table without a
+ * partitioning clause, then checks the new table's schema, that its
+ * partition spec has no fields, and that its rows equal the source rows.
+ */
+ @Test
+ public void testUnpartitionedCTAS() {
+ sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName,
sourceName);
+
+ // both fields are expected to be optional in the created table
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get())
+ );
+
+ Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), ctasTable.schema().asStruct());
+ Assert.assertEquals("Should be an unpartitioned table",
+ 0, ctasTable.spec().fields().size());
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Runs CREATE TABLE ... PARTITIONED BY (id) AS SELECT from the source
+ * table, then checks the new table's schema, that its spec is an identity
+ * partition on id, and that its rows equal the source rows.
+ */
+ @Test
+ public void testPartitionedCTAS() {
+ sql("CREATE TABLE %s USING iceberg PARTITIONED BY (id) AS SELECT * FROM %s
ORDER BY id", tableName, sourceName);
+
+ // both fields are expected to be optional in the created table
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get())
+ );
+
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+ .identity("id")
+ .build();
+
+ Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), ctasTable.schema().asStruct());
+ Assert.assertEquals("Should be partitioned by id",
+ expectedSpec, ctasTable.spec());
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Creates the table with CTAS, replaces it with REPLACE TABLE ... AS SELECT
+ * using an added, partitioned "part" column, then checks the replaced
+ * table's schema, partition spec, rows and snapshot count.
+ */
+ @Test
+ public void testRTAS() {
+ sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName,
sourceName);
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ sql("REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+ "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END
AS part " +
+ "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+ // spark_catalog does not use an atomic replace, so the table history and
+ // old spec are dropped; the other catalogs do use an atomic replace, so
+ // the spec id is incremented
+ boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "part", Types.StringType.get())
+ );
+
+ int specId = isAtomic ? 1 : 0;
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+ .identity("part")
+ .withSpecId(specId)
+ .build();
+
+ Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+ // the replacement table has a different schema and partition spec than
+ // the original
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), rtasTable.schema().asStruct());
+ Assert.assertEquals("Should be partitioned by part",
+ expectedSpec, rtasTable.spec());
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd'
END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // two snapshots are expected when the replace is atomic, one otherwise
+ Assert.assertEquals("Table should have expected snapshots",
+ isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ }
+
+ /**
+ * Runs CREATE OR REPLACE TABLE ... AS SELECT twice against the same table
+ * name, the second time with doubled ids, then checks the resulting
+ * table's schema, partition spec, rows and snapshot count.
+ */
+ @Test
+ public void testCreateRTAS() {
+ sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+ "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END
AS part " +
+ "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd'
END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+ "SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even'
ELSE 'odd' END AS part " +
+ "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+ // spark_catalog does not use an atomic replace, so the table history is
+ // dropped
+ boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "part", Types.StringType.get())
+ );
+
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+ .identity("part")
+ .withSpecId(0) // the spec is identical and should be reused
+ .build();
+
+ Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+ // both statements select the same column names and partition by part, so
+ // the same schema and the same spec (id 0) are expected after the replace
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), rtasTable.schema().asStruct());
+ Assert.assertEquals("Should be partitioned by part",
+ expectedSpec, rtasTable.spec());
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even'
ELSE 'odd' END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // two snapshots are expected when the replace is atomic, one otherwise
+ Assert.assertEquals("Table should have expected snapshots",
+ isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ }
+
+ /**
+ * Creates the table from the source table through the DataFrame writeTo
+ * API, then checks the new table's schema, that its partition spec has no
+ * fields, and that its rows equal the source rows.
+ */
+ @Test
+ public void testDataFrameV2Create() throws Exception {
+ spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+ // both fields are expected to be optional in the created table
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get())
+ );
+
+ Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), ctasTable.schema().asStruct());
+ Assert.assertEquals("Should be an unpartitioned table",
+ 0, ctasTable.spec().fields().size());
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Creates the table through the DataFrame writeTo API, replaces it with a
+ * DataFrame that adds a partitioned "part" column, then checks the replaced
+ * table's schema, partition spec, rows and snapshot count.
+ */
+ @Test
+ public void testDataFrameV2Replace() throws Exception {
+ spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // part is "even" when id mod 2 equals 0 and "odd" otherwise
+ spark.table(sourceName)
+ .select(
+ col("id"),
+ col("data"),
+ when(col("id").mod(lit(2)).equalTo(lit(0)),
lit("even")).otherwise("odd").as("part"))
+ .orderBy("part", "id")
+ .writeTo(tableName)
+ .partitionedBy(col("part"))
+ .using("iceberg")
+ .replace();
+
+ // spark_catalog does not use an atomic replace, so the table history and
+ // old spec are dropped; the other catalogs do use an atomic replace, so
+ // the spec id is incremented
+ boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "part", Types.StringType.get())
+ );
+
+ int specId = isAtomic ? 1 : 0;
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+ .identity("part")
+ .withSpecId(specId)
+ .build();
+
+ Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+ // the replacement table has a different schema and partition spec than
+ // the original
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), rtasTable.schema().asStruct());
+ Assert.assertEquals("Should be partitioned by part",
+ expectedSpec, rtasTable.spec());
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd'
END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // two snapshots are expected when the replace is atomic, one otherwise
+ Assert.assertEquals("Table should have expected snapshots",
+ isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ }
+
+ /**
+ * Calls createOrReplace through the DataFrame writeTo API twice against the
+ * same table name, the second time with doubled ids, then checks the
+ * resulting table's schema, partition spec, rows and snapshot count.
+ */
+ @Test
+ public void testDataFrameV2CreateOrReplace() {
+ // part is "even" when id mod 2 equals 0 and "odd" otherwise
+ spark.table(sourceName)
+ .select(
+ col("id"),
+ col("data"),
+ when(col("id").mod(lit(2)).equalTo(lit(0)),
lit("even")).otherwise("odd").as("part"))
+ .orderBy("part", "id")
+ .writeTo(tableName)
+ .partitionedBy(col("part"))
+ .using("iceberg")
+ .createOrReplace();
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd'
END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // the first select doubles id; part is then derived from the doubled id
+ spark.table(sourceName)
+ .select(col("id").multiply(lit(2)).as("id"), col("data"))
+ .select(
+ col("id"),
+ col("data"),
+ when(col("id").mod(lit(2)).equalTo(lit(0)),
lit("even")).otherwise("odd").as("part"))
+ .orderBy("part", "id")
+ .writeTo(tableName)
+ .partitionedBy(col("part"))
+ .using("iceberg")
+ .createOrReplace();
+
+ // spark_catalog does not use an atomic replace, so the table history is
+ // dropped
+ boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+ Schema expectedSchema = new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "part", Types.StringType.get())
+ );
+
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+ .identity("part")
+ .withSpecId(0) // the spec is identical and should be reused
+ .build();
+
+ Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+ // both writes select the same column names and partition by part, so the
+ // same schema and the same spec (id 0) are expected after the replace
+ Assert.assertEquals("Should have expected nullable schema",
+ expectedSchema.asStruct(), rtasTable.schema().asStruct());
+ Assert.assertEquals("Should be partitioned by part",
+ expectedSpec, rtasTable.spec());
+
+ assertEquals("Should have rows matching the source table",
+ sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even'
ELSE 'odd' END AS part " +
+ "FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // two snapshots are expected when the replace is atomic, one otherwise
+ Assert.assertEquals("Table should have expected snapshots",
+ isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ }
+}
diff --git
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
new file mode 100644
index 0000000..3387d76
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+ /**
+ * Forwards the catalog name, implementation and configuration to the base
+ * test class.
+ */
+ public TestDeleteFrom(String catalogName, String implementation, Map<String,
String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ /** Drops the table written by the test, if it exists, after each test. */
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ /**
+ * Inserts three rows into an unpartitioned table, expects a DELETE that
+ * matches only some of them to fail with IllegalArgumentException, and
+ * expects a DELETE that matches all of them to leave the table empty.
+ */
+ @Test
+ public void testDeleteFromUnpartitionedTable() {
+ // set the shuffle partitions to 1 to force the write to use a single task
+ // and produce 1 file
+ String originalParallelism =
spark.conf().get("spark.sql.shuffle.partitions");
+ spark.conf().set("spark.sql.shuffle.partitions", "1");
+ try {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')",
tableName);
+
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // id < 2 matches only one of the three inserted rows
+ AssertHelpers.assertThrows("Should not delete when not all rows of a
file match the filter",
+ IllegalArgumentException.class, "Failed to cleanly delete data
files",
+ () -> sql("DELETE FROM %s WHERE id < 2", tableName));
+
+ // id < 4 matches all three inserted rows
+ sql("DELETE FROM %s WHERE id < 4", tableName);
+
+ Assert.assertEquals("Should have no rows after successful delete",
+ 0L, scalarSql("SELECT count(1) FROM %s", tableName));
+
+ } finally {
+ // restore the session setting even when an assertion above fails
+ spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+ }
+ }
+
+ /**
+ * Inserts three rows into a table partitioned by truncate(id, 2), expects a
+ * DELETE that matches only part of a partition to fail with
+ * IllegalArgumentException, and expects a DELETE on id < 2 to leave the
+ * rows with ids 2 and 3.
+ */
+ @Test
+ public void testDeleteFromPartitionedTable() {
+ // set the shuffle partitions to 1 to force the write to use a single task
+ // and produce 1 file per partition
+ String originalParallelism =
spark.conf().get("spark.sql.shuffle.partitions");
+ spark.conf().set("spark.sql.shuffle.partitions", "1");
+ try {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg " +
+ "PARTITIONED BY (truncate(id, 2))", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')",
tableName);
+
+ assertEquals("Should have 3 rows in 2 partitions",
+ ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ // id > 2 matches only id 3; per the assertion messages in this test,
+ // ids 2 and 3 are expected to share the second partition
+ AssertHelpers.assertThrows("Should not delete when not all rows of a
file match the filter",
+ IllegalArgumentException.class, "Failed to cleanly delete data
files",
+ () -> sql("DELETE FROM %s WHERE id > 2", tableName));
+
+ // id < 2 matches only id 1
+ sql("DELETE FROM %s WHERE id < 2", tableName);
+
+ assertEquals("Should have two rows in the second partition",
+ ImmutableList.of(row(2L, "b"), row(3L, "c")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+ } finally {
+ // restore the session setting even when an assertion above fails
+ spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+ }
+ }
+}
diff --git
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
index 1b11845..d1eac31 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
@@ -69,7 +69,7 @@ public class TestNamespaceSQL extends SparkCatalogTestBase {
sql("USE %s", catalogName);
- String[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
+ Object[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
Assert.assertEquals("Should use the current catalog", current[0],
catalogName);
Assert.assertEquals("Should use the configured default namespace",
current[1], "default");
}
@@ -114,12 +114,12 @@ public class TestNamespaceSQL extends
SparkCatalogTestBase {
Assert.assertTrue("Namespace should exist",
validationNamespaceCatalog.namespaceExists(NS));
- List<String[]> rows = sql("SHOW TABLES IN %s", fullNamespace);
+ List<Object[]> rows = sql("SHOW TABLES IN %s", fullNamespace);
Assert.assertEquals("Should not list any tables", 0, rows.size());
sql("CREATE TABLE %s.table (id bigint) USING iceberg", fullNamespace);
- String[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s",
fullNamespace));
+ Object[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s",
fullNamespace));
Assert.assertEquals("Namespace should match", "db", row[0]);
Assert.assertEquals("Table name should match", "table", row[1]);
}
@@ -132,21 +132,21 @@ public class TestNamespaceSQL extends
SparkCatalogTestBase {
Assert.assertTrue("Namespace should exist",
validationNamespaceCatalog.namespaceExists(NS));
- List<String[]> namespaces = sql("SHOW NAMESPACES IN %s", catalogName);
+ List<Object[]> namespaces = sql("SHOW NAMESPACES IN %s", catalogName);
if (isHadoopCatalog) {
Assert.assertEquals("Should have 1 namespace", 1, namespaces.size());
- Set<String> namespaceNames = namespaces.stream().map(arr ->
arr[0]).collect(Collectors.toSet());
+ Set<String> namespaceNames = namespaces.stream().map(arr ->
arr[0].toString()).collect(Collectors.toSet());
Assert.assertEquals("Should have only db namespace",
ImmutableSet.of("db"), namespaceNames);
} else {
Assert.assertEquals("Should have 2 namespaces", 2, namespaces.size());
- Set<String> namespaceNames = namespaces.stream().map(arr ->
arr[0]).collect(Collectors.toSet());
+ Set<String> namespaceNames = namespaces.stream().map(arr ->
arr[0].toString()).collect(Collectors.toSet());
Assert.assertEquals("Should have default and db namespaces",
ImmutableSet.of("default", "db"), namespaceNames);
}
- List<String[]> nestedNamespaces = sql("SHOW NAMESPACES IN %s",
fullNamespace);
+ List<Object[]> nestedNamespaces = sql("SHOW NAMESPACES IN %s",
fullNamespace);
- Set<String> nestedNames = nestedNamespaces.stream().map(arr ->
arr[0]).collect(Collectors.toSet());
+ Set<String> nestedNames = nestedNamespaces.stream().map(arr ->
arr[0].toString()).collect(Collectors.toSet());
Assert.assertEquals("Should not have nested namespaces",
ImmutableSet.of(), nestedNames);
}
diff --git
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
new file mode 100644
index 0000000..8dcc6a0
--- /dev/null
+++
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.functions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestPartitionedWrites extends SparkCatalogTestBase {
+ /**
+ * Forwards the catalog name, implementation and configuration to the base
+ * test class.
+ */
+ public TestPartitionedWrites(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ /**
+ * Before each test, creates the table partitioned by truncate(id, 3) and
+ * inserts the three starting rows that the tests in this class count on.
+ */
+ @Before
+ public void createTables() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY
(truncate(id, 3))", tableName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+ }
+
+ /** Drops the table written by the test, if it exists, after each test. */
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ /**
+ * Appends two rows with INSERT INTO and checks that the table then holds
+ * the three starting rows plus the two new ones.
+ */
+ @Test
+ public void testInsertAppend() {
+ Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*)
FROM %s", tableName));
+
+ sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+ Assert.assertEquals("Should have 5 rows after insert", 5L,
scalarSql("SELECT count(*) FROM %s", tableName));
+
+ List<Object[]> expected = ImmutableList.of(
+ row(1L, "a"),
+ row(2L, "b"),
+ row(3L, "c"),
+ row(4L, "d"),
+ row(5L, "e")
+ );
+
+ assertEquals("Row data should match expected", expected, sql("SELECT *
FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Checks that INSERT OVERWRITE replaces the row with id 3 by the rows with
+ * ids 4 and 5 and leaves ids 1 and 2 in place. Currently disabled; see the
+ * reason on the Ignore annotation. The Test annotation is kept next to it
+ * so the runner reports the method as a skipped test and it runs again as
+ * soon as the Ignore annotation is removed.
+ */
+ @Test
+ @Ignore // broken because of SPARK-32168, which should be fixed in 3.0.1
+ public void testInsertOverwrite() {
+ Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*)
FROM %s", tableName));
+
+ // 4 and 5 replace 3 in the partition (id - (id % 3)) = 3
+ sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+ Assert.assertEquals("Should have 4 rows after overwrite", 4L,
scalarSql("SELECT count(*) FROM %s", tableName));
+
+ List<Object[]> expected = ImmutableList.of(
+ row(1L, "a"),
+ row(2L, "b"),
+ row(4L, "d"),
+ row(5L, "e")
+ );
+
+ assertEquals("Row data should match expected", expected, sql("SELECT *
FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Appends two records through the DataFrame writeTo API and checks that
+ * the table then holds the three starting rows plus the two new ones.
+ */
+ @Test
+ public void testDataFrameV2Append() throws NoSuchTableException {
+ Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*)
FROM %s", tableName));
+
+ List<SimpleRecord> data = ImmutableList.of(
+ new SimpleRecord(4, "d"),
+ new SimpleRecord(5, "e")
+ );
+ Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+ ds.writeTo(tableName).append();
+
+ Assert.assertEquals("Should have 5 rows after insert", 5L,
scalarSql("SELECT count(*) FROM %s", tableName));
+
+ List<Object[]> expected = ImmutableList.of(
+ row(1L, "a"),
+ row(2L, "b"),
+ row(3L, "c"),
+ row(4L, "d"),
+ row(5L, "e")
+ );
+
+ assertEquals("Row data should match expected", expected, sql("SELECT *
FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Writes two records with overwritePartitions() through the DataFrame
+ * writeTo API and checks that the row with id 3 is replaced by the rows
+ * with ids 4 and 5 while ids 1 and 2 remain.
+ */
+ @Test
+ public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException {
+ Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*)
FROM %s", tableName));
+
+ List<SimpleRecord> data = ImmutableList.of(
+ new SimpleRecord(4, "d"),
+ new SimpleRecord(5, "e")
+ );
+ Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+ ds.writeTo(tableName).overwritePartitions();
+
+ Assert.assertEquals("Should have 4 rows after overwrite", 4L,
scalarSql("SELECT count(*) FROM %s", tableName));
+
+ // id 3 is expected to be gone; ids 1 and 2 are expected to be untouched
+ List<Object[]> expected = ImmutableList.of(
+ row(1L, "a"),
+ row(2L, "b"),
+ row(4L, "d"),
+ row(5L, "e")
+ );
+
+ assertEquals("Row data should match expected", expected, sql("SELECT *
FROM %s ORDER BY id", tableName));
+ }
+
+ /**
+ * Writes two records with overwrite(id < 3) through the DataFrame writeTo
+ * API and checks that the rows with ids 1 and 2 are gone while id 3 and
+ * the two new rows are present.
+ */
+ @Test
+ public void testDataFrameV2Overwrite() throws NoSuchTableException {
+ Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*)
FROM %s", tableName));
+
+ List<SimpleRecord> data = ImmutableList.of(
+ new SimpleRecord(4, "d"),
+ new SimpleRecord(5, "e")
+ );
+ Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+ // the overwrite condition is id < 3
+ ds.writeTo(tableName).overwrite(functions.col("id").$less(3));
+
+ Assert.assertEquals("Should have 3 rows after overwrite", 3L,
scalarSql("SELECT count(*) FROM %s", tableName));
+
+ List<Object[]> expected = ImmutableList.of(
+ row(3L, "c"),
+ row(4L, "d"),
+ row(5L, "e")
+ );
+
+ assertEquals("Row data should match expected", expected, sql("SELECT *
FROM %s ORDER BY id", tableName));
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
new file mode 100644
index 0000000..51da073
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSelect extends SparkCatalogTestBase {
+ private int scanEventCount = 0;
+ private ScanEvent lastScanEvent = null;
+
+ /**
+ * Forwards the catalog settings to the base class and registers a
+ * ScanEvent listener that counts events and remembers the most recent one
+ * in this instance's fields.
+ *
+ * NOTE(review): a listener is registered on every construction and no
+ * matching removal is visible in this class. If the test runner builds a
+ * new instance per test, listeners from earlier instances presumably stay
+ * registered - confirm this is acceptable.
+ */
+ public TestSelect(String catalogName, String implementation, Map<String,
String> config) {
+ super(catalogName, implementation, config);
+
+ // register a scan event listener to validate pushdown
+ Listeners.register(event -> {
+ scanEventCount += 1;
+ lastScanEvent = event;
+ }, ScanEvent.class);
+ }
+
+ /**
+ * Before each test, creates the table, inserts the three starting rows,
+ * and then resets the scan event counter and the last recorded event.
+ */
+ @Before
+ public void createTables() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+
+ // reset after the setup statements so each test starts counting from zero
+ this.scanEventCount = 0;
+ this.lastScanEvent = null;
+ }
+
+ /** Drops the table written by the test, if it exists, after each test. */
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ /** Checks that SELECT * returns the three rows inserted during setup. */
+ @Test
+ public void testSelect() {
+ List<Object[]> expected = ImmutableList.of(row(1L, "a"), row(2L, "b"),
row(3L, "c"));
+
+ assertEquals("Should return all expected rows", expected, sql("SELECT *
FROM %s", tableName));
+ }
+
+ /**
+ * Selects only the id column and checks the returned rows, that exactly
+ * one scan event was recorded, that its filter is alwaysTrue, and that its
+ * projection equals the table schema restricted to id.
+ */
+ @Test
+ public void testProjection() {
+ List<Object[]> expected = ImmutableList.of(row(1L), row(2L), row(3L));
+
+ assertEquals("Should return all expected rows", expected, sql("SELECT id
FROM %s", tableName));
+
+ Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+ // the query has no WHERE clause, so the filter is expected to be alwaysTrue
+ Assert.assertEquals("Should not push down a filter",
Expressions.alwaysTrue(), lastScanEvent.filter());
+ Assert.assertEquals("Should project only the id column",
+
validationCatalog.loadTable(tableIdent).schema().select("id").asStruct(),
+ lastScanEvent.projection().asStruct());
+ }
+
+ /**
+ * Runs a query with an equality predicate on id and checks the returned
+ * row, that exactly one scan event was recorded, that the predicate
+ * reached the scan as a filter, and which columns the scan projected.
+ */
+ @Test
+ public void testExpressionPushdown() {
+ List<Object[]> expected = ImmutableList.of(row("b"));
+
+ assertEquals("Should return all expected rows", expected, sql("SELECT data
FROM %s WHERE id = 2", tableName));
+
+ Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+ // the recorded filter is compared in its described string form
+ Assert.assertEquals("Should push down expected filter",
+ "(id IS NOT NULL AND id = 2)",
+ Spark3Util.describe(lastScanEvent.filter()));
+ // the expected projection is the full table schema (id and data), not
+ // just the selected data column
+ Assert.assertEquals("Should project the id and data columns",
+ validationCatalog.loadTable(tableIdent).schema().asStruct(),
+ lastScanEvent.projection().asStruct());
+ }
+
+ /**
+ * Reads the table's snapshots metadata table and expects a single row
+ * whose third column is null and whose fourth column is "append"; the
+ * remaining columns are given as ANY. Skipped for spark_catalog.
+ */
+ @Test
+ public void testMetadataTables() {
+ Assume.assumeFalse(
+ "Spark session catalog does not support metadata tables",
+ "spark_catalog".equals(catalogName));
+
+ // ANY is defined outside this class; presumably it matches any value in
+ // assertEquals - confirm against the base class
+ assertEquals("Snapshot metadata table",
+ ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
+ sql("SELECT * FROM %s.snapshots", tableName));
+ }
+}
diff --git
a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
new file mode 100644
index 0000000..988df10
--- /dev/null
+++
b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.functions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * SQL and DataFrame (v2 writer) write tests against an unpartitioned Iceberg table.
+ *
+ * <p>Before each test {@link #createTables()} creates the table and inserts the rows
+ * (1, 'a'), (2, 'b'), (3, 'c'); after each test {@link #removeTables()} drops it.
+ */
+public class TestUnpartitionedWrites extends SparkCatalogTestBase {
+  public TestUnpartitionedWrites(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  /** Creates the test table and seeds it with three rows. */
+  @Before
+  public void createTables() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  /** Drops the test table if it exists. */
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  /** INSERT INTO adds the new rows to the existing ones. */
+  @Test
+  public void testInsertAppend() {
+    assertRowCount("Should have 3 rows", 3L);
+
+    sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    assertRowCount("Should have 5 rows after insert", 5L);
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertOrderedRows(expected);
+  }
+
+  /** INSERT OVERWRITE leaves only the newly written rows in the table. */
+  @Test
+  public void testInsertOverwrite() {
+    assertRowCount("Should have 3 rows", 3L);
+
+    sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName);
+
+    assertRowCount("Should have 2 rows after overwrite", 2L);
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertOrderedRows(expected);
+  }
+
+  /** {@code writeTo(...).append()} adds the DataFrame rows to the existing ones. */
+  @Test
+  public void testDataFrameV2Append() throws NoSuchTableException {
+    assertRowCount("Should have 3 rows", 3L);
+
+    newRecords().writeTo(tableName).append();
+
+    assertRowCount("Should have 5 rows after insert", 5L);
+
+    List<Object[]> expected = ImmutableList.of(
+        row(1L, "a"),
+        row(2L, "b"),
+        row(3L, "c"),
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertOrderedRows(expected);
+  }
+
+  /** {@code writeTo(...).overwritePartitions()} leaves only the DataFrame rows in the table. */
+  @Test
+  public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException {
+    assertRowCount("Should have 3 rows", 3L);
+
+    newRecords().writeTo(tableName).overwritePartitions();
+
+    assertRowCount("Should have 2 rows after overwrite", 2L);
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertOrderedRows(expected);
+  }
+
+  /**
+   * {@code writeTo(...).overwrite(condition)} with {@code id <= 3} replaces the three seeded rows
+   * with the DataFrame rows.
+   */
+  @Test
+  public void testDataFrameV2Overwrite() throws NoSuchTableException {
+    assertRowCount("Should have 3 rows", 3L);
+
+    // Column.leq is the Java-friendly spelling of Scala's `<=`; calling the compiler-mangled
+    // $less$eq from Java works but is not part of the intended Java API surface.
+    newRecords().writeTo(tableName).overwrite(functions.col("id").leq(3));
+
+    assertRowCount("Should have 2 rows after overwrite", 2L);
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertOrderedRows(expected);
+  }
+
+  /** Builds a DataFrame holding the two records (4, "d") and (5, "e") written by the v2 tests. */
+  private Dataset<Row> newRecords() {
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    return spark.createDataFrame(data, SimpleRecord.class);
+  }
+
+  /** Asserts that {@code SELECT count(*)} on the test table returns {@code expectedCount}. */
+  private void assertRowCount(String message, long expectedCount) {
+    Assert.assertEquals(message, expectedCount, scalarSql("SELECT count(*) FROM %s", tableName));
+  }
+
+  /** Asserts that the test table, read in {@code id} order, contains exactly {@code expected}. */
+  private void assertOrderedRows(List<Object[]> expected) {
+    assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+}