This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d0e6aaca9 [spark] Enable dynamic options for format table (#6592)
2d0e6aaca9 is described below
commit 2d0e6aaca973c5d419487f9cd95a94d9a0c4d6f5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 12 22:04:42 2025 +0800
[spark] Enable dynamic options for format table (#6592)
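
With this change, Spark session configurations prefixed with "spark.paimon."
are copied onto format tables as dynamic options, as was already the case for
regular Paimon tables. A minimal sketch of the resulting behavior, assuming a
Spark session (spark) with a Paimon catalog configured; table and option
names mirror the new test below:

    // Create a CSV-backed format table.
    spark.sql("CREATE TABLE t (id INT, v INT, pt STRING) USING csv")
    // Session confs prefixed with "spark.paimon." now reach the format
    // table as dynamic options (here: write.batch-size = 256).
    spark.sql("SET spark.paimon.write.batch-size=256")
    spark.sql("SELECT * FROM t").show()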
---
 .../org/apache/paimon/spark/PaimonFormatTableScan.scala     |  2 +-
 .../src/main/java/org/apache/paimon/spark/SparkCatalog.java | 11 ++++-------
 .../scala/org/apache/paimon/spark/PaimonSparkTestBase.scala | 12 ++++++++++--
 .../apache/paimon/spark/table/PaimonFormatTableTest.scala   | 11 +++++++++++
 4 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index 43ef29f332..a7bbe4e94e 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 
-/** Scan for {@link FormatTable} */
+/** Scan for [[ FormatTable ]] */
 case class PaimonFormatTableScan(
     table: FormatTable,
     requiredSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 835756ffff..46162aabd6 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -648,17 +648,14 @@ public class SparkCatalog extends SparkBaseCatalog
     protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
             Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
         try {
+            org.apache.paimon.catalog.Identifier tblIdent = toIdentifier(ident, catalogName);
             org.apache.paimon.table.Table paimonTable =
-                    catalog.getTable(toIdentifier(ident, catalogName));
+                    copyWithSQLConf(
+                            catalog.getTable(tblIdent), catalogName, tblIdent, extraOptions);
             if (paimonTable instanceof FormatTable) {
                 return toSparkFormatTable(ident, (FormatTable) paimonTable);
             } else {
-                return new SparkTable(
-                        copyWithSQLConf(
-                                paimonTable,
-                                catalogName,
-                                toIdentifier(ident, catalogName),
-                                extraOptions));
+                return new SparkTable(paimonTable);
             }
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
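
The fix above hoists copyWithSQLConf out of the SparkTable branch so that
dynamic options are applied before the FormatTable check; previously a
FormatTable bypassed them entirely. Conceptually the helper behaves like the
following sketch (illustrative only; withDynamicOptions is a stand-in name,
and the real helper additionally receives the catalog name and table
identifier for finer-grained scoping):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.internal.SQLConf

    // Collect session confs prefixed with "spark.paimon.", strip the
    // prefix, merge in per-call extra options, and copy the resulting
    // map onto the table as dynamic options.
    def withDynamicOptions(
        table: org.apache.paimon.table.Table,
        extraOptions: Map[String, String]): org.apache.paimon.table.Table = {
      val prefix = "spark.paimon."
      val fromConf = SQLConf.get.getAllConfs.collect {
        case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
      }
      table.copy((fromConf ++ extraOptions).asJava)
    }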
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index f1adebb80e..3975e128be 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -30,6 +30,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.paimon.Utils
@@ -183,12 +184,19 @@ class PaimonSparkTestBase
     )
   }
 
-  protected def getPaimonScan(sqlText: String): PaimonScan = {
+  def getScan(sqlText: String): Scan = {
     sql(sqlText).queryExecution.optimizedPlan
       .collectFirst { case relation: DataSourceV2ScanRelation => relation }
       .get
       .scan
-      .asInstanceOf[PaimonScan]
+  }
+
+  protected def getPaimonScan(sqlText: String): PaimonScan = {
+    getScan(sqlText).asInstanceOf[PaimonScan]
+  }
+
+  protected def getFormatTableScan(sqlText: String): PaimonFormatTableScan = {
+    getScan(sqlText).asInstanceOf[PaimonFormatTableScan]
   }
 
   object GenericRow {
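
The refactor above extracts a shared getScan helper so tests can fetch the
V2 Scan once and downcast to the expected type. A hypothetical test-body
usage, with t a format table as in the new test below:

    // getScan returns the raw Scan; the typed wrappers just cast it.
    val scan = getFormatTableScan("SELECT * FROM t")
    assert(scan.table.options().get("write.batch-size") == "256")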
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 71580156fd..96997dec34 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -118,6 +118,17 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 
+  test("PaimonFormatTable: set dynamic options") {
+    withTable("t") {
+      sql(s"create table t (id INT, v INT, pt STRING) using csv")
+
+      withSparkSQLConf("spark.paimon.write.batch-size" -> "256") {
+        val options = getFormatTableScan("SELECT * FROM t").table.options()
+        assert(options.get("write.batch-size") == "256")
+      }
+    }
+  }
+
   test("PaimonFormatTable non partition table overwrite: csv") {
     val tableName = "paimon_non_partiiton_overwrite_test"
     withTable(tableName) {