This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new ca3f8cf5d2 Core, Spark: (Backport #15514) Fix equality deletes
non-deterministic schema ordering
ca3f8cf5d2 is described below
commit ca3f8cf5d236f40b99b44d226742eff77cc9faf5
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Mar 12 17:20:55 2026 -0500
Core, Spark: (Backport #15514) Fix equality deletes non-deterministic
schema ordering
Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on
the field order of requiredSchema, which varies depending on the query's
projection. When the SparkExecutorCache returned delete records read with
one
field ordering to a reader expecting another, StructProjection silently
misinterpreted the positional data, causing deletes to be skipped.
We fix this by canonicalizing the deleteSchema, sorting its fields by field ID.
Now
every reader produces the same schema for deletes regardless of projection,
ensuring cache hits return correctly ordered records.
Coded with the help of Cursor and claude-4.6.opus-high
---
.../java/org/apache/iceberg/types/TypeUtil.java | 14 +++++
.../org/apache/iceberg/types/TestTypeUtil.java | 25 ++++++++
.../java/org/apache/iceberg/data/DeleteFilter.java | 2 +-
.../spark/source/TestSparkReaderDeletes.java | 72 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 1 deletion(-)
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index b1c556be06..fdc5dfdf37 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.types;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -102,6 +103,19 @@ public class TypeUtil {
return new Schema(ImmutableList.of(), schema.getAliases());
}
+ /**
+ * Selects fields from a schema by ID and returns them ordered by field ID.
+ *
+ * <p>Unlike {@link #select(Schema, Set)}, which preserves the field
ordering of the input schema,
+ * this method always returns columns sorted by field ID.
+ */
+ public static Schema selectInIdOrder(Schema schema, Set<Integer> fieldIds) {
+ Schema selected = select(schema, fieldIds);
+ List<Types.NestedField> sorted = Lists.newArrayList(selected.columns());
+ sorted.sort(Comparator.comparingInt(Types.NestedField::fieldId));
+ return new Schema(sorted);
+ }
+
public static Types.StructType select(Types.StructType struct, Set<Integer>
fieldIds) {
Preconditions.checkNotNull(struct, "Struct cannot be null");
Preconditions.checkNotNull(fieldIds, "Field ids cannot be null");
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index d4742f5187..dd8afebab8 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -317,6 +317,31 @@ public class TestTypeUtil {
assertThat(actualDepthTwo.asStruct()).isEqualTo(expectedDepthTwo.asStruct());
}
+ @Test
+ public void testSelectInIdOrder() {
+ Schema schema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(3, "b", Types.IntegerType.get()),
+ required(2, "a", Types.IntegerType.get()));
+
+ Schema result = TypeUtil.selectInIdOrder(schema, Sets.newHashSet(2, 3));
+
+ assertThat(result.columns()).hasSize(2);
+ assertThat(result.columns().get(0).fieldId()).isEqualTo(2);
+ assertThat(result.columns().get(1).fieldId()).isEqualTo(3);
+
+ // verify that different input orderings produce the same result
+ Schema schemaReversed =
+ new Schema(
+ required(2, "a", Types.IntegerType.get()),
+ required(3, "b", Types.IntegerType.get()),
+ required(1, "id", Types.IntegerType.get()));
+
+ Schema resultReversed = TypeUtil.selectInIdOrder(schemaReversed,
Sets.newHashSet(2, 3));
+ assertThat(resultReversed.asStruct()).isEqualTo(result.asStruct());
+ }
+
@Test
public void testProjectMap() {
// We can't partially project keys because it changes key equality
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 797e6d6408..abded8dcd0 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -199,7 +199,7 @@ public abstract class DeleteFilter<T> {
Set<Integer> ids = entry.getKey();
Iterable<DeleteFile> deletes = entry.getValue();
- Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+ Schema deleteSchema = TypeUtil.selectInIdOrder(requiredSchema, ids);
// a projection to select and reorder fields of the file schema to match
the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema,
deleteSchema);
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 42699f4662..6b3b918fe9 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -162,11 +162,14 @@ public class TestSparkReaderDeletes extends
DeleteReadTests {
spark = null;
}
+ private static final String EQ_CACHE_TABLE = "test_eq_cache_ordering";
+
@AfterEach
@Override
public void cleanup() throws IOException {
super.cleanup();
dropTable("test3");
+ dropTable(EQ_CACHE_TABLE);
}
@Override
@@ -697,6 +700,75 @@ public class TestSparkReaderDeletes extends
DeleteReadTests {
}
}
+ /**
+ * Covers a bug where equality deletes columns are appended to the required
schema in a different
+ * order than the table schema, which can cause different deleteSchema
orderings, poisoning the
+ * cache.
+ */
+ @TestTemplate
+ public void testEqualityDeletesAppliedWithCachedFieldReordering() throws
IOException {
+ Schema eqDeleteTestSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "a", Types.IntegerType.get()),
+ Types.NestedField.optional(3, "b", Types.IntegerType.get()));
+ Table eqTestTable =
+ createTable(EQ_CACHE_TABLE, eqDeleteTestSchema,
PartitionSpec.unpartitioned());
+
+ GenericRecord record = GenericRecord.create(eqDeleteTestSchema);
+ List<Record> data = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ data.add(record.copy("id", i, "a", i * 10, "b", i * 100));
+ }
+
+ DataFile dataFile =
+ FileHelpers.writeDataFile(
+ eqTestTable,
+ Files.localOutput(File.createTempFile("junit", null,
temp.toFile())),
+ data);
+ eqTestTable.newAppend().appendFile(dataFile).commit();
+
+ Schema deleteSchema =
+ new Schema(
+ Types.NestedField.optional(3, "b", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "a", Types.IntegerType.get()));
+
+ Record eqDel = GenericRecord.create(deleteSchema);
+ List<Record> deletes =
+ Lists.newArrayList(
+ eqDel.copy("b", 0, "a", 0),
+ eqDel.copy("b", 100, "a", 10),
+ eqDel.copy("b", 200, "a", 20));
+
+ DeleteFile eqFile =
+ FileHelpers.writeDeleteFile(
+ eqTestTable,
+ Files.localOutput(File.createTempFile("junit", null,
temp.toFile())),
+ deletes,
+ deleteSchema);
+ eqTestTable.newRowDelta().addDeletes(eqFile).commit();
+
+ String tableRef = TableIdentifier.of("default", EQ_CACHE_TABLE).toString();
+ int expectedRows = data.size() - deletes.size();
+
+ // Narrow projection: Spark will not request a or b columns, so the delete
columns are appended
+ // in identifier fields definition order [b, a]
+ long narrowCount =
+
spark.read().format("iceberg").load(tableRef).select("id").collectAsList().size();
+
+ // Wide projection: Spark will request all columns, so the delete columns
are already present in
+ // table schema order [a, b].
+ long wideCount =
+
spark.read().format("iceberg").load(tableRef).select("*").collectAsList().size();
+
+ assertThat(narrowCount)
+ .as("Narrow projection should return %d rows after equality deletes",
expectedRows)
+ .isEqualTo(expectedRows);
+ assertThat(wideCount)
+ .as("Wide projection should return %d rows after equality deletes",
expectedRows)
+ .isEqualTo(expectedRows);
+ }
+
private static final Schema PROJECTION_SCHEMA =
new Schema(
required(1, "id", Types.IntegerType.get()),