This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a6f3f7e424 [spark] Refactor for spark format table (#6477)
a6f3f7e424 is described below

commit a6f3f7e4242d4099fd6239de8bf0e973d22cd7cc
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 28 08:34:23 2025 +0800

    [spark] Refactor for spark format table (#6477)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  94 --------
 .../paimon/spark/catalog/FormatTableCatalog.java   | 102 +++++++++
 ...PaimonFormatTableScan.scala => BaseTable.scala} |  38 +++-
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |   2 +-
 .../paimon/spark/PaimonFormatTableScan.scala       |   2 +-
 .../paimon/spark/PaimonPartitionManagement.scala   |   9 +-
 .../apache/paimon/spark/PaimonSparkTableBase.scala |  21 +-
 .../paimon/spark/format/PaimonFormatTable.scala    | 238 ++++++++++++++++++++
 ...monFormatTable.scala => SparkFormatTable.scala} | 246 +--------------------
 .../paimon/spark/table/PaimonFormatTableTest.scala |  22 ++
 10 files changed, 407 insertions(+), 367 deletions(-)
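
For reference, a minimal sketch of the usage this refactor covers, adapted from the
test added at the end of this diff; the SparkSession value "spark" and the table
name "t" are illustrative only, not part of the commit:

    // Create a parquet format table backed by the Paimon implementation
    // introduced by this refactor, then read its partitions.
    spark.sql(
      """CREATE TABLE t (id INT, p1 INT, p2 STRING) USING parquet
        |PARTITIONED BY (p1, p2)
        |TBLPROPERTIES ('format-table.implementation'='paimon')""".stripMargin)
    spark.sql("INSERT INTO t VALUES (1, 1, '1')")
    // SHOW PARTITIONS is served by PaimonFormatTable via PaimonPartitionManagement.
    spark.sql("SHOW PARTITIONS t").show()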

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 26693ed3f7..835756ffff 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.PropertyChange;
-import org.apache.paimon.format.csv.CsvOptions;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.options.Options;
@@ -43,7 +42,6 @@ import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.ExceptionUtils;
-import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
@@ -61,7 +59,6 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
@@ -69,16 +66,6 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.execution.PaimonFormatTable;
-import org.apache.spark.sql.execution.PartitionedCSVTable;
-import org.apache.spark.sql.execution.PartitionedJsonTable;
-import org.apache.spark.sql.execution.PartitionedOrcTable;
-import org.apache.spark.sql.execution.PartitionedParquetTable;
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
-import org.apache.spark.sql.execution.datasources.v2.FileTable;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -678,87 +665,6 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
-    private static Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
-        SparkSession spark = PaimonSparkSession$.MODULE$.active();
-        StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
-        StructType partitionSchema =
-                SparkTypeUtils.fromPaimonRowType(
-                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
-        List<String> pathList = new ArrayList<>();
-        pathList.add(formatTable.location());
-        Map<String, String> optionsMap = formatTable.options();
-        CoreOptions coreOptions = new CoreOptions(optionsMap);
-        if (coreOptions.formatTableImplementationIsPaimon()) {
-            return new PaimonFormatTable(
-                    spark,
-                    new CaseInsensitiveStringMap(optionsMap),
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    schema,
-                    partitionSchema,
-                    formatTable,
-                    ident.name());
-        }
-        Options options = Options.fromMap(formatTable.options());
-        return convertToFileTable(
-                formatTable, ident, pathList, options, spark, schema, partitionSchema);
-    }
-
-    private static FileTable convertToFileTable(
-            FormatTable formatTable,
-            Identifier ident,
-            List<String> pathList,
-            Options options,
-            SparkSession spark,
-            StructType schema,
-            StructType partitionSchema) {
-        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
-        if (formatTable.format() == FormatTable.Format.CSV) {
-            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
-            dsOptions = new CaseInsensitiveStringMap(options.toMap());
-            return new PartitionedCSVTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    CSVFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.ORC) {
-            return new PartitionedOrcTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    OrcFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
-            return new PartitionedParquetTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    ParquetFileFormat.class,
-                    partitionSchema);
-        } else if (formatTable.format() == FormatTable.Format.JSON) {
-            return new PartitionedJsonTable(
-                    ident.name(),
-                    spark,
-                    dsOptions,
-                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
-                    scala.Option.apply(schema),
-                    JsonFileFormat.class,
-                    partitionSchema);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported format table "
-                            + ident.name()
-                            + " format "
-                            + formatTable.format().name());
-        }
-    }
-
     protected List<String> convertPartitionTransforms(Transform[] transforms) {
         List<String> partitionColNames = new ArrayList<>(transforms.length);
         for (Transform transform : transforms) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
index 551fc54f73..e7fcf19fcf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
@@ -18,14 +18,116 @@
 
 package org.apache.paimon.spark.catalog;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.spark.SparkSource;
+import org.apache.paimon.spark.SparkTypeUtils;
+import org.apache.paimon.spark.format.PaimonFormatTable;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.spark.sql.PaimonSparkSession$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.execution.PartitionedCSVTable;
+import org.apache.spark.sql.execution.PartitionedJsonTable;
+import org.apache.spark.sql.execution.PartitionedOrcTable;
+import org.apache.spark.sql.execution.PartitionedParquetTable;
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
+import org.apache.spark.sql.execution.datasources.v2.FileTable;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /** Catalog supports format table. */
 public interface FormatTableCatalog {
 
     default boolean isFormatTable(@Nullable String provide) {
         return provide != null && SparkSource.FORMAT_NAMES().contains(provide.toLowerCase());
     }
+
+    default Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
+        Map<String, String> optionsMap = formatTable.options();
+        CoreOptions coreOptions = new CoreOptions(optionsMap);
+        if (coreOptions.formatTableImplementationIsPaimon()) {
+            return new PaimonFormatTable(formatTable);
+        } else {
+            SparkSession spark = PaimonSparkSession$.MODULE$.active();
+            List<String> pathList = new ArrayList<>();
+            pathList.add(formatTable.location());
+            StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+            StructType partitionSchema =
+                    SparkTypeUtils.fromPaimonRowType(
+                            TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
+            Options options = Options.fromMap(formatTable.options());
+            return convertToFileTable(
+                    formatTable, ident, pathList, options, spark, schema, partitionSchema);
+        }
+    }
+
+    default FileTable convertToFileTable(
+            FormatTable formatTable,
+            Identifier ident,
+            List<String> pathList,
+            Options options,
+            SparkSession spark,
+            StructType schema,
+            StructType partitionSchema) {
+        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
+        if (formatTable.format() == FormatTable.Format.CSV) {
+            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
+            dsOptions = new CaseInsensitiveStringMap(options.toMap());
+            return new PartitionedCSVTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    CSVFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.ORC) {
+            return new PartitionedOrcTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    OrcFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.PARQUET) {
+            return new PartitionedParquetTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    ParquetFileFormat.class,
+                    partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.JSON) {
+            return new PartitionedJsonTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    JsonFileFormat.class,
+                    partitionSchema);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported format table "
+                            + ident.name()
+                            + " format "
+                            + formatTable.format().name());
+        }
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
similarity index 51%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
index ab429c82bc..e7521cd409 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
@@ -18,15 +18,35 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.Table
+import org.apache.paimon.utils.StringUtils
 
+import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
 import org.apache.spark.sql.types.StructType
 
-/** Scan implementation for {@link FormatTable}. */
-case class PaimonFormatTableScan(
-    table: FormatTable,
-    requiredSchema: StructType,
-    filters: Seq[Predicate],
-    override val pushDownLimit: Option[Int])
-  extends PaimonFormatTableBaseScan(table, requiredSchema, filters, pushDownLimit) {}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
+abstract class BaseTable
+  extends org.apache.spark.sql.connector.catalog.Table
+  with PaimonPartitionManagement {
+
+  val table: Table
+
+  override def name: String = table.fullName
+
+  override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
+
+  override def partitioning: Array[Transform] = {
+    table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
+  }
+
+  override def properties: JMap[String, String] = {
+    table.options()
+  }
+
+  override def toString: String = {
+    s"${table.getClass.getSimpleName}[${table.fullName()}]"
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 2088ee50c2..b04e21c99a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-/** Base Scan implementation for {@link FormatTable}. */
+/** Base Scan implementation for [[FormatTable]]. */
 abstract class PaimonFormatTableBaseScan(
     table: FormatTable,
     requiredSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index ab429c82bc..8fb64999b9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.table.FormatTable
 
 import org.apache.spark.sql.types.StructType
 
-/** Scan implementation for {@link FormatTable}. */
+/** Scan implementation for [[FormatTable]] */
 case class PaimonFormatTableScan(
     table: FormatTable,
     requiredSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index b40812d4d7..193ec55d3b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -19,9 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.operation.FileStoreCommit
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.{FileStoreTable, Table}
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
 
@@ -31,12 +29,13 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
 import org.apache.spark.sql.types.StructType
 
-import java.util.{Map => JMap, Objects, UUID}
+import java.util.{Map => JMap, Objects}
 
 import scala.collection.JavaConverters._
 
 trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
-  self: PaimonSparkTableBase =>
+
+  val table: Table
 
   lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index ff1cfc8705..addaaa0831 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -27,26 +27,21 @@ import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
 import org.apache.paimon.table.{Table, _}
 import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
-import org.apache.paimon.utils.StringUtils
 
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 abstract class PaimonSparkTableBase(val table: Table)
-  extends org.apache.spark.sql.connector.catalog.Table
+  extends BaseTable
   with SupportsRead
   with SupportsWrite
-  with SupportsMetadataColumns
-  with PaimonPartitionManagement {
+  with SupportsMetadataColumns {
 
   lazy val coreOptions = new CoreOptions(table.options())
 
@@ -72,14 +67,6 @@ abstract class PaimonSparkTableBase(val table: Table)
 
   def getTable: Table = table
 
-  override def name: String = table.fullName
-
-  override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
-
-  override def partitioning: Array[Transform] = {
-    table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
-  }
-
   override def properties: JMap[String, String] = {
     table match {
       case dataTable: DataTable =>
@@ -163,8 +150,4 @@ abstract class PaimonSparkTableBase(val table: Table)
         throw new RuntimeException("Only FileStoreTable can be written.")
     }
   }
-
-  override def toString: String = {
-    s"${table.getClass.getSimpleName}[${table.fullName()}]"
-  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
new file mode 100644
index 0000000000..3685e26ee6
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.fs.TwoPhaseOutputStream
+import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper}
+import org.apache.paimon.spark.write.BaseV2WriteBuilder
+import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.format.TwoPhaseCommitMessage
+import org.apache.paimon.table.sink.BatchTableWrite
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.io.FileNotFoundException
+import java.util
+
+import scala.collection.JavaConverters._
+
+case class PaimonFormatTable(table: FormatTable)
+  extends BaseTable
+  with SupportsRead
+  with SupportsWrite {
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
+  }
+
+  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
+    val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
+    scanBuilder.pruneColumns(schema)
+    scanBuilder
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    PaimonFormatTableWriterBuilder(table, info.schema)
+  }
+}
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends BaseV2WriteBuilder(table) {
+
+  override def partitionRowType(): RowType = table.partitionType
+
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(
+    table: FormatTable,
+    overwriteDynamic: Boolean,
+    overwritePartitions: Option[Map[String, String]],
+    writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  assert(
+    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
+    "Cannot overwrite dynamically and by filter both")
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
+        val child = org.apache.paimon.partition.PartitionUtils
+          .buildPartitionName(overwritePartitions.get.asJava)
+        val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
+        deletePreviousDataFile(partitionPath)
+      } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
+        committers
+          .map(c => c.targetFilePath().getParent)
+          .distinct
+          .foreach(deletePreviousDataFile)
+      }
+      committers.foreach(c => c.commit(table.fileIO()))
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  private def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
+    if (table.fileIO().exists(partitionPath)) {
+      val files = table.fileIO().listFiles(partitionPath, true)
+      files
+        .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
+        .foreach(
+          f => {
+            try {
+              table.fileIO().deleteQuietly(f.getPath)
+            } catch {
+              case _: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
+              case other => throw new RuntimeException(other)
+            }
+          })
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    val committers = messages.collect {
+      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+    }.flatten
+
+    committers.foreach {
+      committer =>
+        try {
+          committer.discard(table.fileIO())
+        } catch {
+          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+        }
+    }
+  }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+  }
+}
+
+private class FormatTableDataWriter(
+    table: FormatTable,
+    formatTableWrite: BatchTableWrite,
+    writeSchema: StructType)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+    val numFields = writeSchema.fields.length
+    record => {
+      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+    }
+  }
+
+  override def write(record: InternalRow): Unit = {
+    val paimonRow = rowConverter.apply(record)
+    formatTableWrite.write(paimonRow)
+  }
+
+  override def commit(): WriterCommitMessage = {
+    try {
+      val committers = formatTableWrite
+        .prepareCommit()
+        .asScala
+        .map {
+          case committer: TwoPhaseCommitMessage => committer.getCommitter
+          case other =>
+            throw new IllegalArgumentException(
+              "Unsupported commit message type: " + other.getClass.getSimpleName)
+        }
+        .toSeq
+      FormatTableTaskCommit(committers)
+    } finally {
+      close()
+    }
+  }
+
+  override def abort(): Unit = {
+    logInfo("Aborting FormatTable data writer")
+    close()
+  }
+
+  override def close(): Unit = {
+    try {
+      formatTableWrite.close()
+    } catch {
+      case e: Exception =>
+        logError("Error closing FormatTableDataWriter", e)
+        throw new RuntimeException(e)
+    }
+  }
+}
+
+/** Commit message container for FormatTable writes, holding committers that need to be executed. */
+class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+  extends WriterCommitMessage {
+
+  def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+}
+
+object FormatTableTaskCommit {
+  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
+    new FormatTableTaskCommit(committers)
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
similarity index 51%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 5b3aaa4713..0a9c1fcdab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -18,27 +18,15 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.paimon.fs.TwoPhaseOutputStream
-import org.apache.paimon.spark.{FormatTableScanBuilder, SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.BaseV2WriteBuilder
-import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.format.TwoPhaseCommitMessage
-import org.apache.paimon.table.sink.BatchTableWrite
-import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.StringUtils
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, FileStatusCache, InMemoryFileIndex, NoopCache, PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
@@ -47,12 +35,12 @@ import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFile
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import java.io.FileNotFoundException
 import java.util
 
 import scala.collection.JavaConverters._
 
-object PaimonFormatTable {
+/** Format Table implementation with spark. */
+object SparkFormatTable {
 
   // Copy from spark and override FileIndex's partitionSchema
   def createFileIndex(
@@ -128,7 +116,6 @@ object PaimonFormatTable {
       metadataOpsTimeNs)
 }
 
-// Paimon Format Table
 trait PartitionedFormatTable extends SupportsPartitionManagement {
 
   val partitionSchema_ : StructType
@@ -173,47 +160,6 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
   }
 }
 
-case class PaimonFormatTable(
-    sparkSession: SparkSession,
-    options: CaseInsensitiveStringMap,
-    paths: Seq[String],
-    schema: StructType,
-    override val partitionSchema_ : StructType,
-    table: FormatTable,
-    identName: String)
-  extends org.apache.spark.sql.connector.catalog.Table
-  with SupportsRead
-  with SupportsWrite
-  with PartitionedFormatTable {
-
-  override lazy val fileIndex: PartitioningAwareFileIndex = {
-    PaimonFormatTable.createFileIndex(
-      options,
-      sparkSession,
-      paths,
-      Option.apply(schema),
-      partitionSchema())
-  }
-
-  override def name(): String = {
-    identName
-  }
-
-  override def capabilities(): util.Set[TableCapability] = {
-    util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
-  }
-
-  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
-    val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
-    scanBuilder.pruneColumns(schema)
-    scanBuilder
-  }
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    PaimonFormatTableWriterBuilder(table, info.schema)
-  }
-}
-
 class PartitionedCSVTable(
     name: String,
     sparkSession: SparkSession,
@@ -237,7 +183,7 @@ class PartitionedCSVTable(
   }
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
-    PaimonFormatTable.createFileIndex(
+    SparkFormatTable.createFileIndex(
       options,
       sparkSession,
       paths,
@@ -263,7 +209,7 @@ class PartitionedOrcTable(
   with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
-    PaimonFormatTable.createFileIndex(
+    SparkFormatTable.createFileIndex(
       options,
       sparkSession,
       paths,
@@ -284,7 +230,7 @@ class PartitionedParquetTable(
   with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
-    PaimonFormatTable.createFileIndex(
+    SparkFormatTable.createFileIndex(
       options,
       sparkSession,
       paths,
@@ -305,7 +251,7 @@ class PartitionedJsonTable(
   with PartitionedFormatTable {
 
   override lazy val fileIndex: PartitioningAwareFileIndex = {
-    PaimonFormatTable.createFileIndex(
+    SparkFormatTable.createFileIndex(
       options,
       sparkSession,
       paths,
@@ -313,179 +259,3 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
-
-case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
-  extends BaseV2WriteBuilder(table) {
-
-  override def partitionRowType(): RowType = table.partitionType
-
-  override def build: Write = new Write() {
-    override def toBatch: BatchWrite = {
-      FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)
-    }
-
-    override def toStreaming: StreamingWrite = {
-      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
-    }
-  }
-}
-
-private case class FormatTableBatchWrite(
-    table: FormatTable,
-    overwriteDynamic: Boolean,
-    overwritePartitions: Option[Map[String, String]],
-    writeSchema: StructType)
-  extends BatchWrite
-  with Logging {
-
-  assert(
-    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
-    "Cannot overwrite dynamically and by filter both")
-
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
-    FormatTableWriterFactory(table, writeSchema)
-
-  override def useCommitCoordinator(): Boolean = false
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"Committing to FormatTable ${table.name()}")
-
-    val committers = messages
-      .collect {
-        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
-        case other =>
-          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
-      }
-      .flatten
-      .toSeq
-
-    try {
-      val start = System.currentTimeMillis()
-      if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
-        val child = org.apache.paimon.partition.PartitionUtils
-          .buildPartitionName(overwritePartitions.get.asJava)
-        val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
-        deletePreviousDataFile(partitionPath)
-      } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
-        committers
-          .map(c => c.targetFilePath().getParent)
-          .distinct
-          .foreach(deletePreviousDataFile)
-      }
-      committers.foreach(c => c.commit(table.fileIO()))
-      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
-    } catch {
-      case e: Exception =>
-        logError("Failed to commit FormatTable writes", e)
-        throw e
-    }
-  }
-
-  def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
-    if (table.fileIO().exists(partitionPath)) {
-      val files = table.fileIO().listFiles(partitionPath, true)
-      files
-        .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
-        .foreach(
-          f => {
-            try {
-              table.fileIO().deleteQuietly(f.getPath)
-            } catch {
-              case e: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
-              case other => throw new RuntimeException(other)
-            }
-          })
-    }
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"Aborting write to FormatTable ${table.name()}")
-    val committers = messages.collect {
-      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
-    }.flatten
-
-    committers.foreach {
-      committer =>
-        try {
-          committer.discard(table.fileIO())
-        } catch {
-          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
-        }
-    }
-  }
-}
-
-private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
-  extends DataWriterFactory {
-
-  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
-    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
-  }
-}
-
-private class FormatTableDataWriter(
-    table: FormatTable,
-    formatTableWrite: BatchTableWrite,
-    writeSchema: StructType)
-  extends DataWriter[InternalRow]
-  with Logging {
-
-  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
-    val numFields = writeSchema.fields.length
-    record => {
-      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
-    }
-  }
-
-  override def write(record: InternalRow): Unit = {
-    val paimonRow = rowConverter.apply(record)
-    formatTableWrite.write(paimonRow)
-  }
-
-  override def commit(): WriterCommitMessage = {
-    try {
-      val committers = formatTableWrite
-        .prepareCommit()
-        .asScala
-        .map {
-          case committer: TwoPhaseCommitMessage => committer.getCommitter
-          case other =>
-            throw new IllegalArgumentException(
-              "Unsupported commit message type: " + other.getClass.getSimpleName)
-        }
-        .toSeq
-      FormatTableTaskCommit(committers)
-    } finally {
-      close()
-    }
-  }
-
-  override def abort(): Unit = {
-    logInfo("Aborting FormatTable data writer")
-    close()
-  }
-
-  override def close(): Unit = {
-    try {
-      formatTableWrite.close()
-    } catch {
-      case e: Exception =>
-        logError("Error closing FormatTableDataWriter", e)
-        throw new RuntimeException(e)
-    }
-  }
-}
-
-/** Commit message container for FormatTable writes, holding committers that need to be executed. */
-class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
-  extends WriterCommitMessage {
-
-  def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
-}
-
-object FormatTableTaskCommit {
-  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
-    new FormatTableTaskCommit(committers)
-  }
-}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 23e3f5a2eb..44c6abb297 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -287,4 +287,26 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
       }
     }
   }
+
+  test("Paimon format table: show partitions") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id INT, p1 INT, p2 STRING) USING parquet
+            |PARTITIONED BY (p1, p2)
+            |TBLPROPERTIES ('format-table.implementation'='paimon')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 1, '1')")
+      sql("INSERT INTO t VALUES (2, 1, '1')")
+      sql("INSERT INTO t VALUES (3, 2, '1')")
+      sql("INSERT INTO t VALUES (3, 2, '2')")
+
+      checkAnswer(
+        sql("SHOW PARTITIONS t"),
+        Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+      checkAnswer(
+        sql("SHOW PARTITIONS t PARTITION (p1=2)"),
+        Seq(Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+      checkAnswer(sql("SHOW PARTITIONS t PARTITION (p1=2, p2='2')"), Seq(Row("p1=2/p2=2")))
+    }
+  }
 }

