This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new ca3f8cf5d2 Core, Spark: (Backport #15514) Fix equality deletes non-deterministic schema ordering
ca3f8cf5d2 is described below

commit ca3f8cf5d236f40b99b44d226742eff77cc9faf5
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Mar 12 17:20:55 2026 -0500

    Core, Spark: (Backport #15514) Fix equality deletes non-deterministic schema ordering
    
    Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on
    the field order of requiredSchema, which varies depending on the query's
    projection. When the SparkExecutorCache returned delete records read with
    one field ordering to a reader expecting another, StructProjection silently
    misinterpreted the positional data, causing deletes to be skipped.
    
    We fix this by canonicalizing the deleteSchema: its fields are now sorted
    by field ID. Every reader therefore produces the same schema for deletes
    regardless of projection, ensuring cache hits return correctly ordered
    records.
    
    Coded with the help of Cursor and claude-4.6.opus-high
---
 .../java/org/apache/iceberg/types/TypeUtil.java    | 14 +++++
 .../org/apache/iceberg/types/TestTypeUtil.java     | 25 ++++++++
 .../java/org/apache/iceberg/data/DeleteFilter.java |  2 +-
 .../spark/source/TestSparkReaderDeletes.java       | 72 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java 
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index b1c556be06..fdc5dfdf37 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.types;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -102,6 +103,19 @@ public class TypeUtil {
     return new Schema(ImmutableList.of(), schema.getAliases());
   }
 
+  /**
+   * Selects fields from a schema by ID and returns them ordered by field ID.
+   *
+   * <p>Unlike {@link #select(Schema, Set)}, which preserves the field 
ordering of the input schema,
+   * this method always returns columns sorted by field ID.
+   */
+  public static Schema selectInIdOrder(Schema schema, Set<Integer> fieldIds) {
+    Schema selected = select(schema, fieldIds);
+    List<Types.NestedField> sorted = Lists.newArrayList(selected.columns());
+    sorted.sort(Comparator.comparingInt(Types.NestedField::fieldId));
+    return new Schema(sorted);
+  }
+
   public static Types.StructType select(Types.StructType struct, Set<Integer> 
fieldIds) {
     Preconditions.checkNotNull(struct, "Struct cannot be null");
     Preconditions.checkNotNull(fieldIds, "Field ids cannot be null");
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java 
b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index d4742f5187..dd8afebab8 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -317,6 +317,31 @@ public class TestTypeUtil {
     
assertThat(actualDepthTwo.asStruct()).isEqualTo(expectedDepthTwo.asStruct());
   }
 
+  @Test
+  public void testSelectInIdOrder() {
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(3, "b", Types.IntegerType.get()),
+            required(2, "a", Types.IntegerType.get()));
+
+    Schema result = TypeUtil.selectInIdOrder(schema, Sets.newHashSet(2, 3));
+
+    assertThat(result.columns()).hasSize(2);
+    assertThat(result.columns().get(0).fieldId()).isEqualTo(2);
+    assertThat(result.columns().get(1).fieldId()).isEqualTo(3);
+
+    // verify that different input orderings produce the same result
+    Schema schemaReversed =
+        new Schema(
+            required(2, "a", Types.IntegerType.get()),
+            required(3, "b", Types.IntegerType.get()),
+            required(1, "id", Types.IntegerType.get()));
+
+    Schema resultReversed = TypeUtil.selectInIdOrder(schemaReversed, 
Sets.newHashSet(2, 3));
+    assertThat(resultReversed.asStruct()).isEqualTo(result.asStruct());
+  }
+
   @Test
   public void testProjectMap() {
     // We can't partially project keys because it changes key equality
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java 
b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 797e6d6408..abded8dcd0 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -199,7 +199,7 @@ public abstract class DeleteFilter<T> {
       Set<Integer> ids = entry.getKey();
       Iterable<DeleteFile> deletes = entry.getValue();
 
-      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+      Schema deleteSchema = TypeUtil.selectInIdOrder(requiredSchema, ids);
 
       // a projection to select and reorder fields of the file schema to match 
the delete rows
       StructProjection projectRow = StructProjection.create(requiredSchema, 
deleteSchema);
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 42699f4662..6b3b918fe9 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -162,11 +162,14 @@ public class TestSparkReaderDeletes extends 
DeleteReadTests {
     spark = null;
   }
 
+  private static final String EQ_CACHE_TABLE = "test_eq_cache_ordering";
+
   @AfterEach
   @Override
   public void cleanup() throws IOException {
     super.cleanup();
     dropTable("test3");
+    dropTable(EQ_CACHE_TABLE);
   }
 
   @Override
@@ -697,6 +700,75 @@ public class TestSparkReaderDeletes extends 
DeleteReadTests {
     }
   }
 
+  /**
+   * Covers a bug where equality deletes columns are appended to the required 
schema in a different
+   * order than the table schema, which can cause different deleteSchema 
orderings, poisoning the
+   * cache.
+   */
+  @TestTemplate
+  public void testEqualityDeletesAppliedWithCachedFieldReordering() throws 
IOException {
+    Schema eqDeleteTestSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(3, "b", Types.IntegerType.get()));
+    Table eqTestTable =
+        createTable(EQ_CACHE_TABLE, eqDeleteTestSchema, 
PartitionSpec.unpartitioned());
+
+    GenericRecord record = GenericRecord.create(eqDeleteTestSchema);
+    List<Record> data = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      data.add(record.copy("id", i, "a", i * 10, "b", i * 100));
+    }
+
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            eqTestTable,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            data);
+    eqTestTable.newAppend().appendFile(dataFile).commit();
+
+    Schema deleteSchema =
+        new Schema(
+            Types.NestedField.optional(3, "b", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "a", Types.IntegerType.get()));
+
+    Record eqDel = GenericRecord.create(deleteSchema);
+    List<Record> deletes =
+        Lists.newArrayList(
+            eqDel.copy("b", 0, "a", 0),
+            eqDel.copy("b", 100, "a", 10),
+            eqDel.copy("b", 200, "a", 20));
+
+    DeleteFile eqFile =
+        FileHelpers.writeDeleteFile(
+            eqTestTable,
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
+            deletes,
+            deleteSchema);
+    eqTestTable.newRowDelta().addDeletes(eqFile).commit();
+
+    String tableRef = TableIdentifier.of("default", EQ_CACHE_TABLE).toString();
+    int expectedRows = data.size() - deletes.size();
+
+    // Narrow projection: Spark will not request a or b columns, so the delete 
columns are appended
+    // in identifier fields definition order [b, a]
+    long narrowCount =
+        
spark.read().format("iceberg").load(tableRef).select("id").collectAsList().size();
+
+    // Wide projection: Spark will request all columns, so the delete columns 
are already present in
+    // table schema order [a, b].
+    long wideCount =
+        
spark.read().format("iceberg").load(tableRef).select("*").collectAsList().size();
+
+    assertThat(narrowCount)
+        .as("Narrow projection should return %d rows after equality deletes", 
expectedRows)
+        .isEqualTo(expectedRows);
+    assertThat(wideCount)
+        .as("Wide projection should return %d rows after equality deletes", 
expectedRows)
+        .isEqualTo(expectedRows);
+  }
+
   private static final Schema PROJECTION_SCHEMA =
       new Schema(
           required(1, "id", Types.IntegerType.get()),

Reply via email to