This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2f170322d4 Spark: Backport various fixes for SparkFileWriterFactory
(#15357)
2f170322d4 is described below
commit 2f170322d425a4c6267a9033efa2107c9bfc53db
Author: pvary <[email protected]>
AuthorDate: Wed Feb 18 19:02:32 2026 +0100
Spark: Backport various fixes for SparkFileWriterFactory (#15357)
Backports #15356
---
.../org/apache/iceberg/spark/data/SparkParquetWriters.java | 9 ---------
.../apache/iceberg/spark/source/SparkFileWriterFactory.java | 11 +++++------
.../org/apache/iceberg/spark/data/SparkParquetWriters.java | 9 ---------
.../apache/iceberg/spark/source/SparkFileWriterFactory.java | 11 +++++------
.../org/apache/iceberg/spark/data/SparkParquetWriters.java | 9 ---------
.../apache/iceberg/spark/source/SparkFileWriterFactory.java | 11 +++++------
6 files changed, 15 insertions(+), 45 deletions(-)
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 6a99912e1e..f4ae6114c8 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -77,15 +77,6 @@ public class SparkParquetWriters {
new WriteBuilder(type));
}
- public static <T> ParquetValueWriter<T> buildWriter(
- StructType dfSchema, MessageType type, Schema icebergSchema) {
- return (ParquetValueWriter<T>)
- ParquetWithSparkSchemaVisitor.visit(
- dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
- type,
- new WriteBuilder(type));
- }
-
private static class WriteBuilder extends
ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
private final MessageType type;
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 2b3bf73d56..39110f0b05 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
private StructType positionDeleteSparkType;
private final Schema positionDeleteRowSchema;
private final Table table;
- private final FileFormat format;
+ private final FileFormat deleteFormat;
private final Map<String, String> writeProperties;
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = positionDeleteRowSchema;
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = null;
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
: MetricsConfig.forPositionDelete(table);
try {
- return switch (format) {
+ return switch (deleteFormat) {
case AVRO ->
Avro.writeDeletes(file)
.createWriterFunc(
@@ -215,14 +215,13 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
.metricsConfig(metricsConfig)
.withPartition(partition)
.overwrite()
- .metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
default ->
throw new UnsupportedOperationException(
- "Cannot write pos-deletes for unsupported file format: " + format);
+ "Cannot write pos-deletes for unsupported file format: " + deleteFormat);
};
} catch (IOException e) {
throw new UncheckedIOException("Failed to create new position delete writer", e);
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index dda634a46f..58be7f610c 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -77,15 +77,6 @@ public class SparkParquetWriters {
new WriteBuilder(type));
}
- public static <T> ParquetValueWriter<T> buildWriter(
- StructType dfSchema, MessageType type, Schema icebergSchema) {
- return (ParquetValueWriter<T>)
- ParquetWithSparkSchemaVisitor.visit(
- dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
- type,
- new WriteBuilder(type));
- }
-
private static class WriteBuilder extends
ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
private final MessageType type;
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 2b3bf73d56..39110f0b05 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
private StructType positionDeleteSparkType;
private final Schema positionDeleteRowSchema;
private final Table table;
- private final FileFormat format;
+ private final FileFormat deleteFormat;
private final Map<String, String> writeProperties;
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = positionDeleteRowSchema;
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = null;
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
: MetricsConfig.forPositionDelete(table);
try {
- return switch (format) {
+ return switch (deleteFormat) {
case AVRO ->
Avro.writeDeletes(file)
.createWriterFunc(
@@ -215,14 +215,13 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
.metricsConfig(metricsConfig)
.withPartition(partition)
.overwrite()
- .metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
default ->
throw new UnsupportedOperationException(
- "Cannot write pos-deletes for unsupported file format: " + format);
+ "Cannot write pos-deletes for unsupported file format: " + deleteFormat);
};
} catch (IOException e) {
throw new UncheckedIOException("Failed to create new position delete writer", e);
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 3ff5ef9c57..ba816efc0a 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -91,15 +91,6 @@ public class SparkParquetWriters {
new WriteBuilder(type));
}
- public static <T> ParquetValueWriter<T> buildWriter(
- StructType dfSchema, MessageType type, Schema icebergSchema) {
- return (ParquetValueWriter<T>)
- ParquetWithSparkSchemaVisitor.visit(
- dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
- type,
- new WriteBuilder(type));
- }
-
private static class WriteBuilder extends
ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
private final MessageType type;
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 2b3bf73d56..39110f0b05 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -60,7 +60,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
private StructType positionDeleteSparkType;
private final Schema positionDeleteRowSchema;
private final Table table;
- private final FileFormat format;
+ private final FileFormat deleteFormat;
private final Map<String, String> writeProperties;
/**
@@ -100,7 +100,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = positionDeleteRowSchema;
this.positionDeleteSparkType = positionDeleteSparkType;
@@ -138,7 +138,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
this.table = table;
- this.format = dataFileFormat;
+ this.deleteFormat = deleteFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
this.positionDeleteRowSchema = null;
this.useDeprecatedPositionDeleteWriter = false;
@@ -172,7 +172,7 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
: MetricsConfig.forPositionDelete(table);
try {
- return switch (format) {
+ return switch (deleteFormat) {
case AVRO ->
Avro.writeDeletes(file)
.createWriterFunc(
@@ -215,14 +215,13 @@ class SparkFileWriterFactory extends
RegistryBasedFileWriterFactory<InternalRow,
.metricsConfig(metricsConfig)
.withPartition(partition)
.overwrite()
- .metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
default ->
throw new UnsupportedOperationException(
- "Cannot write pos-deletes for unsupported file format: " + format);
+ "Cannot write pos-deletes for unsupported file format: " + deleteFormat);
};
} catch (IOException e) {
throw new UncheckedIOException("Failed to create new position delete writer", e);