This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a99dc4f2fa Spark 4.0: Add schema conversion support for default values (#14407)
a99dc4f2fa is described below
commit a99dc4f2faa3b52140f3f1432187f5e575bb32bd
Author: Drew Gallardo <[email protected]>
AuthorDate: Wed Oct 29 12:02:52 2025 -0700
Spark 4.0: Add schema conversion support for default values (#14407)
---
.../org/apache/iceberg/spark/TypeToSparkType.java | 17 ++
.../apache/iceberg/spark/TestSparkSchemaUtil.java | 189 +++++++++++++++++
.../iceberg/spark/sql/TestSparkDefaultValues.java | 234 +++++++++++++++++++++
3 files changed, 440 insertions(+)
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index cbee10fbb0..09c89bbba8 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.types.ArrayType$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType$;
@@ -69,6 +70,22 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
       if (field.doc() != null) {
         sparkField = sparkField.withComment(field.doc());
       }
+
+      // Convert both write and initial default values to Spark SQL string literal representations
+      // on the StructField metadata
+      if (field.writeDefault() != null) {
+        Object writeDefault = SparkUtil.internalToSpark(field.type(), field.writeDefault());
+        sparkField =
+            sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, type).sql());
+      }
+
+      if (field.initialDefault() != null) {
+        Object initialDefault = SparkUtil.internalToSpark(field.type(), field.initialDefault());
+        sparkField =
+            sparkField.withExistenceDefaultValue(
+                Literal$.MODULE$.create(initialDefault, type).sql());
+      }
+
       sparkFields.add(sparkField);
     }
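
For context, a minimal sketch of what this conversion yields for a field carrying a write default; it reuses only APIs exercised in this commit (SparkSchemaUtil.convert and Spark's ResolveDefaultColumnsUtils metadata keys), and the field name is illustrative:

    // Iceberg field carrying a write default
    Schema schema =
        new Schema(
            Types.NestedField.optional("data")
                .withId(1)
                .ofType(Types.StringType.get())
                .withWriteDefault(Literal.of("default_data"))
                .build());

    StructType sparkSchema = SparkSchemaUtil.convert(schema);
    // The write default is rendered as a Spark SQL literal string under CURRENT_DEFAULT
    String currentDefault =
        sparkSchema
            .fields()[0]
            .metadata()
            .getString(ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY());
    // currentDefault is "'default_data'"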
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 4045847d5a..d5f407a715 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -21,15 +21,27 @@ package org.apache.iceberg.spark;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.TimeZone;
+import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.MetadataAttribute;
import org.apache.spark.sql.catalyst.types.DataTypeUtils;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
public class TestSparkSchemaUtil {
private static final Schema TEST_SCHEMA =
@@ -80,4 +92,181 @@ public class TestSparkSchemaUtil {
}
}
}
+
+  @Test
+  public void testSchemaConversionWithOnlyWriteDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("write_only"))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should have CURRENT_DEFAULT metadata")
+        .isTrue();
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should not have EXISTS_DEFAULT metadata")
+        .isFalse();
+    assertThat(
+            metadata.getString(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata CURRENT_DEFAULT should contain correctly formatted literal")
+        .isEqualTo("'write_only'");
+  }
+
+  @Test
+  public void testSchemaConversionWithOnlyInitialDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.IntegerType.get())
+                .withInitialDefault(Literal.of(42))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should not have CURRENT_DEFAULT metadata")
+        .isFalse();
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should have EXISTS_DEFAULT metadata")
+        .isTrue();
+    assertThat(
+            metadata.getString(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata EXISTS_DEFAULT should contain correctly formatted literal")
+        .isEqualTo("42");
+  }
+
+  @ParameterizedTest(name = "{0} with writeDefault={1}, initialDefault={2}")
+  @MethodSource("schemaConversionWithDefaultsTestCases")
+  public void testSchemaConversionWithDefaultsForPrimitiveTypes(
+      Type type,
+      Literal<?> writeDefault,
+      Literal<?> initialDefault,
+      String expectedCurrentDefaultValue,
+      String expectedExistsDefaultValue) {
+    TimeZone systemTimeZone = TimeZone.getDefault();
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+      Schema schema =
+          new Schema(
+              Types.NestedField.optional("field")
+                  .withId(1)
+                  .ofType(type)
+                  .withWriteDefault(writeDefault)
+                  .withInitialDefault(initialDefault)
+                  .build());
+
+      StructType sparkSchema = SparkSchemaUtil.convert(schema);
+      StructField defaultField = sparkSchema.fields()[0];
+      Metadata metadata = defaultField.metadata();
+
+      assertThat(
+              metadata.contains(
+                  ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+          .as("Field of type %s should have CURRENT_DEFAULT metadata", type)
+          .isTrue();
+      assertThat(
+              metadata.contains(
+                  ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+          .as("Field of type %s should have EXISTS_DEFAULT metadata", type)
+          .isTrue();
+      assertThat(
+              metadata.getString(
+                  ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+          .as(
+              "Spark metadata CURRENT_DEFAULT for type %s should contain correctly formatted literal",
+              type)
+          .isEqualTo(expectedCurrentDefaultValue);
+      assertThat(
+              metadata.getString(
+                  ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+          .as(
+              "Spark metadata EXISTS_DEFAULT for type %s should contain correctly formatted literal",
+              type)
+          .isEqualTo(expectedExistsDefaultValue);
+    } finally {
+      TimeZone.setDefault(systemTimeZone);
+    }
+  }
+
+  private static Stream<Arguments> schemaConversionWithDefaultsTestCases() {
+    return Stream.of(
+        Arguments.of(Types.IntegerType.get(), Literal.of(1), Literal.of(2), "1", "2"),
+        Arguments.of(
+            Types.StringType.get(),
+            Literal.of("write_default"),
+            Literal.of("initial_default"),
+            "'write_default'",
+            "'initial_default'"),
+        Arguments.of(
+            Types.UUIDType.get(),
+            Literal.of("f79c3e09-677c-4bbd-a479-3f349cb785e7").to(Types.UUIDType.get()),
+            Literal.of("f79c3e09-677c-4bbd-a479-3f349cb685e7").to(Types.UUIDType.get()),
+            "'f79c3e09-677c-4bbd-a479-3f349cb785e7'",
+            "'f79c3e09-677c-4bbd-a479-3f349cb685e7'"),
+        Arguments.of(Types.BooleanType.get(), Literal.of(true), Literal.of(false), "true", "false"),
+        Arguments.of(Types.IntegerType.get(), Literal.of(42), Literal.of(10), "42", "10"),
+        Arguments.of(Types.LongType.get(), Literal.of(100L), Literal.of(50L), "100L", "50L"),
+        Arguments.of(
+            Types.FloatType.get(),
+            Literal.of(3.14f),
+            Literal.of(1.5f),
+            "CAST('3.14' AS FLOAT)",
+            "CAST('1.5' AS FLOAT)"),
+        Arguments.of(
+            Types.DoubleType.get(), Literal.of(2.718), Literal.of(1.414), "2.718D", "1.414D"),
+        Arguments.of(
+            Types.DecimalType.of(10, 2),
+            Literal.of(new BigDecimal("99.99")),
+            Literal.of(new BigDecimal("11.11")),
+            "99.99BD",
+            "11.11BD"),
+        Arguments.of(
+            Types.DateType.get(),
+            Literal.of("2024-01-01").to(Types.DateType.get()),
+            Literal.of("2023-01-01").to(Types.DateType.get()),
+            "DATE '2024-01-01'",
+            "DATE '2023-01-01'"),
+        Arguments.of(
+            Types.TimestampType.withZone(),
+            Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            "TIMESTAMP '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.TimestampType.withoutZone(),
+            Literal.of("2017-11-30T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            Literal.of("2017-11-29T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            "TIMESTAMP_NTZ '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP_NTZ '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.BinaryType.get(),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x01, 0x02})),
+            "X'0A0B'",
+            "X'0102'"),
+        Arguments.of(
+            Types.FixedType.ofLength(4),
+            Literal.of("test".getBytes()),
+            Literal.of("init".getBytes()),
+            "X'74657374'",
+            "X'696E6974'"));
+  }
}
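
Note on the time zone pinning above: the expected TIMESTAMP strings depend on the JVM default time zone, so the parameterized test fixes it to UTC before converting and restores it afterwards. A reusable form of the same guard might look like this (the helper name is hypothetical, not part of this commit):

    // Hypothetical helper: run a check with the JVM default time zone pinned to UTC,
    // restoring the previous zone so later tests are unaffected
    static void withUtcTimeZone(Runnable check) {
      TimeZone previous = TimeZone.getDefault();
      try {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        check.run();
      } finally {
        TimeZone.setDefault(previous);
      }
    }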
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
new file mode 100644
index 0000000000..ba856dc538
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+
+/**
+ * Tests for Spark SQL default values integration with Iceberg default values.
+ *
+ * <p>Note: These tests use {@code validationCatalog.createTable()} to create tables with default
+ * values because the Iceberg Spark integration does not yet support default value clauses in Spark
+ * DDL.
+ *
+ * <p>Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are
+ * not supported for DSV2 in Spark 4.0.
+ */
+public class TestSparkDefaultValues extends CatalogTestBase {
+
+  @AfterEach
+  public void dropTestTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithSparkDefaultKeyword() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("bool_col")
+                .withId(2)
+                .ofType(Types.BooleanType.get())
+                .withWriteDefault(Literal.of(true))
+                .build(),
+            Types.NestedField.optional("int_col")
+                .withId(3)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(42))
+                .build(),
+            Types.NestedField.optional("long_col")
+                .withId(4)
+                .ofType(Types.LongType.get())
+                .withWriteDefault(Literal.of(100L))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should have expected default values",
+        ImmutableList.of(row(1, true, 42, 100L)),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithDefaultKeywordAndReorderedSchema() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("int_col")
+                .withId(2)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(123))
+                .build(),
+            Types.NestedField.optional("string_col")
+                .withId(3)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("doom"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    // Insert with columns in different order than table schema
+    sql("INSERT INTO %s (int_col, id, string_col) VALUES (DEFAULT, 1, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should apply correct defaults regardless of column order",
+        ImmutableList.of(row(1, 123, "doom")),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testBulkInsertWithDefaults() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default_data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT), (2, DEFAULT), (3, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should insert multiple rows with default values",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testCreateTableWithDefaultsUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s (id INT, data STRING DEFAULT 'default-value') USING iceberg",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("does not support column default value");
+  }
+
+  @TestTemplate
+  public void testAlterTableAddColumnWithDefaultUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    assertThatThrownBy(
+            () -> sql("ALTER TABLE %s ADD COLUMN data STRING DEFAULT 'default-value'", tableName))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining("default values in Spark is currently unsupported");
+  }
+
+  @TestTemplate
+  public void testPartialInsertUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default-data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    assertThatThrownBy(() -> sql("INSERT INTO %s (id) VALUES (1)", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot find data for the output column");
+  }
+
+  @TestTemplate
+  public void testSchemaEvolutionWithDefaultValueChanges() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema initialSchema =
+        new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent,
+        initialSchema,
+        PartitionSpec.unpartitioned(),
+        ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1), (2)", commitTarget());
+
+    // Add a column with a default value
+    validationCatalog
+        .loadTable(tableIdent)
+        .updateSchema()
+        .addColumn("data", Types.StringType.get(), Literal.of("default_data"))
+        .commit();
+
+    // A refresh is needed when using SparkCatalog; otherwise the new column would not be picked up
+    sql("REFRESH TABLE %s", commitTarget());
+
+    sql("INSERT INTO %s VALUES (3, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should have correct default values for existing and new rows",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+}
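
As the class Javadoc notes, default values cannot yet be declared through Spark DDL, so a table gains a default through the Iceberg API and is then usable with the DEFAULT keyword. A rough sketch of that flow outside the test harness, with table and catalog names as placeholders:

    // Assumes an existing Iceberg Table handle loaded from a catalog
    table
        .updateSchema()
        .addColumn("data", Types.StringType.get(), Literal.of("default_data"))
        .commit();

    // Refresh so Spark picks up the new column, then rely on its write default
    spark.sql("REFRESH TABLE db.tbl");
    spark.sql("INSERT INTO db.tbl VALUES (3, DEFAULT)");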