This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fef6888ea2 [spark] Introduce a base class for SparkTable to support multi-version APIs (#6421)
fef6888ea2 is described below
commit fef6888ea20637a2f52cb6a87112c8c773e24364
Author: Kerwin Zhang <[email protected]>
AuthorDate: Tue Oct 21 10:19:36 2025 +0800
[spark] Introduce a base class for SparkTable to support multi-version APIs (#6421)
---
.../paimon/spark/PaimonPartitionManagement.scala | 2 +-
...SparkTable.scala => PaimonSparkTableBase.scala} | 5 +-
.../scala/org/apache/paimon/spark/SparkTable.scala | 151 +--------------------
3 files changed, 5 insertions(+), 153 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 4516c28359..b40812d4d7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -36,7 +36,7 @@ import java.util.{Map => JMap, Objects, UUID}
import scala.collection.JavaConverters._
trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
- self: SparkTable =>
+ self: PaimonSparkTableBase =>
lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
similarity index 96%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 530cb40943..ff1cfc8705 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
+import org.apache.paimon.table.{Table, _}
import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
import org.apache.paimon.utils.StringUtils
@@ -41,8 +41,7 @@ import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map =>
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(table: Table)
+abstract class PaimonSparkTableBase(val table: Table)
extends org.apache.spark.sql.connector.catalog.Table
with SupportsRead
with SupportsWrite
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 530cb40943..284426b615 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -18,154 +18,7 @@
package org.apache.paimon.spark
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.catalog.functions.BucketFunction
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
-import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
-import org.apache.paimon.utils.StringUtils
-
-import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import org.apache.paimon.table.Table
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(table: Table)
- extends org.apache.spark.sql.connector.catalog.Table
- with SupportsRead
- with SupportsWrite
- with SupportsMetadataColumns
- with PaimonPartitionManagement {
-
- lazy val coreOptions = new CoreOptions(table.options())
-
- private lazy val useV2Write: Boolean = {
- val v2WriteConfigured = OptionUtils.useV2Write()
- v2WriteConfigured && supportsV2Write
- }
-
- private def supportsV2Write: Boolean = {
- coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
- table match {
- case storeTable: FileStoreTable =>
- storeTable.bucketMode() match {
- case HASH_FIXED => BucketFunction.supportsTable(storeTable)
- case BUCKET_UNAWARE | POSTPONE_MODE => true
- case _ => false
- }
-
- case _ => false
- }
- } && coreOptions.clusteringColumns().isEmpty
- }
-
- def getTable: Table = table
-
- override def name: String = table.fullName
-
- override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
-
- override def partitioning: Array[Transform] = {
- table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
- }
-
- override def properties: JMap[String, String] = {
- table match {
- case dataTable: DataTable =>
- val properties = new JHashMap[String, String](dataTable.coreOptions.toMap)
- if (!table.primaryKeys.isEmpty) {
- properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
- }
- properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
- if (table.comment.isPresent) {
- properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
- }
- if (properties.containsKey(CoreOptions.PATH.key())) {
- properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
- }
- properties
- case _ => Collections.emptyMap()
- }
- }
-
- override def capabilities: JSet[TableCapability] = {
- val capabilities = JEnumSet.of(
- TableCapability.BATCH_READ,
- TableCapability.OVERWRITE_BY_FILTER,
- TableCapability.MICRO_BATCH_READ
- )
-
- if (useV2Write) {
- capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
- capabilities.add(TableCapability.BATCH_WRITE)
- capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
- } else {
- capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
- capabilities.add(TableCapability.V1_BATCH_WRITE)
- }
-
- capabilities
- }
-
- override def metadataColumns: Array[MetadataColumn] = {
- val partitionType = SparkTypeUtils.toSparkPartitionType(table)
-
- val _metadataColumns = ArrayBuffer[MetadataColumn]()
-
- if (coreOptions.rowTrackingEnabled()) {
- _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
- _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
- }
-
- _metadataColumns.appendAll(
- Seq(
- PaimonMetadataColumn.FILE_PATH,
- PaimonMetadataColumn.ROW_INDEX,
- PaimonMetadataColumn.PARTITION(partitionType),
- PaimonMetadataColumn.BUCKET
- ))
-
- _metadataColumns.toArray
- }
-
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
- table match {
- case t: KnownSplitsTable =>
- new PaimonSplitScanBuilder(t)
- case _: InnerTable =>
- new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable])
- case _ =>
- throw new RuntimeException("Only InnerTable can be scanned.")
- }
- }
-
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- table match {
- case fileStoreTable: FileStoreTable =>
- val options = Options.fromMap(info.options)
- if (useV2Write) {
- new PaimonV2WriteBuilder(fileStoreTable, info.schema(), options)
- } else {
- new PaimonWriteBuilder(fileStoreTable, options)
- }
- case _ =>
- throw new RuntimeException("Only FileStoreTable can be written.")
- }
- }
-
- override def toString: String = {
- s"${table.getClass.getSimpleName}[${table.fullName()}]"
- }
-}
+case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
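
For context, a minimal sketch of how a Spark-version-specific module could build on the new base class. This is not part of the commit: the class name SparkTableWithStreamingWrite and the extra STREAMING_WRITE capability are hypothetical illustrations only, assuming the base class keeps the capabilities() implementation copied from SparkTable above.

    package org.apache.paimon.spark

    import java.util.{Set => JSet}

    import org.apache.paimon.table.Table

    import org.apache.spark.sql.connector.catalog.TableCapability

    // Hypothetical version-specific table: mirrors the shape of the new
    // SparkTable (a thin case class over PaimonSparkTableBase) while adding
    // behavior that only a particular Spark version module would need.
    case class SparkTableWithStreamingWrite(override val table: Table)
      extends PaimonSparkTableBase(table) {

      // Illustrative override: reuse the base class capabilities and
      // advertise one more on top (assumption, not asserted by this commit).
      override def capabilities: JSet[TableCapability] = {
        val caps = super.capabilities
        caps.add(TableCapability.STREAMING_WRITE)
        caps
      }
    }

Because the base class carries the shared read/write/partition logic, each version module only overrides the APIs that differ across Spark releases.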