This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a04c926f5 [spark] Optimize the config reading for load table in 
SparkSource (#6049)
5a04c926f5 is described below

commit 5a04c926f54ac1bbe642bc7a8b39110c0949f9ea
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 11 12:57:10 2025 +0800

    [spark] Optimize the config reading for load table in SparkSource (#6049)
---
 docs/content/spark/dataframe.md                    | 28 ++++++++-
 .../org/apache/paimon/spark/SparkSource.scala      | 66 ++++++++++++++--------
 .../org/apache/paimon/spark/util/OptionUtils.scala | 37 ++++++------
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 56 ++++++++++++++++++
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 30 +++++-----
 5 files changed, 159 insertions(+), 58 deletions(-)

diff --git a/docs/content/spark/dataframe.md b/docs/content/spark/dataframe.md
index 453ab6b3c5..b0e507793c 100644
--- a/docs/content/spark/dataframe.md
+++ b/docs/content/spark/dataframe.md
@@ -44,13 +44,13 @@ data.write.format("paimon")
 ## Insert
 
 ### Insert Into
-You can achieve INSERT INTO semantics by setting the mode to `append` (by 
default).
+You can achieve INSERT INTO semantics by setting the mode to `append`.
 
 ```scala
 val data: DataFrame = ...
 
 data.write.format("paimon")
-  .mode("append")         // by default
+  .mode("append")
   .insertInto("test_tbl") // or .saveAsTable("test_tbl") or 
.save("/path/to/default.db/test_tbl")
 ```
 
@@ -95,3 +95,27 @@ spark.read.format("paimon")
   .table("t") // or .load("/path/to/default.db/test_tbl")
   .show()
 ```
+
+To specify the catalog or database, you can use
+
+```scala
+// recommend
+spark.read.format("paimon")
+  .table("<catalogName>.<databaseName>.<tableName>")
+
+// or
+spark.read.format("paimon")
+  .option("catalog", "<catalogName>")
+  .option("database", "<databaseName>")
+  .option("table", "<tableName>")
+  .load("/path/to/default.db/test_tbl")
+```
+
+You can specify other read configs through option:
+
+```scala
+// time travel
+spark.read.format("paimon")
+  .option("scan.snapshot-id", 1)
+  .table("t")
+```
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 979191cfab..7ad32a3a00 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -19,18 +19,18 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
+import org.apache.paimon.catalog.{CatalogContext, CatalogUtils}
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkSource.NAME
+import org.apache.paimon.spark.SparkSource._
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, 
mergeSQLConfWithIdentifier}
+import org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf
 import org.apache.paimon.table.{DataTable, FileStoreTable, 
FileStoreTableFactory}
 import org.apache.paimon.table.FormatTable.Format
 import org.apache.paimon.table.system.AuditLogTable
 
-import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => 
SparkSaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
+import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => 
SparkSaveMode, SQLContext}
+import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier, 
SessionConfigSupport, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, 
DataSourceRegister, StreamSinkProvider}
@@ -82,24 +82,6 @@ class SparkSource
     SparkSource.toBaseRelation(table, sqlContext)
   }
 
-  private def loadTable(options: JMap[String, String]): DataTable = {
-    val path = CoreOptions.path(options)
-    val catalogContext = CatalogContext.create(
-      Options.fromMap(
-        mergeSQLConfWithIdentifier(
-          options,
-          extractCatalogName().getOrElse(NAME),
-          Identifier.create(CatalogUtils.database(path), 
CatalogUtils.table(path)))),
-      PaimonSparkSession.active.sessionState.newHadoopConf()
-    )
-    val table = FileStoreTableFactory.create(catalogContext)
-    if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
-      new AuditLogTable(table)
-    } else {
-      table
-    }
-  }
-
   override def createSink(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -113,6 +95,35 @@ class SparkSource
     new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
   }
 
+  private def loadTable(options: JMap[String, String]): DataTable = {
+    val path = CoreOptions.path(options)
+    val sessionState = PaimonSparkSession.active.sessionState
+
+    // Only if user specifies the catalog name, then use catalog to get table, 
otherwise
+    // use FileStoreTableFactory to get table by path
+    val table = if (options.containsKey(CATALOG)) {
+      val catalogName = options.get(CATALOG)
+      val dataBaseName = 
Option(options.get(DATABASE)).getOrElse(CatalogUtils.database(path))
+      val tableName = 
Option(options.get(TABLE)).getOrElse(CatalogUtils.table(path))
+      val sparkCatalog = 
sessionState.catalogManager.catalog(catalogName).asInstanceOf[TableCatalog]
+      sparkCatalog
+        .loadTable(SparkIdentifier.of(Array(dataBaseName), tableName))
+        .asInstanceOf[SparkTable]
+        .getTable
+        .asInstanceOf[FileStoreTable]
+        .copy(options)
+    } else {
+      val catalogContext =
+        CatalogContext.create(Options.fromMap(options), 
sessionState.newHadoopConf())
+      copyWithSQLConf(FileStoreTableFactory.create(catalogContext), 
extraOptions = options)
+    }
+
+    if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+      new AuditLogTable(table)
+    } else {
+      table
+    }
+  }
 }
 
 object SparkSource {
@@ -121,7 +132,14 @@ object SparkSource {
 
   val FORMAT_NAMES: Seq[String] = 
Format.values.map(_.toString.toLowerCase).toSeq
 
-  def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): 
BaseRelation = {
+  // Spark dataframe read options
+  private val CATALOG = "catalog"
+
+  private val DATABASE = "database"
+
+  private val TABLE = "table"
+
+  private def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): 
BaseRelation = {
     new BaseRelation {
       override def sqlContext: SQLContext = _sqlContext
       override def schema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 253c0dac89..2acfc4665d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.Table
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.internal.StaticSQLConf
 
-import java.util.{Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap}
 import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
@@ -71,19 +71,21 @@ object OptionUtils extends SQLConfHelper {
     getOptionString(SparkConnectorOptions.EXPLICIT_CAST).toBoolean
   }
 
-  def extractCatalogName(): Option[String] = {
-    val sparkCatalogTemplate = String.format("%s([^.]*)$", 
SPARK_CATALOG_PREFIX)
-    val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
-    conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach {
-      case (key, _) =>
-        val matcher = sparkCatalogPattern.matcher(key)
-        if (matcher.find())
-          return Option(matcher.group(1))
-    }
-    Option.empty
+  private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, 
String] = {
+    val mergedOptions = new JHashMap[String, String](
+      conf.getAllConfs
+        .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+        .map {
+          case (key, value) =>
+            key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
+        }
+        .toMap
+        .asJava)
+    mergedOptions.putAll(extraOptions)
+    mergedOptions
   }
 
-  def mergeSQLConfWithIdentifier(
+  private def mergeSQLConfWithIdentifier(
       extraOptions: JMap[String, String],
       catalogName: String,
       ident: Identifier): JMap[String, String] = {
@@ -106,11 +108,14 @@ object OptionUtils extends SQLConfHelper {
 
   def copyWithSQLConf[T <: Table](
       table: T,
-      catalogName: String,
-      ident: Identifier,
-      extraOptions: JMap[String, String]): T = {
-    val mergedOptions: JMap[String, String] =
+      catalogName: String = null,
+      ident: Identifier = null,
+      extraOptions: JMap[String, String] = new JHashMap[String, String]()): T 
= {
+    val mergedOptions = if (catalogName != null && ident != null) {
       mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
+    } else {
+      mergeSQLConf(extraOptions)
+    }
     if (mergedOptions.isEmpty) {
       table
     } else {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 2a8b8e9b39..d0fa8f1b74 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -758,6 +758,62 @@ abstract class DDLWithHiveCatalogTestBase extends 
PaimonHiveTestBase {
     }
   }
 
+  test("Paimon Dataframe: read write with catalog name") {
+    withTempDir {
+      tmpDir =>
+        spark.sql(s"USE $paimonHiveCatalogName")
+        val dbName = "paimon_db"
+        withDatabase(dbName) {
+          spark.sql(s"CREATE DATABASE $dbName")
+          spark.sql(s"USE $dbName")
+          withTable("t") {
+            val tblLocation = tmpDir.getCanonicalPath
+            spark.sql(s"CREATE TABLE t (a INT, b STRING) USING paimon LOCATION 
'$tblLocation'")
+
+            // write by path
+            Seq((1, "x1"), (2, "x2"))
+              .toDF("a", "b")
+              .write
+              .format("paimon")
+              .option("catalog", paimonHiveCatalogName)
+              .option("database", dbName)
+              .option("table", "t")
+              .mode("append")
+              .save(tblLocation)
+
+            // write by name
+            Seq((3, "x3"), (2, "x4"))
+              .toDF("a", "b")
+              .write
+              .format("paimon")
+              .mode("append")
+              .saveAsTable(s"$paimonHiveCatalogName.$dbName.t")
+
+            // read by path with time travel
+            checkAnswer(
+              spark.read
+                .format("paimon")
+                .option("catalog", paimonHiveCatalogName)
+                .option("database", dbName)
+                .option("table", "t")
+                .option("scan.snapshot-id", 1)
+                .load(tblLocation),
+              Seq(Row(1, "x1"), Row(2, "x2"))
+            )
+
+            // read by name with time travel
+            checkAnswer(
+              spark.read
+                .format("paimon")
+                .option("scan.snapshot-id", 1)
+                .table(s"$paimonHiveCatalogName.$dbName.t"),
+              Seq(Row(1, "x1"), Row(2, "x2"))
+            )
+          }
+        }
+    }
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index 9f2b7cad7f..a51893941e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -95,7 +95,7 @@ class PaimonOptionTest extends PaimonSparkTestBase {
     // query with table options
     withSparkSQLConf("spark.paimon.*.*.T.scan.snapshot-id" -> "1") {
       checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
-      checkAnswer(spark.read.format("paimon").load(table.location().toString), 
Row(1))
+      checkAnswer(spark.read.format("paimon").table("T"), Row(1))
     }
 
     // query with both global and table options
@@ -103,9 +103,7 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       "spark.paimon.scan.snapshot-id" -> "1",
       "spark.paimon.*.*.T.scan.snapshot-id" -> "2") {
       checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
-      checkAnswer(
-        spark.read.format("paimon").load(table.location().toString),
-        Row(1) :: Row(2) :: Nil)
+      checkAnswer(spark.read.format("paimon").table("T"), Row(1) :: Row(2) :: 
Nil)
     }
   }
 
@@ -129,8 +127,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1)
       )
     }
@@ -141,8 +139,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1)
       )
     }
@@ -157,8 +155,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1) :: Row(2) :: Nil
       )
     }
@@ -170,8 +168,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1)
       )
     }
@@ -183,8 +181,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1)
       )
     }
@@ -198,8 +196,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
       checkAnswer(
         spark.read
           .format("paimon")
-          .load(table1.location().toString)
-          .join(spark.read.format("paimon").load(table2.location().toString), 
"id"),
+          .table("T1")
+          .join(spark.read.format("paimon").table("T2"), "id"),
         Row(1) :: Row(2) :: Nil
       )
     }

Reply via email to