This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ed7658bdc Spark 4.1: Introduce modes in SparkWriteBuilder (#15374)
4ed7658bdc is described below

commit 4ed7658bdcbab03c696472f56f3cfc810d1fbec8
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 22:33:25 2026 -0800

    Spark 4.1: Introduce modes in SparkWriteBuilder (#15374)
---
 .../iceberg/spark/source/SparkWriteBuilder.java    | 219 +++++++++++----------
 1 file changed, 111 insertions(+), 108 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 182e56a861..14c6c8b068 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -32,6 +32,10 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.Append;
+import 
org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.CopyOnWriteOperation;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.DynamicOverwrite;
+import 
org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.OverwriteByFilter;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
@@ -44,115 +48,105 @@ import org.apache.spark.sql.connector.write.Write;
 import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
 import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, 
SupportsOverwrite {
   private final SparkSession spark;
   private final Table table;
   private final SparkWriteConf writeConf;
-  private final LogicalWriteInfo writeInfo;
-  private final StructType dsSchema;
-  private final String overwriteMode;
-  private boolean overwriteDynamic = false;
-  private boolean overwriteByFilter = false;
-  private Expression overwriteExpr = null;
-  private boolean overwriteFiles = false;
-  private SparkCopyOnWriteScan copyOnWriteScan = null;
-  private Command copyOnWriteCommand = null;
-  private IsolationLevel copyOnWriteIsolationLevel = null;
+  private final LogicalWriteInfo info;
+  private final boolean caseSensitive;
+  private final boolean checkNullability;
+  private final boolean checkOrdering;
+  private final boolean mergeSchema;
+  private Mode mode = null;
 
   SparkWriteBuilder(SparkSession spark, Table table, String branch, 
LogicalWriteInfo info) {
     this.spark = spark;
     this.table = table;
     this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
-    this.writeInfo = info;
-    this.dsSchema = info.schema();
-    this.overwriteMode = writeConf.overwriteMode();
+    this.info = info;
+    this.caseSensitive = writeConf.caseSensitive();
+    this.checkNullability = writeConf.checkNullability();
+    this.checkOrdering = writeConf.checkOrdering();
+    this.mergeSchema = writeConf.mergeSchema();
   }
 
   public WriteBuilder overwriteFiles(Scan scan, Command command, 
IsolationLevel isolationLevel) {
-    Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual 
files and by filter");
-    Preconditions.checkState(
-        !overwriteDynamic, "Cannot overwrite individual files and 
dynamically");
-
-    this.overwriteFiles = true;
-    this.copyOnWriteScan = (SparkCopyOnWriteScan) scan;
-    this.copyOnWriteCommand = command;
-    this.copyOnWriteIsolationLevel = isolationLevel;
+    Preconditions.checkState(mode == null, "Cannot use copy-on-write with 
other modes");
+    this.mode = new CopyOnWriteOperation((SparkCopyOnWriteScan) scan, command, 
isolationLevel);
     return this;
   }
 
   @Override
   public WriteBuilder overwriteDynamicPartitions() {
-    Preconditions.checkState(
-        !overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", 
overwriteExpr);
-    Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual 
files and dynamically");
-
-    this.overwriteDynamic = true;
+    Preconditions.checkState(mode == null, "Cannot use dynamic overwrite with 
other modes");
+    this.mode = new DynamicOverwrite();
     return this;
   }
 
   @Override
   public WriteBuilder overwrite(Filter[] filters) {
-    Preconditions.checkState(
-        !overwriteFiles, "Cannot overwrite individual files and using 
filters");
+    Preconditions.checkState(mode == null, "Cannot use overwrite by filter 
with other modes");
+    Expression expr = SparkFilters.convert(filters);
+    this.mode = useDynamicOverwrite(expr) ? new DynamicOverwrite() : new 
OverwriteByFilter(expr);
+    return this;
+  }
+
+  private boolean useDynamicOverwrite(Expression expr) {
+    return expr == Expressions.alwaysTrue() && 
"dynamic".equals(writeConf.overwriteMode());
+  }
+
+  private boolean writeNeedsRowLineage() {
+    return TableUtil.supportsRowLineage(table) && mode instanceof 
CopyOnWriteOperation;
+  }
 
-    this.overwriteExpr = SparkFilters.convert(filters);
-    if (overwriteExpr == Expressions.alwaysTrue() && 
"dynamic".equals(overwriteMode)) {
-      // use the write option to override truncating the table. use dynamic 
overwrite instead.
-      this.overwriteDynamic = true;
+  private boolean writeIncludesRowLineage() {
+    return info.metadataSchema()
+        .map(schema -> schema.exists(field -> 
field.name().equals(MetadataColumns.ROW_ID.name())))
+        .orElse(false);
+  }
+
+  private StructType sparkWriteSchema() {
+    if (writeIncludesRowLineage()) {
+      StructType writeSchema = info.schema();
+      StructType metaSchema = info.metadataSchema().get();
+      StructField rowId = metaSchema.apply(MetadataColumns.ROW_ID.name());
+      writeSchema = writeSchema.add(rowId);
+      StructField rowSeq = 
metaSchema.apply(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
+      writeSchema = writeSchema.add(rowSeq);
+      return writeSchema;
     } else {
-      Preconditions.checkState(
-          !overwriteDynamic, "Cannot overwrite dynamically and by filter: %s", 
overwriteExpr);
-      this.overwriteByFilter = true;
+      return info.schema();
     }
-    return this;
   }
 
   @Override
   public Write build() {
-    // The write schema should only include row lineage in the output if it's 
an overwrite
-    // operation or if it's a compaction.
-    // In any other case, only null row IDs and sequence numbers would be 
produced which
-    // means the row lineage columns can be excluded from the output files
-    boolean writeRequiresRowLineage = TableUtil.supportsRowLineage(table) && 
overwriteFiles;
-    boolean writeAlreadyIncludesLineage =
-        dsSchema.exists(field -> 
field.name().equals(MetadataColumns.ROW_ID.name()));
-    StructType sparkWriteSchema = dsSchema;
-    if (writeRequiresRowLineage && !writeAlreadyIncludesLineage) {
-      sparkWriteSchema = sparkWriteSchema.add(MetadataColumns.ROW_ID.name(), 
LongType$.MODULE$);
-      sparkWriteSchema =
-          sparkWriteSchema.add(
-              MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), 
LongType$.MODULE$);
-    }
-
-    Schema writeSchema =
-        validateOrMergeWriteSchema(table, sparkWriteSchema, writeConf, 
writeRequiresRowLineage);
-
+    validateRowLineage();
+    Schema writeSchema = mergeSchema ? mergeAndValidateWriteSchema() : 
validateWriteSchema();
     SparkUtil.validatePartitionTransforms(table.spec());
-
-    // Get application id
     String appId = spark.sparkContext().applicationId();
 
     return new SparkWrite(
         spark,
         table,
         writeConf,
-        writeInfo,
+        info,
         appId,
         writeSchema,
-        sparkWriteSchema,
+        sparkWriteSchema(),
         writeRequirements()) {
 
       @Override
       public BatchWrite toBatch() {
-        if (overwriteByFilter) {
-          return asOverwriteByFilter(overwriteExpr);
-        } else if (overwriteDynamic) {
+        if (mode instanceof OverwriteByFilter overwrite) {
+          return asOverwriteByFilter(overwrite.expr());
+        } else if (mode instanceof DynamicOverwrite) {
           return asDynamicOverwrite();
-        } else if (overwriteFiles) {
-          return asCopyOnWriteOperation(copyOnWriteScan, 
copyOnWriteIsolationLevel);
+        } else if (mode instanceof CopyOnWriteOperation cow) {
+          return asCopyOnWriteOperation(cow.scan(), cow.isolationLevel());
         } else {
           return asBatchAppend();
         }
@@ -160,66 +154,75 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
 
       @Override
       public StreamingWrite toStreaming() {
-        Preconditions.checkState(
-            !overwriteDynamic, "Unsupported streaming operation: dynamic 
partition overwrite");
-        Preconditions.checkState(
-            !overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
-            "Unsupported streaming operation: overwrite by filter: %s",
-            overwriteExpr);
-
-        if (overwriteByFilter) {
+        if (mode instanceof OverwriteByFilter overwrite) {
+          Preconditions.checkState(
+              overwrite.expr() == Expressions.alwaysTrue(),
+              "Unsupported streaming overwrite filter: " + overwrite.expr());
           return asStreamingOverwrite();
-        } else {
+        } else if (mode == null || mode instanceof Append) {
           return asStreamingAppend();
+        } else {
+          throw new IllegalStateException("Unsupported streaming write mode: " 
+ mode);
         }
       }
     };
   }
 
   private SparkWriteRequirements writeRequirements() {
-    if (overwriteFiles) {
-      return writeConf.copyOnWriteRequirements(copyOnWriteCommand);
+    if (mode instanceof CopyOnWriteOperation cow) {
+      return writeConf.copyOnWriteRequirements(cow.command());
     } else {
       return writeConf.writeRequirements();
     }
   }
 
-  private static Schema validateOrMergeWriteSchema(
-      Table table, StructType dsSchema, SparkWriteConf writeConf, boolean 
writeIncludesRowLineage) {
-    Schema writeSchema;
-    boolean caseSensitive = writeConf.caseSensitive();
-    if (writeConf.mergeSchema()) {
-      // convert the dataset schema and assign fresh ids for new fields
-      Schema newSchema =
-          SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema, 
caseSensitive);
-
-      // update the table to get final id assignments and validate the changes
-      UpdateSchema update =
-          
table.updateSchema().caseSensitive(caseSensitive).unionByNameWith(newSchema);
-      Schema mergedSchema = update.apply();
-      if (writeIncludesRowLineage) {
-        mergedSchema =
-            TypeUtil.join(mergedSchema, 
MetadataColumns.schemaWithRowLineage(table.schema()));
-      }
+  private void validateRowLineage() {
+    Preconditions.checkArgument(
+        writeIncludesRowLineage() || !writeNeedsRowLineage(),
+        "Row lineage information is missing for write in mode: %s",
+        mode);
+  }
 
-      // reconvert the dsSchema without assignment to use the ids assigned by 
UpdateSchema
-      writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema, 
caseSensitive);
+  private Schema validateWriteSchema() {
+    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), 
info.schema(), caseSensitive);
+    TypeUtil.validateWriteSchema(table.schema(), writeSchema, 
checkNullability, checkOrdering);
+    return addRowLineageIfNeeded(writeSchema);
+  }
 
-      TypeUtil.validateWriteSchema(
-          mergedSchema, writeSchema, writeConf.checkNullability(), 
writeConf.checkOrdering());
+  // merge schema flow:
+  // - convert Spark schema and assign fresh IDs for new fields
+  // - update table to get final ID assignments and validate changes
+  // - reconvert Spark schema without assignment to use IDs assigned by 
UpdateSchema
+  // - if validation passed, update table schema
+  private Schema mergeAndValidateWriteSchema() {
+    Schema newSchema =
+        SparkSchemaUtil.convertWithFreshIds(table.schema(), info.schema(), 
caseSensitive);
+    UpdateSchema update =
+        
table.updateSchema().caseSensitive(caseSensitive).unionByNameWith(newSchema);
+    Schema mergedSchema = update.apply();
+    Schema writeSchema = SparkSchemaUtil.convert(mergedSchema, info.schema(), 
caseSensitive);
+    TypeUtil.validateWriteSchema(mergedSchema, writeSchema, checkNullability, 
checkOrdering);
+    update.commit();
+    return addRowLineageIfNeeded(writeSchema);
+  }
 
-      // if the validation passed, update the table schema
-      update.commit();
-    } else {
-      Schema schema =
-          writeIncludesRowLineage
-              ? MetadataColumns.schemaWithRowLineage(table.schema())
-              : table.schema();
-      writeSchema = SparkSchemaUtil.convert(schema, dsSchema, caseSensitive);
-      TypeUtil.validateWriteSchema(
-          table.schema(), writeSchema, writeConf.checkNullability(), 
writeConf.checkOrdering());
-    }
+  private Schema addRowLineageIfNeeded(Schema schema) {
+    return writeNeedsRowLineage() ? 
MetadataColumns.schemaWithRowLineage(schema) : schema;
+  }
+
+  sealed interface Mode {
+    // add new data
+    record Append() implements Mode {}
+
+    // overwrite partitions that receive new data (determined at runtime)
+    record DynamicOverwrite() implements Mode {}
+
+    // overwrite data files matching filter expression (a.k.a static overwrite)
+    record OverwriteByFilter(Expression expr) implements Mode {}
 
-    return writeSchema;
+    // copy-on-write operation (UPDATE/DELETE/MERGE) that completely rewrites 
affected files
+    record CopyOnWriteOperation(
+        SparkCopyOnWriteScan scan, Command command, IsolationLevel 
isolationLevel)
+        implements Mode {}
   }
 }

Reply via email to