This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d0e6aaca9 [spark] Enable dynamic options for format table (#6592)
2d0e6aaca9 is described below

commit 2d0e6aaca973c5d419487f9cd95a94d9a0c4d6f5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 12 22:04:42 2025 +0800

    [spark] Enable dynamic options for format table (#6592)
---
 .../org/apache/paimon/spark/PaimonFormatTableScan.scala      |  2 +-
 .../src/main/java/org/apache/paimon/spark/SparkCatalog.java  | 11 ++++-------
 .../scala/org/apache/paimon/spark/PaimonSparkTestBase.scala  | 12 ++++++++++--
 .../apache/paimon/spark/table/PaimonFormatTableTest.scala    | 11 +++++++++++
 4 files changed, 26 insertions(+), 10 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index 43ef29f332..a7bbe4e94e 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-/** Scan for {@link FormatTable} */
+/** Scan for [[ FormatTable ]] */
 case class PaimonFormatTableScan(
     table: FormatTable,
     requiredSchema: StructType,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 835756ffff..46162aabd6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -648,17 +648,14 @@ public class SparkCatalog extends SparkBaseCatalog
     protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
             Identifier ident, Map<String, String> extraOptions) throws 
NoSuchTableException {
         try {
+            org.apache.paimon.catalog.Identifier tblIdent = 
toIdentifier(ident, catalogName);
             org.apache.paimon.table.Table paimonTable =
-                    catalog.getTable(toIdentifier(ident, catalogName));
+                    copyWithSQLConf(
+                            catalog.getTable(tblIdent), catalogName, tblIdent, 
extraOptions);
             if (paimonTable instanceof FormatTable) {
                 return toSparkFormatTable(ident, (FormatTable) paimonTable);
             } else {
-                return new SparkTable(
-                        copyWithSQLConf(
-                                paimonTable,
-                                catalogName,
-                                toIdentifier(ident, catalogName),
-                                extraOptions));
+                return new SparkTable(paimonTable);
             }
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index f1adebb80e..3975e128be 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -30,6 +30,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.paimon.Utils
@@ -183,12 +184,19 @@ class PaimonSparkTestBase
     )
   }
 
-  protected def getPaimonScan(sqlText: String): PaimonScan = {
+  def getScan(sqlText: String): Scan = {
     sql(sqlText).queryExecution.optimizedPlan
       .collectFirst { case relation: DataSourceV2ScanRelation => relation }
       .get
       .scan
-      .asInstanceOf[PaimonScan]
+  }
+
+  protected def getPaimonScan(sqlText: String): PaimonScan = {
+    getScan(sqlText).asInstanceOf[PaimonScan]
+  }
+
+  protected def getFormatTableScan(sqlText: String): PaimonFormatTableScan = {
+    getScan(sqlText).asInstanceOf[PaimonFormatTableScan]
   }
 
   object GenericRow {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 71580156fd..96997dec34 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -118,6 +118,17 @@ class PaimonFormatTableTest extends 
PaimonSparkTestWithRestCatalogBase {
     }
   }
 
+  test("PaimonFormatTable: set dynamic options") {
+    withTable("t") {
+      sql(s"create table t (id INT, v INT, pt STRING) using csv")
+
+      withSparkSQLConf("spark.paimon.write.batch-size" -> "256") {
+        val options = getFormatTableScan("SELECT * FROM t").table.options()
+        assert(options.get("write.batch-size") == "256")
+      }
+    }
+  }
+
   test("PaimonFormatTable non partition table overwrite: csv") {
     val tableName = "paimon_non_partiiton_overwrite_test"
     withTable(tableName) {

Reply via email to