This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 317968426afb0a254ee3b8cd930578c774ac4fdb
Author: manuzhang <[email protected]>
AuthorDate: Mon Dec 1 15:03:07 2025 +0800

    Spark: Initial support for 4.1.0
---
 .github/workflows/spark-ci.yml                     |  6 ++++-
 .gitignore                                         |  2 ++
 dev/stage-binaries.sh                              |  2 +-
 gradle.properties                                  |  4 ++--
 gradle/libs.versions.toml                          |  1 +
 jmh.gradle                                         |  5 +++++
 settings.gradle                                    | 12 ++++++++++
 spark/build.gradle                                 |  4 ++++
 spark/v4.1/build.gradle                            | 24 ++++++++++++++------
 .../apache/iceberg/DeleteFileIndexBenchmark.java   |  4 ++--
 .../spark/MergeCardinalityCheckBenchmark.java      |  4 ++--
 .../apache/iceberg/spark/PlanningBenchmark.java    |  4 ++--
 .../iceberg/spark/TaskGroupPlanningBenchmark.java  |  4 ++--
 .../spark/sql/catalyst/analysis/ResolveViews.scala |  6 ++---
 .../spark/sql/catalyst/analysis/ViewUtil.scala     |  2 +-
 .../IcebergSparkSqlExtensionsParser.scala          |  2 +-
 .../apache/iceberg/spark/extensions/TestMerge.java | 23 +------------------
 .../spark/extensions/TestMetadataTables.java       |  2 +-
 .../extensions/TestRewriteDataFilesProcedure.java  |  2 +-
 .../extensions/TestRewriteManifestsProcedure.java  |  2 +-
 .../TestRewritePositionDeleteFilesProcedure.java   |  2 +-
 .../iceberg/spark/extensions/TestUpdate.java       |  2 +-
 .../spark/action/DeleteOrphanFilesBenchmark.java   |  4 ++--
 .../SparkParquetReadersFlatDataBenchmark.java      |  4 ++--
 .../SparkParquetReadersNestedDataBenchmark.java    |  4 ++--
 .../SparkParquetWritersFlatDataBenchmark.java      |  4 ++--
 .../SparkParquetWritersNestedDataBenchmark.java    |  4 ++--
 .../spark/source/avro/AvroWritersBenchmark.java    |  4 ++--
 .../IcebergSourceFlatAvroDataReadBenchmark.java    |  4 ++--
 .../IcebergSourceNestedAvroDataReadBenchmark.java  |  4 ++--
 .../orc/IcebergSourceFlatORCDataReadBenchmark.java |  4 ++--
 ...ebergSourceNestedListORCDataWriteBenchmark.java |  4 ++--
 .../IcebergSourceNestedORCDataReadBenchmark.java   |  4 ++--
 ...cebergSourceFlatParquetDataFilterBenchmark.java |  4 ++--
 .../IcebergSourceFlatParquetDataReadBenchmark.java |  4 ++--
 ...IcebergSourceFlatParquetDataWriteBenchmark.java |  4 ++--
 ...gSourceNestedListParquetDataWriteBenchmark.java |  4 ++--
 ...bergSourceNestedParquetDataFilterBenchmark.java |  4 ++--
 ...cebergSourceNestedParquetDataReadBenchmark.java |  4 ++--
 ...ebergSourceNestedParquetDataWriteBenchmark.java |  4 ++--
 .../IcebergSourceParquetEqDeleteBenchmark.java     |  4 ++--
 ...ebergSourceParquetMultiDeleteFileBenchmark.java |  4 ++--
 .../IcebergSourceParquetPosDeleteBenchmark.java    |  4 ++--
 ...gSourceParquetWithUnrelatedDeleteBenchmark.java |  4 ++--
 .../source/parquet/ParquetWritersBenchmark.java    |  4 ++--
 ...dDictionaryEncodedFlatParquetDataBenchmark.java |  4 ++--
 .../VectorizedReadFlatParquetDataBenchmark.java    |  4 ++--
 .../VectorizedReadParquetDecimalBenchmark.java     |  4 ++--
 .../java/org/apache/iceberg/spark/BaseCatalog.java | 11 +++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   |  3 ++-
 .../iceberg/spark/data/SparkParquetReaders.java    | 12 ++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |  5 +++++
 .../iceberg/spark/source/StructInternalRow.java    | 12 ++++++++++
 .../iceberg/spark/data/TestSparkParquetReader.java |  1 +
 .../spark/source/TestForwardCompatibility.java     | 10 ++++-----
 .../spark/source/TestStructuredStreaming.java      | 16 ++++++-------
 .../iceberg/spark/sql/TestSparkDefaultValues.java  | 26 ----------------------
 57 files changed, 175 insertions(+), 144 deletions(-)

diff --git a/.github/workflows/spark-ci.yml b/.github/workflows/spark-ci.yml
index be4083714d..ecc973e32a 100644
--- a/.github/workflows/spark-ci.yml
+++ b/.github/workflows/spark-ci.yml
@@ -72,17 +72,21 @@ jobs:
     strategy:
       matrix:
         jvm: [11, 17, 21]
-        spark: ['3.4', '3.5', '4.0']
+        spark: ['3.4', '3.5', '4.0', '4.1']
         scala: ['2.12', '2.13']
         exclude:
          # Spark 3.5 is the first version not failing on Java 21 (https://issues.apache.org/jira/browse/SPARK-42369)
          # Full Java 21 support is coming in Spark 4 (https://issues.apache.org/jira/browse/SPARK-43831)
           - jvm: 11
             spark: '4.0'
+          - jvm: 11
+            spark: '4.1'
           - jvm: 21
             spark: '3.4'
           - spark: '4.0'
             scala: '2.12'
+          - spark: '4.1'
+            scala: '2.12'
     env:
       SPARK_LOCAL_IP: localhost
     steps:
diff --git a/.gitignore b/.gitignore
index f931c10e94..bcac4d1610 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,8 @@ spark/v3.5/spark/benchmark/*
 spark/v3.5/spark-extensions/benchmark/*
 spark/v4.0/spark/benchmark/*
 spark/v4.0/spark-extensions/benchmark/*
+spark/v4.1/spark/benchmark/*
+spark/v4.1/spark-extensions/benchmark/*
 */benchmark/*
 
 __pycache__/
diff --git a/dev/stage-binaries.sh b/dev/stage-binaries.sh
index ec3080575b..50f984eb40 100755
--- a/dev/stage-binaries.sh
+++ b/dev/stage-binaries.sh
@@ -20,7 +20,7 @@
 
 SCALA_VERSION=2.12
 FLINK_VERSIONS=1.20,2.0,2.1
-SPARK_VERSIONS=3.4,3.5,4.0
+SPARK_VERSIONS=3.4,3.5,4.0,4.1
 KAFKA_VERSIONS=3
 
 ./gradlew -Prelease -DscalaVersion=$SCALA_VERSION -DflinkVersions=$FLINK_VERSIONS -DsparkVersions=$SPARK_VERSIONS -DkafkaVersions=$KAFKA_VERSIONS publishApachePublicationToMavenRepository --no-parallel --no-configuration-cache
diff --git a/gradle.properties b/gradle.properties
index 0f70b49eb7..c0f283303a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,8 +18,8 @@ jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
 systemProp.defaultFlinkVersions=2.1
 systemProp.knownFlinkVersions=1.20,2.0,2.1
-systemProp.defaultSparkVersions=4.0
-systemProp.knownSparkVersions=3.4,3.5,4.0
+systemProp.defaultSparkVersions=4.1
+systemProp.knownSparkVersions=3.4,3.5,4.0,4.1
 systemProp.defaultKafkaVersions=3
 systemProp.knownKafkaVersions=3
 systemProp.defaultScalaVersion=2.12
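
Note: knownSparkVersions enumerates the values accepted for the sparkVersions
system property, while defaultSparkVersions (now 4.1) is what a plain build
uses when no -DsparkVersions is passed. A minimal plain-Java sketch of that
precedence (the class is illustrative, not part of the Iceberg build):

public class SparkVersionSelection {
  public static void main(String[] args) {
    // Baked-in default, as gradle.properties now sets it:
    System.setProperty("defaultSparkVersions", "4.1");
    // An explicit -DsparkVersions=... on the command line takes precedence:
    String selected =
        System.getProperty("sparkVersions", System.getProperty("defaultSparkVersions"));
    System.out.println(String.join(" ", selected.split(","))); // prints: 4.1
  }
}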
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 5e2d5435eb..4fbba96317 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -87,6 +87,7 @@ snowflake-jdbc = "3.28.0"
 spark34 = "3.4.4"
 spark35 = "3.5.7"
 spark40 = "4.0.1"
+spark41 = "4.1.0"
 sqlite-jdbc = "3.51.1.0"
 testcontainers = "2.0.3"
 tez08 = { strictly = "0.8.4"}  # see rich version usage explanation above
diff --git a/jmh.gradle b/jmh.gradle
index 57efb3821d..d2c4709bf3 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -53,6 +53,11 @@ if (sparkVersions.contains("4.0")) {
   jmhProjects.add(project(":iceberg-spark:iceberg-spark-extensions-4.0_2.13"))
 }
 
+if (sparkVersions.contains("4.1")) {
+    jmhProjects.add(project(":iceberg-spark:iceberg-spark-4.1_2.13"))
+    jmhProjects.add(project(":iceberg-spark:iceberg-spark-extensions-4.1_2.13"))
+}
+
 configure(jmhProjects) {
   apply plugin: 'me.champeau.jmh'
   apply plugin: 'io.morethan.jmhreport'
diff --git a/settings.gradle b/settings.gradle
index de342dda14..70f9343a25 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -175,6 +175,18 @@ if (sparkVersions.contains("4.0")) {
   project(":iceberg-spark:spark-runtime-4.0_2.13").name = 
"iceberg-spark-runtime-4.0_2.13"
 }
 
+if (sparkVersions.contains("4.1")) {
+    include ":iceberg-spark:spark-4.1_2.13"
+    include ":iceberg-spark:spark-extensions-4.1_2.13"
+    include ":iceberg-spark:spark-runtime-4.1_2.13"
+    project(":iceberg-spark:spark-4.1_2.13").projectDir = 
file('spark/v4.1/spark')
+    project(":iceberg-spark:spark-4.1_2.13").name = "iceberg-spark-4.1_2.13"
+    project(":iceberg-spark:spark-extensions-4.1_2.13").projectDir = 
file('spark/v4.1/spark-extensions')
+    project(":iceberg-spark:spark-extensions-4.1_2.13").name = 
"iceberg-spark-extensions-4.1_2.13"
+    project(":iceberg-spark:spark-runtime-4.1_2.13").projectDir = 
file('spark/v4.1/spark-runtime')
+    project(":iceberg-spark:spark-runtime-4.1_2.13").name = 
"iceberg-spark-runtime-4.1_2.13"
+}
+
 if (kafkaVersions.contains("3")) {
   include 'kafka-connect'
   project(':kafka-connect').name = 'iceberg-kafka-connect'
diff --git a/spark/build.gradle b/spark/build.gradle
index 75d3f899e5..4d4a84fd39 100644
--- a/spark/build.gradle
+++ b/spark/build.gradle
@@ -31,3 +31,7 @@ if (sparkVersions.contains("3.5")) {
 if (sparkVersions.contains("4.0")) {
   apply from: file("$projectDir/v4.0/build.gradle")
 }
+
+if (sparkVersions.contains("4.1")) {
+    apply from: file("$projectDir/v4.1/build.gradle")
+}
diff --git a/spark/v4.1/build.gradle b/spark/v4.1/build.gradle
index 8ebed9bd43..14a07ac543 100644
--- a/spark/v4.1/build.gradle
+++ b/spark/v4.1/build.gradle
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-String sparkMajorVersion = '4.0'
+String sparkMajorVersion = '4.1'
 String scalaVersion = '2.13'
 
 JavaVersion javaVersion = JavaVersion.current()
 Boolean javaVersionSupported = javaVersion == JavaVersion.VERSION_17 || javaVersion == JavaVersion.VERSION_21
 if (!javaVersionSupported) {
-  logger.warn("Skip Spark 4.0 build which requires JDK 17 or 21 but was executed with JDK " + javaVersion)
+  logger.warn("Skip Spark 4.1 build which requires JDK 17 or 21 but was executed with JDK " + javaVersion)
 }
 
 def sparkProjects = [
@@ -51,6 +51,14 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
   apply plugin: 'scala'
   apply plugin: 'com.github.alisiikh.scalastyle'
 
+  // Set target to JDK 17 for Spark 4.1 to fix the following error:
+  // "spark/v4.1/spark/src/main/scala/org/apache/spark/sql/stats/ThetaSketchAgg.scala:52:12: Class java.lang.Record not found"
+  tasks.withType(ScalaCompile.class) {
+    sourceCompatibility = "17"
+    targetCompatibility = "17"
+    scalaCompileOptions.additionalParameters.add("-release:17")
+  }
+
   sourceSets {
     main {
       scala.srcDirs = ['src/main/scala', 'src/main/java']
@@ -74,7 +82,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
 
     compileOnly libs.errorprone.annotations
     compileOnly libs.avro.avro
-    compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark40.get()}") {
+    compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark41.get()}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
       exclude group: 'org.apache.parquet'
@@ -84,7 +92,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'org.roaringbitmap'
     }
 
-    compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
+    // TODO: datafusion-comet Spark 4.1 support
+    compileOnly "org.apache.datafusion:comet-spark-spark4.0_2.13:${libs.versions.comet.get()}"
 
     implementation libs.parquet.column
     implementation libs.parquet.hadoop
@@ -165,7 +174,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
     compileOnly project(':iceberg-core')
     compileOnly project(':iceberg-common')
     compileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
-    compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark40.get()}") {
+    compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark41.get()}") {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.apache.arrow'
       exclude group: 'org.apache.parquet'
@@ -194,7 +203,8 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation libs.avro.avro
     testImplementation libs.parquet.hadoop
     testImplementation libs.awaitility
-    testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_2.13:${libs.versions.comet.get()}"
+    // TODO: datafusion-comet Spark 4.1 support
+    testImplementation "org.apache.datafusion:comet-spark-spark4.0_2.13:${libs.versions.comet.get()}"
     testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
@@ -267,7 +277,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
     }
 
     integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}"
-    integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark40.get()}"
+    integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark41.get()}"
     integrationImplementation libs.junit.jupiter
     integrationImplementation libs.junit.platform.launcher
     integrationImplementation libs.slf4j.simple
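
Note on the -release:17 settings added above: java.lang.Record only exists
since JDK 16, so presumably any classfile referencing record types (as Spark
4.1's ThetaSketch support does, per the quoted error) cannot be resolved
against an older -release target. A minimal illustration with a hypothetical
record, not taken from the patch:

// Compiles with `javac --release 17` but fails with `--release 11`,
// because java.lang.Record is absent from the JDK 11 class library.
public record SketchEstimate(long estimate) {
  public static void main(String[] args) {
    System.out.println(new SketchEstimate(42).estimate()); // prints: 42
  }
}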
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
index 9375ca3a4f..a8b226ea1e 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -54,8 +54,8 @@ import org.openjdk.jmh.infra.Blackhole;
 /**
  * A benchmark that evaluates the delete file index build and lookup performance.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-extensions-4.1_2.13:jmh
  *       -PjmhIncludeRegex=DeleteFileIndexBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-delete-file-index-benchmark.txt
  * </code>
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java
index 963daa2c36..eeea816345 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java
@@ -53,8 +53,8 @@ import org.openjdk.jmh.annotations.Warmup;
 /**
  * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-extensions-4.1_2.13:jmh
  *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
  * </code>
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
index 34d9d70e6c..0eff3a847e 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -73,8 +73,8 @@ import org.openjdk.jmh.infra.Blackhole;
 /**
  * A benchmark that evaluates the job planning performance.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.12:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-extensions-4.1_2.13:jmh
  *       -PjmhIncludeRegex=PlanningBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-planning-benchmark.txt
  * </code>
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
index 7c2def2378..45c95bf997 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
@@ -63,8 +63,8 @@ import org.openjdk.jmh.infra.Blackhole;
 /**
  * A benchmark that evaluates the task group planning performance.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-extensions-4.1_2.13:jmh
  *       -PjmhIncludeRegex=TaskGroupPlanningBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-task-group-planning-benchmark.txt
  * </code>
diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index ff7d20241b..76db30a5b6 100644
--- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -137,11 +137,9 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
   private def rewriteIdentifiers(
       plan: LogicalPlan,
       catalogAndNamespace: Seq[String]): LogicalPlan = {
-    // Substitute CTEs and Unresolved Ordinals within the view, then rewrite unresolved functions and relations
+    // Rewrite unresolved functions and relations
     qualifyTableIdentifiers(
-      qualifyFunctionIdentifiers(
-        SubstituteUnresolvedOrdinals.apply(CTESubstitution.apply(plan)),
-        catalogAndNamespace),
+      qualifyFunctionIdentifiers(CTESubstitution.apply(plan), catalogAndNamespace),
       catalogAndNamespace)
   }
 
diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewUtil.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewUtil.scala
index a7188837c5..c27cb14034 100644
--- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewUtil.scala
+++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewUtil.scala
@@ -44,7 +44,7 @@ object ViewUtil {
       case viewCatalog: ViewCatalog =>
         viewCatalog
       case _ =>
-        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
+        throw QueryCompilationErrors.missingCatalogViewsAbilityError(plugin)
     }
   }
 }
diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 25e056ee2d..ac127f754a 100644
--- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -319,7 +319,7 @@ class IcebergParseException(
     val builder = new StringBuilder
     builder ++= "\n" ++= message
     start match {
-      case Origin(Some(l), Some(p), Some(_), Some(_), Some(_), Some(_), Some(_), _, _) =>
+      case Origin(Some(l), Some(p), Some(_), Some(_), Some(_), Some(_), Some(_), _, _, _) =>
         builder ++= s"(line $l, pos $p)\n"
         command.foreach { cmd =>
           val (above, below) = cmd.split("\n").splitAt(l)
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 9b7ed8f9be..dd1a5b74aa 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -2567,16 +2567,6 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
               .isInstanceOf(SparkRuntimeException.class)
               .hasMessageContaining(
                   "[NOT_NULL_ASSERT_VIOLATION] NULL value appeared in non-nullable field");
-          assertThatThrownBy(
-                  () ->
-                      sql(
-                          "MERGE INTO %s t USING source s "
-                              + "ON t.id == s.c1 "
-                              + "WHEN MATCHED THEN "
-                              + "  UPDATE SET t.s = s.c2",
-                          commitTarget()))
-              .isInstanceOf(AnalysisException.class)
-              .hasMessageContaining("Cannot find data for the output column `s`.`n2`");
           assertThatThrownBy(
                   () ->
                       sql(
@@ -2634,17 +2624,6 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
                           commitTarget()))
               .isInstanceOf(AnalysisException.class)
               .hasMessageContaining("Cannot safely cast `s`.`n1` \"VOID\" to \"INT\"");
-
-          assertThatThrownBy(
-                  () ->
-                      sql(
-                          "MERGE INTO %s t USING source s "
-                              + "ON t.id == s.c1 "
-                              + "WHEN MATCHED THEN "
-                              + "  UPDATE SET t.s = s.c2",
-                          commitTarget()))
-              .isInstanceOf(AnalysisException.class)
-              .hasMessageContaining("Cannot find data for the output column `s`.`n2`");
           assertThatThrownBy(
                   () ->
                       sql(
@@ -2864,7 +2843,7 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
                         + "WHEN MATCHED THEN "
                         + "  UPDATE SET *"))
         .isInstanceOf(UnsupportedOperationException.class)
-        .hasMessage("MERGE INTO TABLE is not supported temporarily.");
+        .hasMessageContaining("Table `unknown` does not support MERGE INTO TABLE");
   }
 
   /**
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index ac528d1c47..96e002979f 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -31,7 +31,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.collections.ListUtils;
+import org.apache.commons.collections4.ListUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 3aabd635bb..e23a06a76d 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -978,7 +978,7 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
         .containsKey(CatalogProperties.APP_ID)
         .containsEntry(EnvironmentContext.ENGINE_NAME, "spark")
         .hasEntrySatisfying(
-            EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
+            EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.1"));
   }
 
   @TestTemplate
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b8dca4b2cd..98bcd401a5 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -322,7 +322,7 @@ public class TestRewriteManifestsProcedure extends ExtensionsTestBase {
             () -> sql("CALL %s.system.rewrite_manifests(table => 't', tAbLe => 't')", catalogName))
         .isInstanceOf(AnalysisException.class)
         .hasMessage(
-            "[UNRECOGNIZED_PARAMETER_NAME] Cannot invoke routine `rewrite_manifests` because the routine call included a named argument reference for the argument named `tAbLe`, but this routine does not include any signature containing an argument with this name. Did you mean one of the following? [`table` `spec_id` `use_caching`]. SQLSTATE: 4274K");
+            "[DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.DOUBLE_NAMED_ARGUMENT_REFERENCE] Call to routine `rewrite_manifests` is invalid because it includes multiple argument assignments to the same parameter name `tAbLe`. More than one named argument referred to the same parameter. Please assign a value only once. SQLSTATE: 4274K");
 
     assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 4a6d2ff197..cd824c1910 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -241,7 +241,7 @@ public class TestRewritePositionDeleteFilesProcedure extends ExtensionsTestBase
         .containsKey(CatalogProperties.APP_ID)
         .containsEntry(EnvironmentContext.ENGINE_NAME, "spark")
         .hasEntrySatisfying(
-            EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.0"));
+            EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.1"));
   }
 
   private Map<String, String> snapshotSummary() {
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 77011fd28c..2afbc697e1 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -1447,7 +1447,7 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
 
     assertThatThrownBy(() -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"))
         .isInstanceOf(UnsupportedOperationException.class)
-        .hasMessage("UPDATE TABLE is not supported temporarily.");
+        .hasMessageContaining("Table `unknown` does not support UPDATE TABLE.");
   }
 
   @TestTemplate
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
index e1d9ac18da..231bb7c619 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
@@ -59,8 +59,8 @@ import org.openjdk.jmh.infra.Blackhole;
 /**
  * A benchmark that evaluates the performance of remove orphan files action in Spark.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=DeleteOrphanFilesBenchmark
  *       -PjmhOutputPath=benchmark/delete-orphan-files-benchmark-results.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index 3dbee5dfd0..7f5d701715 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -58,8 +58,8 @@ import org.openjdk.jmh.infra.Blackhole;
  * A benchmark that evaluates the performance of reading Parquet data with a flat schema using
  * Iceberg and Spark Parquet readers.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=SparkParquetReadersFlatDataBenchmark
  *       -PjmhOutputPath=benchmark/spark-parquet-readers-flat-data-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index 8487988d9e..e16f18b281 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -58,8 +58,8 @@ import org.openjdk.jmh.infra.Blackhole;
  * A benchmark that evaluates the performance of reading nested Parquet data using Iceberg and Spark
  * Parquet readers.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=SparkParquetReadersNestedDataBenchmark
  *       -PjmhOutputPath=benchmark/spark-parquet-readers-nested-data-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index 47f0b72088..00c361449a 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -51,8 +51,8 @@ import org.openjdk.jmh.annotations.Warmup;
  * A benchmark that evaluates the performance of writing Parquet data with a flat schema using
  * Iceberg and Spark Parquet writers.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=SparkParquetWritersFlatDataBenchmark
  *       -PjmhOutputPath=benchmark/spark-parquet-writers-flat-data-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
index 4df890d861..24d7fa4051 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -51,8 +51,8 @@ import org.openjdk.jmh.annotations.Warmup;
  * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg and Spark
  * Parquet writers.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=SparkParquetWritersNestedDataBenchmark
  *       -PjmhOutputPath=benchmark/spark-parquet-writers-nested-data-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
index 4dcd58c0c4..5c46a740da 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.spark.source.WritersBenchmark;
 /**
  * A benchmark that evaluates the performance of various Iceberg writers for Avro data.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=AvroWritersBenchmark
  *       -PjmhOutputPath=benchmark/avro-writers-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceFlatAvroDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceFlatAvroDataReadBenchmark.java
index f0297f644a..23f72f0dff 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceFlatAvroDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceFlatAvroDataReadBenchmark.java
@@ -40,8 +40,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading Avro data with a flat schema using Iceberg
 * and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceFlatAvroDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-flat-avro-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceNestedAvroDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceNestedAvroDataReadBenchmark.java
index 00d06566fb..fe641747df 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceNestedAvroDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/IcebergSourceNestedAvroDataReadBenchmark.java
@@ -40,8 +40,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading Avro data with a flat schema using Iceberg
 * and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedAvroDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-avro-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceFlatORCDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceFlatORCDataReadBenchmark.java
index 593fbc9557..7b473770bf 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceFlatORCDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceFlatORCDataReadBenchmark.java
@@ -40,8 +40,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading ORC data with a flat schema using Iceberg
 * and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceFlatORCDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-flat-orc-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
index 0442ed02eb..3053b3c93c 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedListORCDataWriteBenchmark.java
@@ -39,8 +39,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg and the
 * built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedListORCDataWriteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-list-orc-data-write-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedORCDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedORCDataReadBenchmark.java
index a64a23774e..6faeb21b5a 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedORCDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/orc/IcebergSourceNestedORCDataReadBenchmark.java
@@ -41,8 +41,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading ORC data with a flat schema using Iceberg
 * and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedORCDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-orc-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
index 5b7b22f5ea..73e479053b 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataFilterBenchmark.java
@@ -43,8 +43,8 @@ import org.openjdk.jmh.annotations.Threads;
  *
  * <p>The performance is compared to the built-in file source in Spark.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
index ec1514fe42..f95de3cf7f 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataReadBenchmark.java
@@ -39,8 +39,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading Parquet data with a flat schema using
 * Iceberg and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
index 787ae389ca..dccd0c7664 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceFlatParquetDataWriteBenchmark.java
@@ -37,8 +37,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of writing Parquet data with a flat schema using
 * Iceberg and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceFlatParquetDataWriteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-write-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
index 0d17bd3e56..383604c186 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedListParquetDataWriteBenchmark.java
@@ -40,8 +40,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg and the
 * built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedListParquetDataWriteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-list-parquet-data-write-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
index a5ddd06042..c24e336c12 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataFilterBenchmark.java
@@ -43,8 +43,8 @@ import org.openjdk.jmh.annotations.Threads;
  *
  * <p>The performance is compared to the built-in file source in Spark.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataFilterBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-filter-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
index 24e2d99902..1c365d49f2 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataReadBenchmark.java
@@ -39,8 +39,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of reading nested Parquet data using Iceberg and the
 * built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataReadBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-read-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
index eef14854c4..e851e6a930 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceNestedParquetDataWriteBenchmark.java
@@ -38,8 +38,8 @@ import org.openjdk.jmh.annotations.Threads;
 * A benchmark that evaluates the performance of writing nested Parquet data using Iceberg and the
 * built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh
  *       -PjmhIncludeRegex=IcebergSourceNestedParquetDataWriteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-nested-parquet-data-write-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java
index 3b54b448b8..d4555f8c6b 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetEqDeleteBenchmark.java
@@ -27,8 +27,8 @@ import org.openjdk.jmh.annotations.Param;
 * A benchmark that evaluates the non-vectorized read and vectorized read with equality delete in
 * the Spark data source for Iceberg.
 *
- * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0:jmh
+ * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1:jmh
  *       -PjmhIncludeRegex=IcebergSourceParquetEqDeleteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-parquet-eq-delete-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
index 7891890dff..9110ff071f 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetMultiDeleteFileBenchmark.java
@@ -28,8 +28,8 @@ import org.openjdk.jmh.annotations.Param;
 * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the
 * Spark data source for Iceberg.
 *
- * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0:jmh \
+ * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1:jmh \
  *       -PjmhIncludeRegex=IcebergSourceParquetMultiDeleteFileBenchmark \
  *       -PjmhOutputPath=benchmark/iceberg-source-parquet-multi-delete-file-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
index 3c6dfa6bd9..daf7c295c3 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetPosDeleteBenchmark.java
@@ -28,8 +28,8 @@ import org.openjdk.jmh.annotations.Param;
 * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the
 * Spark data source for Iceberg.
 *
- * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0:jmh
+ * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1:jmh
  *       -PjmhIncludeRegex=IcebergSourceParquetPosDeleteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-parquet-pos-delete-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
index 01096ac796..ab83b688f5 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/IcebergSourceParquetWithUnrelatedDeleteBenchmark.java
@@ -28,8 +28,8 @@ import org.openjdk.jmh.annotations.Param;
 * A benchmark that evaluates the non-vectorized read and vectorized read with pos-delete in the
 * Spark data source for Iceberg.
 *
- * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0:jmh
+ * <p>This class uses a dataset with a flat schema. To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1:jmh
  *       -PjmhIncludeRegex=IcebergSourceParquetWithUnrelatedDeleteBenchmark
  *       -PjmhOutputPath=benchmark/iceberg-source-parquet-with-unrelated-delete-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
index 8bcd871a07..2b8e522148 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
@@ -24,8 +24,8 @@ import org.apache.iceberg.spark.source.WritersBenchmark;
 /**
  * A benchmark that evaluates the performance of various Iceberg writers for Parquet data.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh \
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh \
  *       -PjmhIncludeRegex=ParquetWritersBenchmark \
  *       -PjmhOutputPath=benchmark/parquet-writers-benchmark-result.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index 73d4f62118..8c88a4952f 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -39,8 +39,8 @@ import org.openjdk.jmh.annotations.Setup;
 * Benchmark to compare performance of reading Parquet dictionary encoded data with a flat schema
 * using vectorized Iceberg read path and the built-in file source in Spark.
 *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh \
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh \
  *       -PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
index 6cf327c1cf..429171212a 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
@@ -50,8 +50,8 @@ import org.openjdk.jmh.annotations.Threads;
 * Benchmark to compare performance of reading Parquet data with a flat schema using vectorized
  * Iceberg read path and the built-in file source in Spark.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh \
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh \
  *       -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadParquetDecimalBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadParquetDecimalBenchmark.java
index ccf28e3fdc..3aafe8a72a 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadParquetDecimalBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadParquetDecimalBenchmark.java
@@ -48,8 +48,8 @@ import org.openjdk.jmh.annotations.Threads;
  * Benchmark to compare performance of reading Parquet decimal data using vectorized Iceberg read
  * path and the built-in file source in Spark.
  *
- * <p>To run this benchmark for spark-4.0: <code>
- *   ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:jmh \
+ * <p>To run this benchmark for spark-4.1: <code>
+ *   ./gradlew -DsparkVersions=4.1 :iceberg-spark:iceberg-spark-4.1_2.13:jmh \
  *       -PjmhIncludeRegex=VectorizedReadParquetDecimalBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index c6784f1041..a0a5b0518d 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -60,6 +60,17 @@ abstract class BaseCatalog
     throw new RuntimeException("Procedure " + ident + " not found");
   }
 
+  @Override
+  public Identifier[] listProcedures(String[] namespace) {
+    if (isSystemNamespace(namespace)) {
+      return SparkProcedures.names().stream()
+          .map(name -> Identifier.of(namespace, name))
+          .toArray(Identifier[]::new);
+    } else {
+      return new Identifier[0];
+    }
+  }
+
   @Override
   public boolean isFunctionNamespace(String[] namespace) {
     // Allow for empty namespace, as Spark's storage partitioned joins look up
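For reference, a minimal sketch of how the new listProcedures override behaves, grounded only in the hunk above; the `catalog` variable and the namespace literals are assumptions, not part of the patch:

    import org.apache.spark.sql.connector.catalog.Identifier;

    // Assuming `catalog` is an initialized subclass of BaseCatalog (hypothetical setup),
    // the "system" namespace enumerates every name registered in SparkProcedures:
    Identifier[] procedures = catalog.listProcedures(new String[] {"system"});
    for (Identifier ident : procedures) {
      System.out.println(ident); // e.g. system.rollback_to_snapshot
    }
    // Any other namespace yields an empty array rather than an error:
    assert catalog.listProcedures(new String[] {"db"}).length == 0;

Returning an empty array instead of throwing keeps listing cheap for non-system namespaces.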
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 98d3e41535..0b4578630a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -1021,7 +1021,8 @@ public class SparkTableUtil {
   private static DataSourceV2Relation createRelation(
       SparkTable sparkTable, Map<String, String> extraOptions) {
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
-    return DataSourceV2Relation.create(sparkTable, Option.empty(), Option.empty(), options);
+    return DataSourceV2Relation.create(
+        sparkTable, Option.empty(), Option.empty(), options, Option.empty());
   }
 
   /**
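A hedged aside on the Java/Scala interop this call relies on: the optional arguments are scala.Option values, created from Java as sketched below (the String payload is illustrative only):

    import scala.Option;

    // Option.empty() yields scala.None, matching the placeholders passed above;
    // Option.apply(...) would wrap a concrete value instead.
    Option<String> none = Option.empty();
    Option<String> some = Option.apply("demo");
    System.out.println(none.isDefined()); // false
    System.out.println(some.get());       // demo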
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 28a9a31c6a..a19ed80607 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -66,6 +66,8 @@ import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.GeographyVal;
+import org.apache.spark.unsafe.types.GeometryVal;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.unsafe.types.VariantVal;
 
@@ -778,5 +780,15 @@ public class SparkParquetReaders {
     public VariantVal getVariant(int ordinal) {
       return (VariantVal) values[ordinal];
     }
+
+    @Override
+    public GeographyVal getGeography(int ordinal) {
+      return (GeographyVal) values[ordinal];
+    }
+
+    @Override
+    public GeometryVal getGeometry(int ordinal) {
+      return (GeometryVal) values[ordinal];
+    }
   }
 }
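For orientation, a hedged sketch of the accessor pattern added above: like getVariant, the new getters cast an already-materialized slot in the reader's values array. The byte payload and array setup below are illustrative assumptions, not code from the patch:

    import org.apache.spark.unsafe.types.GeometryVal;

    // GeometryVal.fromBytes is the same factory the StructInternalRow change
    // later in this commit uses; the payload here is a placeholder.
    byte[] payload = new byte[] {1, 2, 3};
    Object[] values = new Object[] {GeometryVal.fromBytes(payload)};
    // The getter is then a plain cast of the boxed slot:
    GeometryVal geometry = (GeometryVal) values[0];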
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 6b42a04421..bad31a12c1 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.procedures;
 
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -37,6 +38,10 @@ public class SparkProcedures {
     return builderSupplier != null ? builderSupplier.get() : null;
   }
 
+  public static Set<String> names() {
+    return BUILDERS.keySet();
+  }
+
   private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     ImmutableMap.Builder<String, Supplier<ProcedureBuilder>> mapBuilder = ImmutableMap.builder();
     mapBuilder.put(RollbackToSnapshotProcedure.NAME, RollbackToSnapshotProcedure::builder);
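A brief usage sketch for the new names() accessor (wiring assumed): it exposes the registry's key set so callers such as BaseCatalog#listProcedures can enumerate procedures without duplicating the builder map:

    import java.util.Set;

    Set<String> procedureNames = SparkProcedures.names();
    // Backed by an ImmutableMap key set, so the returned set is unmodifiable.
    procedureNames.forEach(name -> System.out.println("iceberg procedure: " + name));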
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 2d3c917e58..074f04d034 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -62,6 +62,8 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.types.VariantType;
 import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.GeographyVal;
+import org.apache.spark.unsafe.types.GeometryVal;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.unsafe.types.VariantVal;
 
@@ -247,6 +249,16 @@ class StructInternalRow extends InternalRow {
     return toVariantVal(value);
   }
 
+  @Override
+  public GeographyVal getGeography(int ordinal) {
+    return isNullAt(ordinal) ? null : GeographyVal.fromBytes(getBinaryInternal(ordinal));
+  }
+
+  @Override
+  public GeometryVal getGeometry(int ordinal) {
+    return isNullAt(ordinal) ? null : GeometryVal.fromBytes(getBinaryInternal(ordinal));
+  }
+
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Object get(int ordinal, DataType dataType) {
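A hedged behavioral note: unlike the cast-based getters in SparkParquetReaders, these accessors convert the stored binary on access and preserve nulls. In the sketch below, stand-in variables replace isNullAt(ordinal) and getBinaryInternal(ordinal):

    import org.apache.spark.unsafe.types.GeographyVal;

    boolean fieldIsNull = false;     // stand-in for isNullAt(ordinal)
    byte[] binary = new byte[] {0};  // stand-in for getBinaryInternal(ordinal)
    // Mirrors the null contract above: a null field surfaces as a null value
    // instead of fromBytes being invoked on a null array.
    GeographyVal geo = fieldIsNull ? null : GeographyVal.fromBytes(binary);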
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
index 1d1ccca1a2..f42c37f5e4 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java
@@ -171,6 +171,7 @@ public class TestSparkParquetReader extends AvroDataTestBase {
             .set("spark.sql.parquet.writeLegacyFormat", "false")
             .set("spark.sql.parquet.outputTimestampType", "INT96")
             .set("spark.sql.parquet.fieldId.write.enabled", "true")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "true")
             .build()) {
       for (InternalRow row : rows) {
         writer.write(row);
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index c4e0d26c1c..290e73c3bd 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -53,9 +53,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream;
 import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 import org.junit.jupiter.api.AfterAll;
@@ -147,7 +146,7 @@ public class TestForwardCompatibility {
     HadoopTables tables = new HadoopTables(CONF);
     tables.create(SCHEMA, UNKNOWN_SPEC, location.toString());
 
-    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
     StreamingQuery query =
         inputStream
             .toDF()
@@ -219,8 +218,9 @@ public class TestForwardCompatibility {
     }
   }
 
-  private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encoder<T> encoder) {
-    return new MemoryStream<>(id, sqlContext, Option.empty(), encoder);
+  private <T> MemoryStream<T> newMemoryStream(
+      int id, SparkSession sparkSession, Encoder<T> encoder) {
+    return new MemoryStream<>(id, sparkSession, Option.empty(), encoder);
   }
 
   private <T> void send(List<T> records, MemoryStream<T> stream) {
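Hedged context for the change above: Spark 4.1 moved MemoryStream into the ...streaming.runtime package and builds it from a SparkSession instead of a SQLContext, which is all the updated helper reflects. An illustrative call (values assumed):

    import org.apache.spark.sql.Encoders;

    // Same call shape as in the tests above; downstream usage such as toDF() is unchanged.
    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());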
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 54048bbf21..635229f6a0 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -39,9 +39,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.StreamingQueryException;
@@ -96,7 +95,7 @@ public class TestStructuredStreaming {
             new SimpleRecord(3, "3"),
             new SimpleRecord(4, "4"));
 
-    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
     DataStreamWriter<Row> streamWriter =
         inputStream
             .toDF()
@@ -155,7 +154,7 @@ public class TestStructuredStreaming {
         Lists.newArrayList(
             new SimpleRecord(2, "1"), new SimpleRecord(3, "2"), new 
SimpleRecord(1, "3"));
 
-    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
     DataStreamWriter<Row> streamWriter =
         inputStream
             .toDF()
@@ -216,7 +215,7 @@ public class TestStructuredStreaming {
         Lists.newArrayList(
             new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
 
-    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
     DataStreamWriter<Row> streamWriter =
         inputStream
             .toDF()
@@ -273,7 +272,7 @@ public class TestStructuredStreaming {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
     tables.create(SCHEMA, spec, location.toString());
 
-    MemoryStream<Integer> inputStream = newMemoryStream(1, spark.sqlContext(), Encoders.INT());
+    MemoryStream<Integer> inputStream = newMemoryStream(1, spark, Encoders.INT());
     DataStreamWriter<Row> streamWriter =
         inputStream
             .toDF()
@@ -299,8 +298,9 @@ public class TestStructuredStreaming {
     }
   }
 
-  private <T> MemoryStream<T> newMemoryStream(int id, SQLContext sqlContext, Encoder<T> encoder) {
-    return new MemoryStream<>(id, sqlContext, Option.empty(), encoder);
+  private <T> MemoryStream<T> newMemoryStream(
+      int id, SparkSession sparkSession, Encoder<T> encoder) {
+    return new MemoryStream<>(id, sparkSession, Option.empty(), encoder);
   }
 
   private <T> void send(List<T> records, MemoryStream<T> stream) {
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
index ba856dc538..838e735c75 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
@@ -38,9 +38,6 @@ import org.junit.jupiter.api.TestTemplate;
  * <p>Note: These tests use {@code validationCatalog.createTable()} to create tables with default
  * values because the Iceberg Spark integration does not yet support default value clauses in Spark
  * DDL.
- *
- * <p>Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are
- * not supported for DSV2 in Spark 4.0
  */
 public class TestSparkDefaultValues extends CatalogTestBase {
 
@@ -175,29 +172,6 @@ public class TestSparkDefaultValues extends CatalogTestBase {
         .hasMessageContaining("default values in Spark is currently unsupported");
   }
 
-  @TestTemplate
-  public void testPartialInsertUnsupported() {
-    assertThat(validationCatalog.tableExists(tableIdent))
-        .as("Table should not already exist")
-        .isFalse();
-
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(1, "id", Types.IntegerType.get()),
-            Types.NestedField.optional("data")
-                .withId(2)
-                .ofType(Types.StringType.get())
-                .withWriteDefault(Literal.of("default-data"))
-                .build());
-
-    validationCatalog.createTable(
-        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
-
-    assertThatThrownBy(() -> sql("INSERT INTO %s (id) VALUES (1)", commitTarget()))
-        .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("Cannot find data for the output column");
-  }
-
   @TestTemplate
   public void testSchemaEvolutionWithDefaultValueChanges() {
     assertThat(validationCatalog.tableExists(tableIdent))

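A hedged note on the deletion above: removing testPartialInsertUnsupported implies that partial-column INSERTs against DSV2 are expected to work on Spark 4.1, with omitted columns filled from their write defaults. An illustrative statement reusing the deleted test's own fixtures:

    // With the removed test's schema ("data" carries write default "default-data"),
    // this INSERT is now expected to succeed rather than fail analysis:
    sql("INSERT INTO %s (id) VALUES (1)", commitTarget());
    // The omitted 'data' column should resolve to "default-data".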