This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a99dc4f2fa Spark 4.0: Add schema conversion support for default values (#14407)
a99dc4f2fa is described below

commit a99dc4f2faa3b52140f3f1432187f5e575bb32bd
Author: Drew Gallardo <[email protected]>
AuthorDate: Wed Oct 29 12:02:52 2025 -0700

    Spark 4.0: Add schema conversion support for default values (#14407)
---
 .../org/apache/iceberg/spark/TypeToSparkType.java  |  17 ++
 .../apache/iceberg/spark/TestSparkSchemaUtil.java  | 189 +++++++++++++++++
 .../iceberg/spark/sql/TestSparkDefaultValues.java  | 234 +++++++++++++++++++++
 3 files changed, 440 insertions(+)

diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index cbee10fbb0..09c89bbba8 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
 import org.apache.spark.sql.types.ArrayType$;
 import org.apache.spark.sql.types.BinaryType$;
 import org.apache.spark.sql.types.BooleanType$;
@@ -69,6 +70,22 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
       if (field.doc() != null) {
         sparkField = sparkField.withComment(field.doc());
       }
+
+      // Convert both write and initial default values to Spark SQL string literal representations
+      // on the StructField metadata
+      if (field.writeDefault() != null) {
+        Object writeDefault = SparkUtil.internalToSpark(field.type(), 
field.writeDefault());
+        sparkField =
+            
sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, 
type).sql());
+      }
+
+      if (field.initialDefault() != null) {
+        Object initialDefault = SparkUtil.internalToSpark(field.type(), 
field.initialDefault());
+        sparkField =
+            sparkField.withExistenceDefaultValue(
+                Literal$.MODULE$.create(initialDefault, type).sql());
+      }
+
       sparkFields.add(sparkField);
     }
 
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 4045847d5a..d5f407a715 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -21,15 +21,27 @@ package org.apache.iceberg.spark;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.TimeZone;
+import java.util.stream.Stream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.MetadataAttribute;
 import org.apache.spark.sql.catalyst.types.DataTypeUtils;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestSparkSchemaUtil {
   private static final Schema TEST_SCHEMA =
@@ -80,4 +92,181 @@ public class TestSparkSchemaUtil {
       }
     }
   }
