This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c7ed71e970 Spark 4.1: Align handling of branches in reads and writes 
(#15288)
c7ed71e970 is described below

commit c7ed71e970781ca6c7fa44a3900625600830e9d4
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Feb 16 22:35:36 2026 -0800

    Spark 4.1: Align handling of branches in reads and writes (#15288)
---
 .../apache/iceberg/spark/PlanningBenchmark.java    |   2 +-
 .../iceberg/spark/TaskGroupPlanningBenchmark.java  |   5 +-
 .../iceberg/spark/extensions/TestDelete.java       |  16 ++--
 .../apache/iceberg/spark/extensions/TestMerge.java |  26 +++---
 .../iceberg/spark/extensions/TestUpdate.java       |  16 ++--
 .../org/apache/iceberg/spark/SparkReadConf.java    |  42 +++------
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 101 +++++++++++++++++----
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  46 +++-------
 .../apache/iceberg/spark/SparkWriteOptions.java    |   3 +
 .../apache/iceberg/spark/source/SparkTable.java    |   2 +-
 .../iceberg/SparkDistributedDataScanTestBase.java  |   3 +-
 .../TestSparkDistributedDataScanDeletes.java       |   3 +-
 .../TestSparkDistributedDataScanFilterFiles.java   |   3 +-
 .../TestSparkDistributedDataScanReporting.java     |   3 +-
 .../TestSparkDistributionAndOrderingUtil.java      |   7 +-
 .../iceberg/spark/TestSparkExecutorCache.java      |   6 +-
 .../apache/iceberg/spark/TestSparkWriteConf.java   |  50 +++++-----
 .../spark/actions/TestRewriteDataFilesAction.java  |   2 +-
 .../sql/TestPartitionedWritesToWapBranch.java      |  26 ++++--
 19 files changed, 205 insertions(+), 157 deletions(-)

diff --git 
a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
 
b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
index 0eff3a847e..c50a3fd406 100644
--- 
a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
+++ 
b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -398,7 +398,7 @@ public class PlanningBenchmark {
         .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
         .commit();
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }
 
diff --git 
a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
 
b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
index 45c95bf997..8a8097834e 100644
--- 
a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
+++ 
b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
@@ -39,7 +39,6 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.iceberg.util.TableScanUtil;
@@ -107,7 +106,7 @@ public class TaskGroupPlanningBenchmark {
   @Benchmark
   @Threads(1)
   public void planTaskGroups(Blackhole blackhole) {
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     List<ScanTaskGroup<FileScanTask>> taskGroups =
         TableScanUtil.planTaskGroups(
             fileTasks,
@@ -137,7 +136,7 @@ public class TaskGroupPlanningBenchmark {
   @Benchmark
   @Threads(1)
   public void planTaskGroupsWithGrouping(Blackhole blackhole) {
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
 
     List<ScanTaskGroup<FileScanTask>> taskGroups =
         TableScanUtil.planTaskGroups(
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fbf6ce3559..7e0f6207ed 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1375,15 +1375,17 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
     append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new 
Employee(2, "hr"));
     createBranchIfNeeded();
 
+    // writing to explicit branch should succeed even with WAP branch set
     withSQLConf(
         ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
-        () ->
-            assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", 
commitTarget()))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(
-                    String.format(
-                        "Cannot write to both branch and WAP branch, but got 
branch [%s] and WAP branch [wap]",
-                        branch)));
+        () -> {
+          sql("DELETE FROM %s t WHERE id=0", commitTarget());
+
+          assertEquals(
+              "Should have deleted row in explicit branch",
+              ImmutableList.of(row(1, "hr"), row(2, "hr")),
+              sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+        });
   }
 
   @TestTemplate
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 3f584031d9..b21d1a4042 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -2983,21 +2983,21 @@ public abstract class TestMerge extends 
SparkRowLevelOperationsTestBase {
     ImmutableList<Object[]> expectedRows =
         ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
 
+    // writing to explicit branch should succeed even with WAP branch set
     withSQLConf(
         ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
-        () ->
-            assertThatThrownBy(
-                    () ->
-                        sql(
-                            "MERGE INTO %s t USING source s ON t.id = s.id "
-                                + "WHEN MATCHED THEN UPDATE SET *"
-                                + "WHEN NOT MATCHED THEN INSERT *",
-                            commitTarget()))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(
-                    String.format(
-                        "Cannot write to both branch and WAP branch, but got 
branch [%s] and WAP branch [wap]",
-                        branch)));
+        () -> {
+          sql(
+              "MERGE INTO %s t USING source s ON t.id = s.id "
+                  + "WHEN MATCHED THEN UPDATE SET *"
+                  + "WHEN NOT MATCHED THEN INSERT *",
+              commitTarget());
+
+          assertEquals(
+              "Should have expected rows in explicit branch",
+              expectedRows,
+              sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+        });
   }
 
   private void checkJoinAndFilterConditions(String query, String join, String 
icebergFilters) {
diff --git 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 2afbc697e1..79ea5c2138 100644
--- 
a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -1500,15 +1500,17 @@ public abstract class TestUpdate extends 
SparkRowLevelOperationsTestBase {
         "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
         tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
 
+    // writing to explicit branch should succeed even with WAP branch set
     withSQLConf(
         ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
-        () ->
-            assertThatThrownBy(() -> sql("UPDATE %s SET dep='hr' WHERE 
dep='a'", commitTarget()))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(
-                    String.format(
-                        "Cannot write to both branch and WAP branch, but got 
branch [%s] and WAP branch [wap]",
-                        branch)));
+        () -> {
+          sql("UPDATE %s SET dep='software' WHERE dep='hr'", commitTarget());
+
+          assertEquals(
+              "Should have updated row in explicit branch",
+              ImmutableList.of(row(1, "software")),
+              sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+        });
   }
 
   private RowLevelOperationMode mode(Table table) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 238919ace7..ec56e5f239 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -20,16 +20,14 @@ package org.apache.iceberg.spark;
 
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
-import java.util.Map;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.SupportsDistributedScanPlanning;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * A class for common Iceberg configs for Spark reads.
@@ -58,18 +56,24 @@ public class SparkReadConf {
   private final SparkSession spark;
   private final Table table;
   private final String branch;
+  private final CaseInsensitiveStringMap options;
   private final SparkConfParser confParser;
 
-  public SparkReadConf(SparkSession spark, Table table, Map<String, String> 
readOptions) {
-    this(spark, table, null, readOptions);
+  public SparkReadConf(SparkSession spark, Table table) {
+    this(spark, table, CaseInsensitiveStringMap.empty());
+  }
+
+  public SparkReadConf(SparkSession spark, Table table, 
CaseInsensitiveStringMap options) {
+    this(spark, table, null, options);
   }
 
   public SparkReadConf(
-      SparkSession spark, Table table, String branch, Map<String, String> 
readOptions) {
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap 
options) {
     this.spark = spark;
     this.table = table;
     this.branch = branch;
-    this.confParser = new SparkConfParser(spark, table, readOptions);
+    this.options = options;
+    this.confParser = new SparkConfParser(spark, table, options);
   }
 
   public boolean caseSensitive() {
@@ -103,29 +107,7 @@ public class SparkReadConf {
   }
 
   public String branch() {
-    String optionBranch = 
confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
-    ValidationException.check(
-        branch == null || optionBranch == null || optionBranch.equals(branch),
-        "Must not specify different branches in both table identifier and read 
option, "
-            + "got [%s] in identifier and [%s] in options",
-        branch,
-        optionBranch);
-    String inputBranch = branch != null ? branch : optionBranch;
-    if (inputBranch != null) {
-      return inputBranch;
-    }
-
-    boolean wapEnabled =
-        PropertyUtil.propertyAsBoolean(
-            table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, 
false);
-    if (wapEnabled) {
-      String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
-      if (wapBranch != null && table.refs().containsKey(wapBranch)) {
-        return wapBranch;
-      }
-    }
-
-    return null;
+    return SparkTableUtil.determineReadBranch(spark, table, branch, options);
   }
 
   public String tag() {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 0b4578630a..593154d72a 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -60,7 +60,6 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
@@ -1025,34 +1024,102 @@ public class SparkTableUtil {
         sparkTable, Option.empty(), Option.empty(), options, Option.empty());
   }
 
+  public static String determineWriteBranch(SparkSession spark, Table table, 
String branch) {
+    return determineWriteBranch(spark, table, branch, 
CaseInsensitiveStringMap.empty());
+  }
+
   /**
    * Determine the write branch.
    *
-   * <p>Validate wap config and determine the write branch.
+   * <p>The target branch can be specified via table identifier, write option, 
or in SQL:
+   *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, 
they must match.
+   *   <li>Identifier and option branches take priority over the session WAP 
branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is 
enabled for this table,
+   *       use the WAP branch from the session SQL config.
+   * </ul>
+   *
+   * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+   *
+   * <p>Note: The target branch may be created during the write operation if 
it does not exist.
    *
    * @param spark a Spark Session
-   * @param branch write branch if there is no WAP branch configured
-   * @return branch for write operation
+   * @param table the table being written to
+   * @param branch write branch configured via table identifier, or null
+   * @param options write options
+   * @return branch for write operation, or null for main branch
    */
-  public static String determineWriteBranch(SparkSession spark, String branch) 
{
+  public static String determineWriteBranch(
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap 
options) {
+    String optionBranch = options.get(SparkWriteOptions.BRANCH);
+    if (optionBranch != null) {
+      Preconditions.checkArgument(
+          branch == null || optionBranch.equals(branch),
+          "Explicitly configured branch [%s] and write option [%s] are in 
conflict",
+          branch,
+          optionBranch);
+      return optionBranch;
+    }
+
+    if (branch == null && wapEnabled(table)) {
+      return wapSessionBranch(spark);
+    }
+
+    return branch;
+  }
+
+  /**
+   * Determine the read branch.
+   *
+   * <p>The target branch can be specified via table identifier, read option, 
or in SQL:
+   *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, 
they must match.
+   *   <li>Identifier and option branches take priority over the session WAP 
branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is 
enabled for this table,
+   *       use the WAP branch from the session SQL config (only if the branch 
already exists).
+   * </ul>
+   *
+   * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+   *
+   * @param spark a Spark Session
+   * @param table the table being read from
+   * @param branch read branch configured via table identifier, or null
+   * @param options read options
+   * @return branch for read operation, or null for main branch
+   */
+  public static String determineReadBranch(
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap 
options) {
+    String optionBranch = options.get(SparkReadOptions.BRANCH);
+    if (optionBranch != null) {
+      Preconditions.checkArgument(
+          branch == null || optionBranch.equals(branch),
+          "Explicitly configured branch [%s] and read option [%s] are in 
conflict",
+          branch,
+          optionBranch);
+      return optionBranch;
+    }
+
+    if (branch == null && wapEnabled(table)) {
+      String wapBranch = wapSessionBranch(spark);
+      if (wapBranch != null && table.refs().containsKey(wapBranch)) {
+        return wapBranch;
+      }
+    }
+
+    return branch;
+  }
+
+  private static String wapSessionBranch(SparkSession spark) {
     String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
     String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
-    ValidationException.check(
+    Preconditions.checkArgument(
         wapId == null || wapBranch == null,
         "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
         wapId,
         wapBranch);
-
-    if (wapBranch != null) {
-      ValidationException.check(
-          branch == null,
-          "Cannot write to both branch and WAP branch, but got branch [%s] and 
WAP branch [%s]",
-          branch,
-          wapBranch);
-
-      return wapBranch;
-    }
-    return branch;
+    return wapBranch;
   }
 
   public static boolean wapEnabled(Table table) {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 6648d7ea38..d74754e4d3 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.deletes.DeleteGranularity;
-import org.apache.iceberg.exceptions.ValidationException;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -55,6 +54,7 @@ import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
@@ -88,21 +88,25 @@ public class SparkWriteConf {
   private final Table table;
   private final String branch;
   private final RuntimeConfig sessionConf;
-  private final Map<String, String> writeOptions;
+  private final CaseInsensitiveStringMap options;
   private final SparkConfParser confParser;
 
-  public SparkWriteConf(SparkSession spark, Table table, Map<String, String> 
writeOptions) {
-    this(spark, table, null, writeOptions);
+  public SparkWriteConf(SparkSession spark, Table table) {
+    this(spark, table, null, CaseInsensitiveStringMap.empty());
+  }
+
+  public SparkWriteConf(SparkSession spark, Table table, 
CaseInsensitiveStringMap options) {
+    this(spark, table, null, options);
   }
 
   public SparkWriteConf(
-      SparkSession spark, Table table, String branch, Map<String, String> 
writeOptions) {
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap 
options) {
     this.spark = spark;
     this.table = table;
     this.branch = branch;
     this.sessionConf = spark.conf();
-    this.writeOptions = writeOptions;
-    this.confParser = new SparkConfParser(spark, table, writeOptions);
+    this.options = options;
+    this.confParser = new SparkConfParser(spark, table, options);
   }
 
   public boolean checkNullability() {
@@ -124,7 +128,7 @@ public class SparkWriteConf {
   }
 
   public String overwriteMode() {
-    String overwriteMode = writeOptions.get(SparkWriteOptions.OVERWRITE_MODE);
+    String overwriteMode = options.get(SparkWriteOptions.OVERWRITE_MODE);
     return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : 
null;
   }
 
@@ -262,7 +266,7 @@ public class SparkWriteConf {
 
     // Add write options, overriding session configuration if necessary
     extraSnapshotMetadata.putAll(
-        PropertyUtil.propertiesWithPrefix(writeOptions, 
SnapshotSummary.EXTRA_METADATA_PREFIX));
+        PropertyUtil.propertiesWithPrefix(options, 
SnapshotSummary.EXTRA_METADATA_PREFIX));
 
     return extraSnapshotMetadata;
   }
@@ -459,29 +463,7 @@ public class SparkWriteConf {
   }
 
   public String branch() {
-    if (wapEnabled()) {
-      String wapId = wapId();
-      String wapBranch =
-          
confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
-
-      ValidationException.check(
-          wapId == null || wapBranch == null,
-          "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
-          wapId,
-          wapBranch);
-
-      if (wapBranch != null) {
-        ValidationException.check(
-            branch == null,
-            "Cannot write to both branch and WAP branch, but got branch [%s] 
and WAP branch [%s]",
-            branch,
-            wapBranch);
-
-        return wapBranch;
-      }
-    }
-
-    return branch;
+    return SparkTableUtil.determineWriteBranch(spark, table, branch, options);
   }
 
   public Map<String, String> writeProperties() {
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 40816eef2f..86c27acd88 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -23,6 +23,9 @@ public class SparkWriteOptions {
 
   private SparkWriteOptions() {}
 
+  // Overrides the target branch for write operations
+  public static final String BRANCH = "branch";
+
   // Fileformat for write operations(default: Table write.format.default )
   public static final String WRITE_FORMAT = "write-format";
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 335d0f72fd..24915a1bfc 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -390,7 +390,7 @@ public class SparkTable
             .deleteFromRowFilter(deleteExpr);
 
     if (SparkTableUtil.wapEnabled(table())) {
-      branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
+      branch = SparkTableUtil.determineWriteBranch(sparkSession(), 
icebergTable, branch);
     }
 
     if (branch != null) {
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
index 404ba72846..aa4f3dc724 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -81,7 +80,7 @@ public abstract class SparkDistributedDataScanTestBase
 
   @Override
   protected BatchScan newScan() {
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
index 659507e4c5..6ffaede5b0 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -85,7 +84,7 @@ public class TestSparkDistributedDataScanDeletes
 
   @Override
   protected BatchScan newScan(Table table) {
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }
 }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
index a218f965ea..1e680ace29 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -21,7 +21,6 @@ package org.apache.iceberg;
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -79,7 +78,7 @@ public class TestSparkDistributedDataScanFilterFiles
         .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
         .commit();
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }
 }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
index 2665d7ba8d..9b736004de 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -80,7 +79,7 @@ public class TestSparkDistributedDataScanReporting
         .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
         .commit();
-    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }
 }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index ca86350346..21b67b89a9 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -34,7 +34,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.distributions.Distributions;
 import org.apache.spark.sql.connector.expressions.Expression;
@@ -2976,7 +2975,7 @@ public class TestSparkDistributionAndOrderingUtil extends 
TestBaseWithCatalog {
 
   private void checkWriteDistributionAndOrdering(
       Table table, Distribution expectedDistribution, SortOrder[] 
expectedOrdering) {
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     SparkWriteRequirements requirements = writeConf.writeRequirements();
 
@@ -2992,7 +2991,7 @@ public class TestSparkDistributionAndOrderingUtil extends 
TestBaseWithCatalog {
       Command command,
       Distribution expectedDistribution,
       SortOrder[] expectedOrdering) {
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     SparkWriteRequirements requirements = 
writeConf.copyOnWriteRequirements(command);
 
@@ -3008,7 +3007,7 @@ public class TestSparkDistributionAndOrderingUtil extends 
TestBaseWithCatalog {
       Command command,
       Distribution expectedDistribution,
       SortOrder[] expectedOrdering) {
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     SparkWriteRequirements requirements = 
writeConf.positionDeltaRequirements(command);
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
index a411c3fc70..607955971d 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -191,7 +191,7 @@ public class TestSparkExecutorCache extends 
TestBaseWithCatalog {
             SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
             SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"),
         () -> {
-          SparkReadConf readConf = new SparkReadConf(spark, table, 
Collections.emptyMap());
+          SparkReadConf readConf = new SparkReadConf(spark, table);
           assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse();
         });
 
@@ -200,7 +200,7 @@ public class TestSparkExecutorCache extends 
TestBaseWithCatalog {
             SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
             SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"),
         () -> {
-          SparkReadConf readConf = new SparkReadConf(spark, table, 
Collections.emptyMap());
+          SparkReadConf readConf = new SparkReadConf(spark, table);
           assertThat(readConf.cacheDeleteFilesOnExecutors()).isTrue();
         });
 
@@ -209,7 +209,7 @@ public class TestSparkExecutorCache extends 
TestBaseWithCatalog {
             SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false",
             SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"),
         () -> {
-          SparkReadConf readConf = new SparkReadConf(spark, table, 
Collections.emptyMap());
+          SparkReadConf readConf = new SparkReadConf(spark, table);
           assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse();
         });
   }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 61aacfa458..227b93dfa4 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -143,7 +144,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
   @TestTemplate
   public void testDeleteGranularityDefault() {
     Table table = validationCatalog.loadTable(tableIdent);
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     DeleteGranularity value = writeConf.deleteGranularity();
     assertThat(value).isEqualTo(DeleteGranularity.FILE);
@@ -158,7 +159,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
         .set(TableProperties.DELETE_GRANULARITY, 
DeleteGranularity.PARTITION.toString())
         .commit();
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     DeleteGranularity value = writeConf.deleteGranularity();
     assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
@@ -176,7 +177,8 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
     Map<String, String> options =
         ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, 
DeleteGranularity.FILE.toString());
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+    SparkWriteConf writeConf =
+        new SparkWriteConf(spark, table, new 
CaseInsensitiveStringMap(options));
 
     DeleteGranularity value = writeConf.deleteGranularity();
     assertThat(value).isEqualTo(DeleteGranularity.FILE);
@@ -188,7 +190,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
 
     table.updateProperties().set(TableProperties.DELETE_GRANULARITY, 
"invalid").commit();
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     assertThatThrownBy(writeConf::deleteGranularity)
         .isInstanceOf(IllegalArgumentException.class)
@@ -199,7 +201,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
   public void testAdvisoryPartitionSize() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     long value1 = writeConf.writeRequirements().advisoryPartitionSize();
     assertThat(value1).isGreaterThan(64L * 1024 * 1024).isLessThan(2L * 1024 * 
1024 * 1024);
@@ -217,7 +219,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
   public void testSparkWriteConfDistributionDefault() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
     checkMode(DistributionMode.HASH, writeConf);
   }
@@ -226,8 +228,9 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
   public void testSparkWriteConfDistributionModeWithWriteOption() {
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Map<String, String> writeOptions =
-        ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName());
+    CaseInsensitiveStringMap writeOptions =
+        new CaseInsensitiveStringMap(
+            ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName()));
 
     SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions);
     checkMode(DistributionMode.NONE, writeConf);
@@ -239,7 +242,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
         ImmutableMap.of(SparkSQLProperties.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName()),
         () -> {
           Table table = validationCatalog.loadTable(tableIdent);
-          SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+          SparkWriteConf writeConf = new SparkWriteConf(spark, table);
           checkMode(DistributionMode.NONE, writeConf);
         });
   }
@@ -256,7 +259,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
         .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
         .commit();
 
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
     checkMode(DistributionMode.NONE, writeConf);
   }
 
@@ -275,7 +278,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
               .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
               .commit();
 
-          SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+          SparkWriteConf writeConf = new SparkWriteConf(spark, table);
           // session config overwrite the table properties
           checkMode(DistributionMode.NONE, writeConf);
         });
@@ -288,9 +291,10 @@ public class TestSparkWriteConf extends 
TestBaseWithCatalog {
         () -> {
           Table table = validationCatalog.loadTable(tableIdent);
 
-          Map<String, String> writeOptions =
-              ImmutableMap.of(
-                  SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName());
+          CaseInsensitiveStringMap writeOptions =
+              new CaseInsensitiveStringMap(
+                  ImmutableMap.of(
+                      SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName()));
 
           SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
writeOptions);
           // write options overwrite the session config
@@ -305,9 +309,10 @@ public class TestSparkWriteConf extends 
TestBaseWithCatalog {
         () -> {
           Table table = validationCatalog.loadTable(tableIdent);
 
-          Map<String, String> writeOptions =
-              ImmutableMap.of(
-                  SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName());
+          CaseInsensitiveStringMap writeOptions =
+              new CaseInsensitiveStringMap(
+                  ImmutableMap.of(
+                      SparkWriteOptions.DISTRIBUTION_MODE, 
DistributionMode.NONE.modeName()));
 
           table
               .updateProperties()
@@ -402,7 +407,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
         ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", 
"session-value"),
         () -> {
           Table table = validationCatalog.loadTable(tableIdent);
-          SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+          SparkWriteConf writeConf = new SparkWriteConf(spark, table);
 
           Map<String, String> metadata = writeConf.extraSnapshotMetadata();
 
@@ -416,8 +421,9 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
         ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", 
"session-value"),
         () -> {
           Table table = validationCatalog.loadTable(tableIdent);
-          Map<String, String> writeOptions =
-              ImmutableMap.of("snapshot-property.test-key", 
"write-option-value");
+          CaseInsensitiveStringMap writeOptions =
+              new CaseInsensitiveStringMap(
+                  ImmutableMap.of("snapshot-property.test-key", 
"write-option-value"));
           SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
writeOptions);
 
           Map<String, String> metadata = writeConf.extraSnapshotMetadata();
@@ -596,7 +602,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
   public void testDVWriteConf() {
     Table table = validationCatalog.loadTable(tableIdent);
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
-    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table);
     assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
   }
 
@@ -613,7 +619,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog 
{
 
           updateProperties.commit();
 
-          SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+          SparkWriteConf writeConf = new SparkWriteConf(spark, table);
           Map<String, String> writeProperties = writeConf.writeProperties();
           Map<String, String> expectedProperties = propertiesSuite.get(2);
           
assertThat(writeConf.writeProperties()).hasSameSizeAs(expectedProperties);
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6d965f3dcc..c76c75c9a3 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -2105,7 +2105,7 @@ public class TestRewriteDataFilesAction extends TestBase {
     RewriteDataFilesSparkAction action = 
SparkActions.get(spark).rewriteDataFiles(table);
 
     // The constructor should have set the configuration to false
-    SparkReadConf readConf = new SparkReadConf(action.spark(), table, 
Collections.emptyMap());
+    SparkReadConf readConf = new SparkReadConf(action.spark(), table);
     assertThat(readConf.cacheDeleteFilesOnExecutors())
         .as("Executor cache for delete files should be disabled in 
RewriteDataFilesSparkAction")
         .isFalse();
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
index 45268b78f8..1b6334f23e 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
@@ -25,7 +25,7 @@ import java.util.UUID;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.SparkSQLProperties;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,15 +61,25 @@ public class TestPartitionedWritesToWapBranch extends 
PartitionedWritesTestBase
   }
 
   @TestTemplate
-  public void testBranchAndWapBranchCannotBothBeSetForWrite() {
+  public void testWriteToBranchWithWapBranchSet() {
     Table table = validationCatalog.loadTable(tableIdent);
     table.manageSnapshots().createBranch("test2", 
table.refs().get(BRANCH).snapshotId()).commit();
     sql("REFRESH TABLE " + tableName);
-    assertThatThrownBy(() -> sql("INSERT INTO %s.branch_test2 VALUES (4, 
'd')", tableName))
-        .isInstanceOf(ValidationException.class)
-        .hasMessage(
-            "Cannot write to both branch and WAP branch, but got branch 
[test2] and WAP branch [%s]",
-            BRANCH);
+
+    // writing to explicit branch should succeed even with WAP branch set
+    sql("INSERT INTO TABLE %s.branch_test2 VALUES (4, 'd')", tableName);
+
+    // verify write went to branch test2
+    assertEquals(
+        "Data should be written to branch test2",
+        ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c"), row(4L, 
"d")),
+        sql("SELECT * FROM %s VERSION AS OF 'test2' ORDER BY id", tableName));
+
+    // verify current state is not affected
+    assertEquals(
+        "Data should be written to branch test2",
+        ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
   @TestTemplate
@@ -77,7 +87,7 @@ public class TestPartitionedWritesToWapBranch extends 
PartitionedWritesTestBase
     String wapId = UUID.randomUUID().toString();
     spark.conf().set(SparkSQLProperties.WAP_ID, wapId);
     assertThatThrownBy(() -> sql("INSERT INTO %s VALUES (4, 'd')", tableName))
-        .isInstanceOf(ValidationException.class)
+        .isInstanceOf(IllegalArgumentException.class)
         .hasMessage(
             "Cannot set both WAP ID and branch, but got ID [%s] and branch 
[%s]", wapId, BRANCH);
   }


Reply via email to