This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4c2e60d154 Spark 4.1: Simplify handling of metadata columns (#15297)
4c2e60d154 is described below
commit 4c2e60d1541fb37d6ea6be31b2ea2cf733a087c3
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Feb 16 17:20:38 2026 -0800
Spark 4.1: Simplify handling of metadata columns (#15297)
---
api/src/main/java/org/apache/iceberg/Schema.java | 4 +
.../java/org/apache/iceberg/types/TypeUtil.java | 46 +++++++
.../org/apache/iceberg/spark/SparkSchemaUtil.java | 5 +
.../iceberg/spark/source/SparkScanBuilder.java | 153 ++++++++++-----------
4 files changed, 125 insertions(+), 83 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index fbc557f697..3e59998be4 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -105,6 +105,10 @@ public class Schema implements Serializable {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds, getId);
}
+ public Schema(List<NestedField> columns, TypeUtil.GetID getId) {
+ this(DEFAULT_SCHEMA_ID, columns, ImmutableSet.of(), getId);
+ }
+
public Schema(int schemaId, List<NestedField> columns) {
this(schemaId, columns, ImmutableSet.of());
}
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index b1c556be06..09478dc012 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -601,6 +601,52 @@ public class TypeUtil {
int get(int oldId);
}
+ /**
+ * Creates a function that reassigns specified field IDs.
+ *
+ * <p>This is useful for merging schemas where some field IDs in one schema might conflict with
+ * IDs already in use by another schema. The function will reassign the provided IDs to new unused
+ * IDs, while preserving other IDs.
+ *
+ * @param conflictingIds the set of conflicting field IDs that should be reassigned
+ * @param allUsedIds the set of field IDs that are already in use and cannot be reused
+ * @return a function that reassigns conflicting field IDs while preserving others
+ */
+ public static GetID reassignConflictingIds(Set<Integer> conflictingIds, Set<Integer> allUsedIds) {
+ return new ReassignConflictingIds(conflictingIds, allUsedIds);
+ }
+
+ private static class ReassignConflictingIds implements GetID {
+ private final Set<Integer> conflictingIds;
+ private final Set<Integer> allUsedIds;
+ private final AtomicInteger nextId;
+
+ private ReassignConflictingIds(Set<Integer> conflictingIds, Set<Integer> allUsedIds) {
+ this.conflictingIds = conflictingIds;
+ this.allUsedIds = allUsedIds;
+ this.nextId = new AtomicInteger();
+ }
+
+ @Override
+ public int get(int oldId) {
+ if (conflictingIds.contains(oldId)) {
+ return nextAvailableId();
+ } else {
+ return oldId;
+ }
+ }
+
+ private int nextAvailableId() {
+ int candidateId = nextId.incrementAndGet();
+
+ while (allUsedIds.contains(candidateId)) {
+ candidateId = nextId.incrementAndGet();
+ }
+
+ return candidateId;
+ }
+ }
+
public static class SchemaVisitor<T> {
public void beforeField(Types.NestedField field) {}
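A small usage sketch (not from the patch) of the helper added above; the concrete ID sets are made up. Conflicting IDs are remapped to the lowest positive IDs absent from allUsedIds, while non-conflicting IDs pass through unchanged:

    import java.util.Set;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
    import org.apache.iceberg.types.TypeUtil;

    Set<Integer> conflictingIds = ImmutableSet.of(1);
    Set<Integer> allUsedIds = ImmutableSet.of(1, 2, 3);

    TypeUtil.GetID getId = TypeUtil.reassignConflictingIds(conflictingIds, allUsedIds);

    getId.get(1); // conflicting: remapped to 4, the first ID not in allUsedIds
    getId.get(2); // not conflicting: preserved as 2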
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
index fcf5fbeb2a..4c3713d3ff 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
@@ -42,6 +42,7 @@ import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Column;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/** Helper methods for working with Spark/Hive metadata. */
@@ -369,4 +370,8 @@ public class SparkSchemaUtil {
Function<String, String> quotingFunc = name -> String.format("`%s`",
name.replace("`", "``"));
return TypeUtil.indexQuotedNameById(schema.asStruct(), quotingFunc);
}
+
+ public static StructType toStructType(List<StructField> fields) {
+ return new StructType(fields.toArray(new StructField[0]));
+ }
}
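A brief sketch (not from the patch) of the new helper, which simply copies the field list into Spark's array-backed StructType; the field definitions below are illustrative:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    List<StructField> fields =
        Arrays.asList(
            DataTypes.createStructField("id", DataTypes.LongType, false),
            DataTypes.createStructField("data", DataTypes.StringType, true));

    // yields a StructType with fields id (bigint) and data (string)
    StructType struct = SparkSchemaUtil.toStructType(fields);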
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index cd4945c975..2fb188d83d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -22,9 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.FileScanTask;
@@ -48,7 +46,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
@@ -59,6 +56,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.TypeUtil.GetID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
@@ -96,10 +94,10 @@ public class SparkScanBuilder
private final Table table;
private final CaseInsensitiveStringMap options;
private final SparkReadConf readConf;
- private final List<String> metaColumns = Lists.newArrayList();
+ private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
private final InMemoryMetricsReporter metricsReporter;
- private Schema schema;
+ private Schema projection;
private boolean caseSensitive;
private List<Expression> filterExpressions = null;
private Predicate[] pushedPredicates = NO_PREDICATES;
@@ -113,7 +111,7 @@ public class SparkScanBuilder
CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
- this.schema = schema;
+ this.projection = schema;
this.options = options;
this.readConf = new SparkReadConf(spark, table, branch, options);
this.caseSensitive = readConf.caseSensitive();
@@ -168,7 +166,7 @@ public class SparkScanBuilder
if (expr != null) {
// try binding the expression to ensure it can be pushed down
- Binder.bind(schema.asStruct(), expr, caseSensitive);
+ Binder.bind(projection.asStruct(), expr, caseSensitive);
expressions.add(expr);
pushableFilters.add(predicate);
}
@@ -210,7 +208,7 @@ public class SparkScanBuilder
try {
Expression expr = SparkAggregates.convert(aggregateFunc);
if (expr != null) {
- Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
+ Expression bound = Binder.bind(projection.asStruct(), expr, caseSensitive);
expressions.add((BoundAggregate<?, ?>) bound);
} else {
LOG.info(
@@ -231,7 +229,7 @@ public class SparkScanBuilder
}
org.apache.iceberg.Scan scan =
- buildIcebergBatchScan(true /* include Column Stats */, schemaWithMetadataColumns());
+ buildIcebergBatchScan(true /* include Column Stats */, projectionWithMetadataColumns());
try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
@@ -320,74 +318,63 @@ public class SparkScanBuilder
@Override
public void pruneColumns(StructType requestedSchema) {
- StructType requestedProjection =
- new StructType(
- Stream.of(requestedSchema.fields())
- .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
- .toArray(StructField[]::new));
-
- // the projection should include all columns that will be returned, including those only used in
- // filters
- this.schema =
- SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);
-
- Stream.of(requestedSchema.fields())
- .map(StructField::name)
- .filter(MetadataColumns::isMetadataColumn)
- .distinct()
- .forEach(metaColumns::add);
- }
-
- private Schema schemaWithMetadataColumns() {
- // metadata columns
- List<Types.NestedField> metadataFields =
- metaColumns.stream()
- .distinct()
- .map(name -> MetadataColumns.metadataColumn(table, name))
- .collect(Collectors.toList());
- Schema metadataSchema = calculateMetadataSchema(metadataFields);
-
- // schema or rows returned by readers
- return TypeUtil.join(schema, metadataSchema);
- }
-
- private Schema calculateMetadataSchema(List<Types.NestedField> metaColumnFields) {
- Optional<Types.NestedField> partitionField =
- metaColumnFields.stream()
- .filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId())
- .findFirst();
-
- // only calculate potential column id collision if partition metadata column was requested
- if (!partitionField.isPresent()) {
- return new Schema(metaColumnFields);
- }
-
- Set<Integer> idsToReassign =
- TypeUtil.indexById(partitionField.get().type().asStructType()).keySet();
-
- // Calculate used ids by union metadata columns with all base table schemas
- Set<Integer> currentlyUsedIds =
- metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
- Set<Integer> allUsedIds =
- table.schemas().values().stream()
- .map(currSchema -> TypeUtil.indexById(currSchema.asStruct()).keySet())
- .reduce(currentlyUsedIds, Sets::union);
-
- // Reassign selected ids to deduplicate with used ids.
- AtomicInteger nextId = new AtomicInteger();
- return new Schema(
- metaColumnFields,
- ImmutableSet.of(),
- oldId -> {
- if (!idsToReassign.contains(oldId)) {
- return oldId;
- }
- int candidate = nextId.incrementAndGet();
- while (allUsedIds.contains(candidate)) {
- candidate = nextId.incrementAndGet();
- }
- return candidate;
- });
+ List<StructField> dataFields = Lists.newArrayList();
+
+ for (StructField field : requestedSchema.fields()) {
+ if (MetadataColumns.isMetadataColumn(field.name())) {
+ metaFieldNames.add(field.name());
+ } else {
+ dataFields.add(field);
+ }
+ }
+
+ StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields);
+ this.projection = prune(projection, requestedProjection);
+ }
+
+ // the projection should include all columns that will be returned,
+ // including those only used in filters
+ private Schema prune(Schema schema, StructType requestedSchema) {
+ return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive);
+ }
+
+ // schema of rows that must be returned by readers
+ protected Schema projectionWithMetadataColumns() {
+ return TypeUtil.join(projection, calculateMetadataSchema());
+ }
+
+ // computes metadata schema avoiding conflicts between partition and data field IDs
+ private Schema calculateMetadataSchema() {
+ List<Types.NestedField> metaFields = metaFields();
+ Optional<Types.NestedField> partitionField = findPartitionField(metaFields);
+
+ if (partitionField.isEmpty()) {
+ return new Schema(metaFields);
+ }
+
+ Types.StructType partitionType = partitionField.get().type().asStructType();
+ Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
+ GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds());
+ return new Schema(metaFields, getId);
+ }
+
+ private List<Types.NestedField> metaFields() {
+ return metaFieldNames.stream()
+ .map(name -> MetadataColumns.metadataColumn(table, name))
+ .collect(Collectors.toList());
+ }
+
+ private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) {
+ return fields.stream()
+ .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId())
+ .findFirst();
+ }
+
+ // collects used data field IDs across all known table schemas
+ private Set<Integer> allUsedFieldIds() {
+ return table.schemas().values().stream()
+ .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+ .collect(Collectors.toSet());
}
@Override
@@ -400,7 +387,7 @@ public class SparkScanBuilder
}
private Scan buildBatchScan() {
- Schema expectedSchema = schemaWithMetadataColumns();
+ Schema expectedSchema = projectionWithMetadataColumns();
return new SparkBatchQueryScan(
spark,
table,
@@ -572,7 +559,7 @@ public class SparkScanBuilder
}
}
- Schema expectedSchema = schemaWithMetadataColumns();
+ Schema expectedSchema = projectionWithMetadataColumns();
IncrementalChangelogScan scan =
table
@@ -641,7 +628,7 @@ public class SparkScanBuilder
table,
null,
readConf,
- schemaWithMetadataColumns(),
+ projectionWithMetadataColumns(),
filterExpressions,
metricsReporter::scanReport);
}
@@ -654,7 +641,7 @@ public class SparkScanBuilder
SparkReadConf adjustedReadConf =
new SparkReadConf(spark, table, readConf.branch(), adjustedOptions);
- Schema expectedSchema = schemaWithMetadataColumns();
+ Schema expectedSchema = projectionWithMetadataColumns();
BatchScan scan =
newBatchScan()
@@ -684,12 +671,12 @@ public class SparkScanBuilder
spark,
table,
readConf,
- schemaWithMetadataColumns(),
+ projectionWithMetadataColumns(),
filterExpressions,
metricsReporter::scanReport);
}
- Schema expectedSchema = schemaWithMetadataColumns();
+ Schema expectedSchema = projectionWithMetadataColumns();
BatchScan scan =
newBatchScan()