This is an automated email from the ASF dual-hosted git repository.

RussellSpitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 62fe817f74 Spark 4.1: Add session configs for adaptive split sizing and parallelism (#16088)
62fe817f74 is described below

commit 62fe817f7459a904944f5b902867db61816635ee
Author: Karuppayya <[email protected]>
AuthorDate: Thu May 14 17:30:28 2026 -0700

    Spark 4.1: Add session configs for adaptive split sizing and parallelism (#16088)
---
 .../org/apache/iceberg/spark/SparkReadConf.java    | 12 +++
 .../apache/iceberg/spark/SparkSQLProperties.java   |  9 ++
 .../org/apache/iceberg/spark/source/SparkScan.java | 13 ++-
 .../apache/iceberg/spark/TestSparkReadConf.java    | 95 ++++++++++++++++++++++
 4 files changed, 127 insertions(+), 2 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c34251c3..8128babfa3 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -279,6 +279,7 @@ public class SparkReadConf {
   public boolean adaptiveSplitSizeEnabled() {
     return confParser
         .booleanConf()
+        .sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_ENABLED)
         .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
         .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
         .parse();
@@ -290,6 +291,17 @@ public class SparkReadConf {
     return Math.max(defaultParallelism, numShufflePartitions);
   }
 
+  public int splitParallelism() {
+    int parallelism =
+        confParser
+            .intConf()
+            .sessionConf(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM)
+            .defaultValue(parallelism())
+            .parse();
+    Preconditions.checkArgument(parallelism > 0, "Split parallelism must be > 0: %s", parallelism);
+    return parallelism;
+  }
+
   public boolean distributedPlanningEnabled() {
     return table instanceof SupportsDistributedScanPlanning distributed
         && distributed.allowDistributedPlanning()
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index af549dfd8e..ddedc36c71 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -110,6 +110,15 @@ public class SparkSQLProperties {
   // Prefix for custom snapshot properties
   public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property.";
 
+  // Controls whether adaptive split sizing is enabled
+  public static final String READ_ADAPTIVE_SPLIT_SIZE_ENABLED =
+      "spark.sql.iceberg.read.adaptive-split-size.enabled";
+
+  // Overrides the parallelism used for adaptive split sizing. When unset, the parallelism
+  // defaults to max(spark.default.parallelism, spark.sql.shuffle.partitions).
+  public static final String READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM =
+      "spark.sql.iceberg.read.adaptive-split-size.parallelism";
+
   // Controls whether to enable async micro batch planning for session
   public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
       "spark.sql.iceberg.async-micro-batch-planning-enabled";
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 6b80199a25..ee61523d80 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -369,8 +369,17 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
     if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
       long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
-      int parallelism = readConf.parallelism();
-      return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+      int parallelism = readConf.splitParallelism();
+      long adjustedSplitSize = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+      if (adjustedSplitSize != splitSize) {
+        LOG.debug(
+            "Adjusted split size from {} to {} for table {} with parallelism {}",
+            splitSize,
+            adjustedSplitSize,
+            table().name(),
+            parallelism);
+      }
+      return adjustedSplitSize;
     } else {
       return splitSize;
     }
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
new file mode 100644
index 0000000000..c3fc69c8b2
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkReadConf.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkReadConf extends TestBaseWithCatalog {
+
+  @BeforeEach
+  public void before() {
+    super.before();
+    sql("CREATE TABLE %s (id BIGINT, data STRING) USING iceberg", tableName);
+  }
+
+  @AfterEach
+  public void after() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testSplitParallelismDefault() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+    assertThat(conf.splitParallelism()).isEqualTo(conf.parallelism());
+  }
+
+  @TestTemplate
+  public void testSplitParallelismSessionConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(),
+            "999",
+            SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM,
+            "42"),
+        () -> {
+          SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+          assertThat(conf.splitParallelism()).isEqualTo(42);
+        });
+  }
+
+  @TestTemplate
+  public void testSplitParallelismRejectsZero() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "0"),
+        () -> {
+          SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+          assertThatIllegalArgumentException()
+              .isThrownBy(conf::splitParallelism)
+              .withMessageContaining("Split parallelism must be > 0");
+        });
+  }
+
+  @TestTemplate
+  public void testSplitParallelismRejectsNegative() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.READ_ADAPTIVE_SPLIT_SIZE_PARALLELISM, "-5"),
+        () -> {
+          SparkReadConf conf = new SparkReadConf(spark, table, CaseInsensitiveStringMap.empty());
+          assertThatIllegalArgumentException()
+              .isThrownBy(conf::splitParallelism)
+              .withMessageContaining("Split parallelism must be > 0");
+        });
+  }
+}

Reply via email to