This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5a04c926f5 [spark] Optimize the config reading for load table in SparkSource (#6049)
5a04c926f5 is described below
commit 5a04c926f54ac1bbe642bc7a8b39110c0949f9ea
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 11 12:57:10 2025 +0800
[spark] Optimize the config reading for load table in SparkSource (#6049)
---
 docs/content/spark/dataframe.md                    | 28 ++++++++-
 .../org/apache/paimon/spark/SparkSource.scala      | 66 ++++++++++++++--------
 .../org/apache/paimon/spark/util/OptionUtils.scala | 37 ++++++------
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 56 ++++++++++++++++++
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 30 +++++-----
 5 files changed, 159 insertions(+), 58 deletions(-)
diff --git a/docs/content/spark/dataframe.md b/docs/content/spark/dataframe.md
index 453ab6b3c5..b0e507793c 100644
--- a/docs/content/spark/dataframe.md
+++ b/docs/content/spark/dataframe.md
@@ -44,13 +44,13 @@ data.write.format("paimon")
## Insert
### Insert Into
-You can achieve INSERT INTO semantics by setting the mode to `append` (by default).
+You can achieve INSERT INTO semantics by setting the mode to `append`.
```scala
val data: DataFrame = ...
data.write.format("paimon")
- .mode("append") // by default
+ .mode("append")
.insertInto("test_tbl") // or .saveAsTable("test_tbl") or .save("/path/to/default.db/test_tbl")
```
@@ -95,3 +95,27 @@ spark.read.format("paimon")
.table("t") // or .load("/path/to/default.db/test_tbl")
.show()
```
+
+To specify the catalog or database, you can use:
+
+```scala
+// recommended
+spark.read.format("paimon")
+ .table("<catalogName>.<databaseName>.<tableName>")
+
+// or
+spark.read.format("paimon")
+ .option("catalog", "<catalogName>")
+ .option("database", "<databaseName>")
+ .option("table", "<tableName>")
+ .load("/path/to/default.db/test_tbl")
+```
+
+You can specify other read configs through options:
+
+```scala
+// time travel
+spark.read.format("paimon")
+ .option("scan.snapshot-id", 1)
+ .table("t")
+```
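
The same catalog/database/table options also apply on the write path, which the new test added below in DDLWithHiveCatalogTestBase exercises; a minimal sketch with placeholder names, assuming a SparkSession `spark` is available:

```scala
// Sketch only: write by path while resolving the table through an explicit catalog,
// mirroring the read options documented above. Assumes spark.implicits._ is in scope;
// all names are placeholders.
val df = Seq((1, "x1"), (2, "x2")).toDF("a", "b")
df.write
  .format("paimon")
  .option("catalog", "<catalogName>")
  .option("database", "<databaseName>")
  .option("table", "<tableName>")
  .mode("append")
  .save("/path/to/default.db/test_tbl")
```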
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 979191cfab..7ad32a3a00 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -19,18 +19,18 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
-import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
+import org.apache.paimon.catalog.{CatalogContext, CatalogUtils}
import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkSource.NAME
+import org.apache.paimon.spark.SparkSource._
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, mergeSQLConfWithIdentifier}
+import org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.FormatTable.Format
import org.apache.paimon.table.system.AuditLogTable
-import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => SparkSaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
+import org.apache.spark.sql.{DataFrame, PaimonSparkSession, SaveMode => SparkSaveMode, SQLContext}
+import org.apache.spark.sql.connector.catalog.{Identifier => SparkIdentifier, SessionConfigSupport, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
@@ -82,24 +82,6 @@ class SparkSource
SparkSource.toBaseRelation(table, sqlContext)
}
- private def loadTable(options: JMap[String, String]): DataTable = {
- val path = CoreOptions.path(options)
- val catalogContext = CatalogContext.create(
- Options.fromMap(
- mergeSQLConfWithIdentifier(
- options,
- extractCatalogName().getOrElse(NAME),
- Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
- PaimonSparkSession.active.sessionState.newHadoopConf()
- )
- val table = FileStoreTableFactory.create(catalogContext)
- if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
- new AuditLogTable(table)
- } else {
- table
- }
- }
-
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
@@ -113,6 +95,35 @@ class SparkSource
new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
}
+ private def loadTable(options: JMap[String, String]): DataTable = {
+ val path = CoreOptions.path(options)
+ val sessionState = PaimonSparkSession.active.sessionState
+
+ // Only if the user specifies the catalog name, use the catalog to load the table;
+ // otherwise, use FileStoreTableFactory to load the table by path
+ val table = if (options.containsKey(CATALOG)) {
+ val catalogName = options.get(CATALOG)
+ val dataBaseName = Option(options.get(DATABASE)).getOrElse(CatalogUtils.database(path))
+ val tableName = Option(options.get(TABLE)).getOrElse(CatalogUtils.table(path))
+ val sparkCatalog = sessionState.catalogManager.catalog(catalogName).asInstanceOf[TableCatalog]
+ sparkCatalog
+ .loadTable(SparkIdentifier.of(Array(dataBaseName), tableName))
+ .asInstanceOf[SparkTable]
+ .getTable
+ .asInstanceOf[FileStoreTable]
+ .copy(options)
+ } else {
+ val catalogContext =
+ CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf())
+ copyWithSQLConf(FileStoreTableFactory.create(catalogContext), extraOptions = options)
+ }
+
+ if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+ new AuditLogTable(table)
+ } else {
+ table
+ }
+ }
}
object SparkSource {
@@ -121,7 +132,14 @@ object SparkSource {
val FORMAT_NAMES: Seq[String] = Format.values.map(_.toString.toLowerCase).toSeq
- def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
+ // Spark dataframe read options
+ private val CATALOG = "catalog"
+
+ private val DATABASE = "database"
+
+ private val TABLE = "table"
+
+ private def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
new BaseRelation {
override def sqlContext: SQLContext = _sqlContext
override def schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())
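
For reference, a sketch of the two user-facing read styles that hit the two branches of the new loadTable above (placeholder names; the option keys are the CATALOG/DATABASE/TABLE constants added to the companion object):

```scala
// Illustrative only. With the "catalog" option present, loadTable resolves the
// table through the Spark TableCatalog registered under that name and copies the
// user options onto it:
spark.read
  .format("paimon")
  .option("catalog", "<catalogName>")
  .option("database", "<databaseName>")
  .option("table", "<tableName>")
  .load("/path/to/default.db/test_tbl")

// Without a "catalog" option, the table is built from the path via
// FileStoreTableFactory, and spark.paimon.* SQL confs are merged in by copyWithSQLConf:
spark.read
  .format("paimon")
  .load("/path/to/default.db/test_tbl")
```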
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 253c0dac89..2acfc4665d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.table.Table
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.internal.StaticSQLConf
-import java.util.{Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap}
import java.util.regex.Pattern
import scala.collection.JavaConverters._
@@ -71,19 +71,21 @@ object OptionUtils extends SQLConfHelper {
getOptionString(SparkConnectorOptions.EXPLICIT_CAST).toBoolean
}
- def extractCatalogName(): Option[String] = {
- val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
- val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
- conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach {
- case (key, _) =>
- val matcher = sparkCatalogPattern.matcher(key)
- if (matcher.find())
- return Option(matcher.group(1))
- }
- Option.empty
+ private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
+ val mergedOptions = new JHashMap[String, String](
+ conf.getAllConfs
+ .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX))
+ .map {
+ case (key, value) =>
+ key.stripPrefix(PAIMON_OPTION_PREFIX) -> value
+ }
+ .toMap
+ .asJava)
+ mergedOptions.putAll(extraOptions)
+ mergedOptions
}
- def mergeSQLConfWithIdentifier(
+ private def mergeSQLConfWithIdentifier(
extraOptions: JMap[String, String],
catalogName: String,
ident: Identifier): JMap[String, String] = {
@@ -106,11 +108,14 @@ object OptionUtils extends SQLConfHelper {
def copyWithSQLConf[T <: Table](
table: T,
- catalogName: String,
- ident: Identifier,
- extraOptions: JMap[String, String]): T = {
- val mergedOptions: JMap[String, String] =
+ catalogName: String = null,
+ ident: Identifier = null,
+ extraOptions: JMap[String, String] = new JHashMap[String, String]()): T = {
+ val mergedOptions = if (catalogName != null && ident != null) {
mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
+ } else {
+ mergeSQLConf(extraOptions)
+ }
if (mergedOptions.isEmpty) {
table
} else {
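
A standalone sketch of the precedence that mergeSQLConf above establishes: prefixed SQL confs are copied in first (prefix stripped), then the explicit read/write options are putAll'ed on top, so explicit options win on key conflicts. The prefix is assumed to be `spark.paimon.`, as used in the tests below; values are illustrative only.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

// Mirrors the merge order of mergeSQLConf with made-up values.
val fromSqlConf = Map("scan.snapshot-id" -> "1")   // e.g. set via spark.paimon.scan.snapshot-id=1
val explicitOpts = Map("scan.snapshot-id" -> "2")  // e.g. set via .option("scan.snapshot-id", 2)

val merged = new JHashMap[String, String](fromSqlConf.asJava)
merged.putAll(explicitOpts.asJava)                 // explicit options overwrite SQL-conf values
assert(merged.get("scan.snapshot-id") == "2")
```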
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 2a8b8e9b39..d0fa8f1b74 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -758,6 +758,62 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}
+ test("Paimon Dataframe: read write with catalog name") {
+ withTempDir {
+ tmpDir =>
+ spark.sql(s"USE $paimonHiveCatalogName")
+ val dbName = "paimon_db"
+ withDatabase(dbName) {
+ spark.sql(s"CREATE DATABASE $dbName")
+ spark.sql(s"USE $dbName")
+ withTable("t") {
+ val tblLocation = tmpDir.getCanonicalPath
+ spark.sql(s"CREATE TABLE t (a INT, b STRING) USING paimon LOCATION
'$tblLocation'")
+
+ // write by path
+ Seq((1, "x1"), (2, "x2"))
+ .toDF("a", "b")
+ .write
+ .format("paimon")
+ .option("catalog", paimonHiveCatalogName)
+ .option("database", dbName)
+ .option("table", "t")
+ .mode("append")
+ .save(tblLocation)
+
+ // write by name
+ Seq((3, "x3"), (2, "x4"))
+ .toDF("a", "b")
+ .write
+ .format("paimon")
+ .mode("append")
+ .saveAsTable(s"$paimonHiveCatalogName.$dbName.t")
+
+ // read by path with time travel
+ checkAnswer(
+ spark.read
+ .format("paimon")
+ .option("catalog", paimonHiveCatalogName)
+ .option("database", dbName)
+ .option("table", "t")
+ .option("scan.snapshot-id", 1)
+ .load(tblLocation),
+ Seq(Row(1, "x1"), Row(2, "x2"))
+ )
+
+ // read by name with time travel
+ checkAnswer(
+ spark.read
+ .format("paimon")
+ .option("scan.snapshot-id", 1)
+ .table(s"$paimonHiveCatalogName.$dbName.t"),
+ Seq(Row(1, "x1"), Row(2, "x2"))
+ )
+ }
+ }
+ }
+ }
+
def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index 9f2b7cad7f..a51893941e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -95,7 +95,7 @@ class PaimonOptionTest extends PaimonSparkTestBase {
// query with table options
withSparkSQLConf("spark.paimon.*.*.T.scan.snapshot-id" -> "1") {
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1))
- checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1))
+ checkAnswer(spark.read.format("paimon").table("T"), Row(1))
}
// query with both global and table options
@@ -103,9 +103,7 @@ class PaimonOptionTest extends PaimonSparkTestBase {
"spark.paimon.scan.snapshot-id" -> "1",
"spark.paimon.*.*.T.scan.snapshot-id" -> "2") {
checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil)
- checkAnswer(
- spark.read.format("paimon").load(table.location().toString),
- Row(1) :: Row(2) :: Nil)
+ checkAnswer(spark.read.format("paimon").table("T"), Row(1) :: Row(2) :: Nil)
}
}
@@ -129,8 +127,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1)
)
}
@@ -141,8 +139,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1)
)
}
@@ -157,8 +155,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1) :: Row(2) :: Nil
)
}
@@ -170,8 +168,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1)
)
}
@@ -183,8 +181,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1)
)
}
@@ -198,8 +196,8 @@ class PaimonOptionTest extends PaimonSparkTestBase {
checkAnswer(
spark.read
.format("paimon")
- .load(table1.location().toString)
- .join(spark.read.format("paimon").load(table2.location().toString), "id"),
+ .table("T1")
+ .join(spark.read.format("paimon").table("T2"), "id"),
Row(1) :: Row(2) :: Nil
)
}
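
As a usage note, the table-scoped SQL conf pattern exercised in PaimonOptionTest above can also be set directly on a session; with this change it applies to name-based reads as well as path-based ones. A sketch, with the conf key format taken from the test:

```scala
// Scope scan.snapshot-id to table T only, as in the "spark.paimon.*.*.T.*" keys above.
spark.conf.set("spark.paimon.*.*.T.scan.snapshot-id", "1")

// Name-based reads now pick up the merged SQL conf as well:
spark.read.format("paimon").table("T").show()
```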