This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f24f0c093e Spark 3.5, 4.0: Prevent unnecessary failure when executing
DML queries with identifier fields (#13435)
f24f0c093e is described below
commit f24f0c093e55743c291b5835cd19931d1ca787d0
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Jul 1 18:21:19 2025 -0700
Spark 3.5, 4.0: Prevent unnecessary failure when executing DML queries with
identifier fields (#13435)
---
.../org/apache/iceberg/spark/source/SparkScanBuilder.java | 3 ++-
.../iceberg/spark/source/TestSparkMetadataColumns.java | 14 +++++++++++++-
.../org/apache/iceberg/spark/source/SparkScanBuilder.java | 3 ++-
.../iceberg/spark/source/TestSparkMetadataColumns.java | 14 +++++++++++++-
4 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 0a06970d09..e4b9072155 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
@@ -381,7 +382,7 @@ public class SparkScanBuilder
AtomicInteger nextId = new AtomicInteger();
return new Schema(
metaColumnFields,
- table.schema().identifierFieldIds(),
+ ImmutableSet.of(),
oldId -> {
if (!idsToReassign.contains(oldId)) {
return oldId;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index 683b4d465c..4378e6e1a3 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -76,7 +76,7 @@ public class TestSparkMetadataColumns extends TestBase {
private static final String TABLE_NAME = "test_table";
private static final Schema SCHEMA =
new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "category", Types.StringType.get()),
Types.NestedField.optional(3, "data", Types.StringType.get()));
private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
@@ -316,6 +316,18 @@ public class TestSparkMetadataColumns extends TestBase {
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s",
TABLE_NAME));
}
+ @TestTemplate
+ public void testIdentifierFields() {
+ table.updateSchema().setIdentifierFields("id").commit();
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ assertEquals(
+ "Rows must match",
+ ImmutableList.of(row(1L, 0, null)),
+ sql("SELECT id, _spec_id, _partition FROM %s", TABLE_NAME));
+ }
+
@TestTemplate
public void testRowLineageColumnsAreNullBeforeV3() {
assumeThat(formatVersion).isLessThan(3);
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 0a06970d09..e4b9072155 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
@@ -381,7 +382,7 @@ public class SparkScanBuilder
AtomicInteger nextId = new AtomicInteger();
return new Schema(
metaColumnFields,
- table.schema().identifierFieldIds(),
+ ImmutableSet.of(),
oldId -> {
if (!idsToReassign.contains(oldId)) {
return oldId;
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index f442c90150..1d11d0e6f1 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -77,7 +77,7 @@ public class TestSparkMetadataColumns extends TestBase {
private static final String TABLE_NAME = "test_table";
private static final Schema SCHEMA =
new Schema(
- Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "category", Types.StringType.get()),
Types.NestedField.optional(3, "data", Types.StringType.get()));
private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
@@ -317,6 +317,18 @@ public class TestSparkMetadataColumns extends TestBase {
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s",
TABLE_NAME));
}
+ @TestTemplate
+ public void testIdentifierFields() {
+ table.updateSchema().setIdentifierFields("id").commit();
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ assertEquals(
+ "Rows must match",
+ ImmutableList.of(row(1L, 0, null)),
+ sql("SELECT id, _spec_id, _partition FROM %s", TABLE_NAME));
+ }
+
@TestTemplate
public void testRowLineageColumnsResolvedInV3OrHigher() {
if (formatVersion >= 3) {