+
+  @Test
+  public void testSchemaConversionWithOnlyWriteDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("write_only"))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                
ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should have CURRENT_DEFAULT 
metadata")
+        .isTrue();
+    assertThat(
+            metadata.contains(
+                
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should not have EXISTS_DEFAULT 
metadata")
+        .isFalse();
+    assertThat(
+            metadata.getString(
+                
ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata CURRENT_DEFAULT should contain correctly formatted 
literal")
+        .isEqualTo("'write_only'");
+  }
+
+  @Test
+  public void testSchemaConversionWithOnlyInitialDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.IntegerType.get())
+                .withInitialDefault(Literal.of(42))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                
ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should not have CURRENT_DEFAULT 
metadata")
+        .isFalse();
+    assertThat(
+            metadata.contains(
+                
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should have EXISTS_DEFAULT 
metadata")
+        .isTrue();
+    assertThat(
+            metadata.getString(
+                
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata EXISTS_DEFAULT should contain correctly formatted 
literal")
+        .isEqualTo("42");
+  }
+
+  @ParameterizedTest(name = "{0} with writeDefault={1}, initialDefault={2}")
+  @MethodSource("schemaConversionWithDefaultsTestCases")
+  public void testSchemaConversionWithDefaultsForPrimitiveTypes(
+      Type type,
+      Literal<?> writeDefault,
+      Literal<?> initialDefault,
+      String expectedCurrentDefaultValue,
+      String expectedExistsDefaultValue) {
+    TimeZone systemTimeZone = TimeZone.getDefault();
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+      Schema schema =
+          new Schema(
+              Types.NestedField.optional("field")
+                  .withId(1)
+                  .ofType(type)
+                  .withWriteDefault(writeDefault)
+                  .withInitialDefault(initialDefault)
+                  .build());
+
+      StructType sparkSchema = SparkSchemaUtil.convert(schema);
+      StructField defaultField = sparkSchema.fields()[0];
+      Metadata metadata = defaultField.metadata();
+
+      assertThat(
+              metadata.contains(
+                  
ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+          .as("Field of type %s should have CURRENT_DEFAULT metadata", type)
+          .isTrue();
+      assertThat(
+              metadata.contains(
+                  
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+          .as("Field of type %s should have EXISTS_DEFAULT metadata", type)
+          .isTrue();
+      assertThat(
+              metadata.getString(
+                  
ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+          .as(
+              "Spark metadata CURRENT_DEFAULT for type %s should contain 
correctly formatted literal",
+              type)
+          .isEqualTo(expectedCurrentDefaultValue);
+      assertThat(
+              metadata.getString(
+                  
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+          .as(
+              "Spark metadata EXISTS_DEFAULT for type %s should contain 
correctly formatted literal",
+              type)
+          .isEqualTo(expectedExistsDefaultValue);
+    } finally {
+      TimeZone.setDefault(systemTimeZone);
+    }
+  }
+
+  private static Stream<Arguments> schemaConversionWithDefaultsTestCases() {
+    return Stream.of(
+        Arguments.of(Types.IntegerType.get(), Literal.of(1), Literal.of(2), 
"1", "2"),
+        Arguments.of(
+            Types.StringType.get(),
+            Literal.of("write_default"),
+            Literal.of("initial_default"),
+            "'write_default'",
+            "'initial_default'"),
+        Arguments.of(
+            Types.UUIDType.get(),
+            
Literal.of("f79c3e09-677c-4bbd-a479-3f349cb785e7").to(Types.UUIDType.get()),
+            
Literal.of("f79c3e09-677c-4bbd-a479-3f349cb685e7").to(Types.UUIDType.get()),
+            "'f79c3e09-677c-4bbd-a479-3f349cb785e7'",
+            "'f79c3e09-677c-4bbd-a479-3f349cb685e7'"),
+        Arguments.of(Types.BooleanType.get(), Literal.of(true), 
Literal.of(false), "true", "false"),
+        Arguments.of(Types.IntegerType.get(), Literal.of(42), Literal.of(10), 
"42", "10"),
+        Arguments.of(Types.LongType.get(), Literal.of(100L), Literal.of(50L), 
"100L", "50L"),
+        Arguments.of(
+            Types.FloatType.get(),
+            Literal.of(3.14f),
+            Literal.of(1.5f),
+            "CAST('3.14' AS FLOAT)",
+            "CAST('1.5' AS FLOAT)"),
+        Arguments.of(
+            Types.DoubleType.get(), Literal.of(2.718), Literal.of(1.414), 
"2.718D", "1.414D"),
+        Arguments.of(
+            Types.DecimalType.of(10, 2),
+            Literal.of(new BigDecimal("99.99")),
+            Literal.of(new BigDecimal("11.11")),
+            "99.99BD",
+            "11.11BD"),
+        Arguments.of(
+            Types.DateType.get(),
+            Literal.of("2024-01-01").to(Types.DateType.get()),
+            Literal.of("2023-01-01").to(Types.DateType.get()),
+            "DATE '2024-01-01'",
+            "DATE '2023-01-01'"),
+        Arguments.of(
+            Types.TimestampType.withZone(),
+            
Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            
Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            "TIMESTAMP '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.TimestampType.withoutZone(),
+            
Literal.of("2017-11-30T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            
Literal.of("2017-11-29T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            "TIMESTAMP_NTZ '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP_NTZ '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.BinaryType.get(),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x01, 0x02})),
+            "X'0A0B'",
+            "X'0102'"),
+        Arguments.of(
+            Types.FixedType.ofLength(4),
+            Literal.of("test".getBytes()),
+            Literal.of("init".getBytes()),
+            "X'74657374'",
+            "X'696E6974'"));
+  }
 }
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
new file mode 100644
index 0000000000..ba856dc538
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+
+/**
+ * Tests for Spark SQL Default values integration with Iceberg default values.
+ *
+ * <p>Note: These tests use {@code validationCatalog.createTable()} to create tables with default
+ * values because the Iceberg Spark integration does not yet support default value clauses in Spark
+ * DDL.
+ *
+ * <p>Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are
+ * not supported for DSV2 in Spark 4.0
+ */
+public class TestSparkDefaultValues extends CatalogTestBase {
+
+  @AfterEach
+  public void dropTestTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithSparkDefaultKeyword() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("bool_col")
+                .withId(2)
+                .ofType(Types.BooleanType.get())
+                .withWriteDefault(Literal.of(true))
+                .build(),
+            Types.NestedField.optional("int_col")
+                .withId(3)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(42))
+                .build(),
+            Types.NestedField.optional("long_col")
+                .withId(4)
+                .ofType(Types.LongType.get())
+                .withWriteDefault(Literal.of(100L))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), 
ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", 
commitTarget());
+
+    assertEquals(
+        "Should have expected default values",
+        ImmutableList.of(row(1, true, 42, 100L)),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithDefaultKeywordAndReorderedSchema() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("int_col")
+                .withId(2)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(123))
+                .build(),
+            Types.NestedField.optional("string_col")
+                .withId(3)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("doom"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), 
ImmutableMap.of("format-version", "3"));
+
+    // Insert with columns in different order than table schema
+    sql("INSERT INTO %s (int_col, id, string_col) VALUES (DEFAULT, 1, 
DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should apply correct defaults regardless of column order",
+        ImmutableList.of(row(1, 123, "doom")),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testBulkInsertWithDefaults() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default_data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), 
ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT), (2, DEFAULT), (3, DEFAULT)", 
commitTarget());
+
+    assertEquals(
+        "Should insert multiple rows with default values",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), 
row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testCreateTableWithDefaultsUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s (id INT, data STRING DEFAULT 
'default-value') USING iceberg",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("does not support column default value");
+  }
+
+  @TestTemplate
+  public void testAlterTableAddColumnWithDefaultUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema = new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), 
ImmutableMap.of("format-version", "3"));
+
+    assertThatThrownBy(
+            () -> sql("ALTER TABLE %s ADD COLUMN data STRING DEFAULT 
'default-value'", tableName))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining("default values in Spark is currently 
unsupported");
+  }
+
+  @TestTemplate
+  public void testPartialInsertUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default-data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), 
ImmutableMap.of("format-version", "3"));
+
+    assertThatThrownBy(() -> sql("INSERT INTO %s (id) VALUES (1)", 
commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot find data for the output column");
+  }
+
+  @TestTemplate
+  public void testSchemaEvolutionWithDefaultValueChanges() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema initialSchema = new Schema(Types.NestedField.required(1, "id", 
Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent,
+        initialSchema,
+        PartitionSpec.unpartitioned(),
+        ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1), (2)", commitTarget());
+
+    // Add a column with a default value
+    validationCatalog
+        .loadTable(tableIdent)
+        .updateSchema()
+        .addColumn("data", Types.StringType.get(), Literal.of("default_data"))
+        .commit();
+
+    // Refresh this when using SparkCatalog since otherwise the new column 
would not be caught.
+    sql("REFRESH TABLE %s", commitTarget());
+
+    sql("INSERT INTO %s VALUES (3, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should have correct default values for existing and new rows",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), 
row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+}

Reply via email to