This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d2c0ff6f2 [spark] format table: support reading it via Paimon in Spark (#6296)
5d2c0ff6f2 is described below

commit 5d2c0ff6f25a42b51031163a36f858792f1dae4a
Author: jerry <[email protected]>
AuthorDate: Fri Sep 26 17:03:15 2025 +0800

    [spark] format table: support reading it via Paimon in Spark (#6296)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  35 ++++
 .../paimon/table/format/FormatReadBuilder.java     |   9 +-
 .../apache/paimon/flink/FormatCatalogTable.java    |   2 +-
 .../org/apache/paimon/format/csv/CsvOptions.java   |   3 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   2 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |  30 +++-
 .../paimon/spark/ColumnPruningAndPushDown.scala    |   4 +-
 .../spark/PaimonFormatTableScanBuilder.scala       |  91 ++++++++++
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   2 +-
 .../apache/paimon/spark/PaimonScanBuilder.scala    |   2 -
 .../spark/sql/execution/PaimonFormatTable.scala    |  52 +++++-
 .../paimon/spark/table/PaimonFormatTableTest.scala | 190 +++++++++++++++++++++
 13 files changed, 415 insertions(+), 13 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9c243fa163..6cc752a1bf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -506,6 +506,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to force the use of lookup for compaction.</td>
         </tr>
+        <tr>
+            <td><h5>format-table.implementation</h5></td>
+            <td style="word-wrap: break-word;">engine</td>
+            <td><p>Enum</p></td>
+            <td>Format table uses paimon or engine.<br /><br />Possible values:<ul><li>"paimon": Paimon format table implementation.</li><li>"engine": Engine format table implementation.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>format-table.partition-path-only-value</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e0a5b5fcb6..7407388449 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1971,6 +1971,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1024)
                     .withDescription("Threshold for merging records to binary 
buffer in lookup.");
 
+    public static final ConfigOption<FormatTableImplementation> FORMAT_TABLE_IMPLEMENTATION =
+            key("format-table.implementation")
+                    .enumType(FormatTableImplementation.class)
+                    .defaultValue(FormatTableImplementation.ENGINE)
+                    .withDescription("Format table uses paimon or engine.");
+
     public static final ConfigOption<Boolean> FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH =
             ConfigOptions.key("format-table.partition-path-only-value")
                     .booleanType()
@@ -3035,6 +3041,10 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
     }
 
+    public boolean formatTableImplementationIsPaimon() {
+        return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
+    }
+
     public boolean formatTablePartitionOnlyValueInPath() {
         return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
     }
@@ -3828,4 +3838,29 @@ public class CoreOptions implements Serializable {
         HASH
         // TODO : Supports range-partition strategy.
     }
+
+    /** Specifies the implementation of format table. */
+    public enum FormatTableImplementation implements DescribedEnum {
+        PAIMON("paimon", "Paimon format table implementation."),
+        ENGINE("engine", "Engine format table implementation.");
+
+        private final String value;
+
+        private final String description;
+
+        FormatTableImplementation(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
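
For orientation, a minimal Scala sketch of how the new option is meant to be used, modeled on the test added by this commit (database and table names are illustrative, and a SparkSession with the Paimon catalog configured is assumed):

    // Switch an existing format table from the default engine implementation
    // to the Paimon read path, then query it as usual.
    spark.sql(
      "ALTER TABLE test_db.format_demo SET TBLPROPERTIES (" +
        "'format-table.implementation'='paimon')")
    spark.sql("SELECT * FROM test_db.format_demo WHERE id = 2").show()
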
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 18670e3179..d538c82e19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -44,11 +44,13 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
 import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
 
@@ -155,11 +157,14 @@ public class FormatReadBuilder implements ReadBuilder {
         Path filePath = dataSplit.dataPath();
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null);
+        // Skip pushing down partition filters to reader.
+        List<Predicate> readFilters =
+                excludePredicateWithFields(
+                        PredicateBuilder.splitAnd(filter), new HashSet<>(table.partitionKeys()));
         FormatReaderFactory readerFactory =
                 FileFormatDiscover.of(options)
                         .discover(options.formatType())
-                        .createReaderFactory(
-                                table.rowType(), readType(), PredicateBuilder.splitAnd(filter));
+                        .createReaderFactory(table.rowType(), readType(), readFilters);
 
         Pair<int[], RowType> partitionMapping =
                 PartitionUtils.getPartitionMapping(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 7ee09b27ec..1c7520621c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -98,7 +98,7 @@ public class FormatCatalogTable implements CatalogTable {
                         }
                     });
             if (options.containsKey("field-delimiter")) {
-                cachedOptions.put("csv.field-delimiter", "field-delimiter");
+                cachedOptions.put("csv.field-delimiter", 
options.get("field-delimiter"));
             }
             cachedOptions.put(CONNECTOR.key(), "filesystem");
             cachedOptions.put(PATH.key(), table.location());
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
index 6a212a41cc..18bff8a474 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -29,13 +29,14 @@ public class CsvOptions {
             ConfigOptions.key("csv.field-delimiter")
                     .stringType()
                     .defaultValue(",")
-                    .withFallbackKeys("field-delimiter")
+                    .withFallbackKeys("field-delimiter", "seq")
                     .withDescription("The field delimiter for CSV or TXT 
format");
 
     public static final ConfigOption<String> LINE_DELIMITER =
             ConfigOptions.key("csv.line-delimiter")
                     .stringType()
                     .defaultValue("\n")
+                    .withFallbackKeys("lineSep")
                     .withDescription("The line delimiter for CSV format");
 
     public static final ConfigOption<String> QUOTE_CHARACTER =
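
A hedged sketch of the new fallback keys in practice: Spark-style 'seq' and 'lineSep' options passed at table creation are picked up as csv.field-delimiter and csv.line-delimiter, mirroring the CSV test added by this commit (names are illustrative; a SparkSession with the Paimon catalog configured is assumed):

    // The 'seq' and 'lineSep' options fall back onto the Paimon CSV options,
    // so pipe-delimited files can be read through the Paimon implementation.
    spark.sql(
      "CREATE TABLE test_db.csv_demo (f0 INT, f1 INT) USING csv OPTIONS (" +
        "'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
        "'format-table.implementation'='paimon')")
    spark.sql("SELECT * FROM test_db.csv_demo").show()
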
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index e8fe9a9c40..8edd7e5952 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.{Predicate, TopN}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.table.{InnerTable, Table}
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
 import org.apache.spark.sql.connector.expressions.NamedReference
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index db7281de92..a7bda93290 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -60,6 +60,7 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.PaimonFormatTable;
 import org.apache.spark.sql.execution.PartitionedCSVTable;
 import org.apache.spark.sql.execution.PartitionedJsonTable;
 import org.apache.spark.sql.execution.PartitionedOrcTable;
@@ -642,7 +644,7 @@ public class SparkCatalog extends SparkBaseCatalog
         try {
             org.apache.paimon.table.Table paimonTable = catalog.getTable(toIdentifier(ident));
             if (paimonTable instanceof FormatTable) {
-                return convertToFileTable(ident, (FormatTable) paimonTable);
+                return toSparkFormatTable(ident, (FormatTable) paimonTable);
             } else {
                 return new SparkTable(
                         copyWithSQLConf(
@@ -653,7 +655,7 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
-    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
+    private static Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
         SparkSession spark = PaimonSparkSession$.MODULE$.active();
         StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
         StructType partitionSchema =
@@ -661,7 +663,31 @@ public class SparkCatalog extends SparkBaseCatalog
                         TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
         List<String> pathList = new ArrayList<>();
         pathList.add(formatTable.location());
+        Map<String, String> optionsMap = formatTable.options();
+        CoreOptions coreOptions = new CoreOptions(optionsMap);
+        if (coreOptions.formatTableImplementationIsPaimon()) {
+            return new PaimonFormatTable(
+                    spark,
+                    new CaseInsensitiveStringMap(optionsMap),
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    schema,
+                    partitionSchema,
+                    formatTable,
+                    ident.name());
+        }
         Options options = Options.fromMap(formatTable.options());
+        return convertToFileTable(
+                formatTable, ident, pathList, options, spark, schema, partitionSchema);
+    }
+
+    private static FileTable convertToFileTable(
+            FormatTable formatTable,
+            Identifier ident,
+            List<String> pathList,
+            Options options,
+            SparkSession spark,
+            StructType schema,
+            StructType partitionSchema) {
         CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
         if (formatTable.format() == FormatTable.Format.CSV) {
             options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index db28b4f4ef..9cdebd1033 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder, TopN}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.{InnerTable, SpecialFields}
+import org.apache.paimon.table.{SpecialFields, Table}
 import org.apache.paimon.table.source.ReadBuilder
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.Preconditions.checkState
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.types.StructType
 
 trait ColumnPruningAndPushDown extends Scan with Logging {
-  def table: InnerTable
+  def table: Table
   def requiredSchema: StructType
   def filters: Seq[Predicate]
   def pushDownLimit: Option[Int] = None
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala
new file mode 100644
index 0000000000..0e1de5c8ea
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.source.Split
+
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/** A ScanBuilder implementation for {@link FormatTable} that supports basic scan operations. */
+case class PaimonFormatTableScanBuilder(
+    table: FormatTable,
+    requiredSchema: StructType,
+    filters: Seq[Predicate])
+  extends ScanBuilder {
+  override def build() = PaimonFormatTableScan(table, requiredSchema, filters)
+}
+
+case class PaimonFormatTableScan(
+    table: FormatTable,
+    requiredSchema: StructType,
+    filters: Seq[Predicate])
+  extends ColumnPruningAndPushDown
+  with ScanHelper {
+
+  override val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+  protected var inputSplits: Array[Split] = _
+
+  def getOriginSplits: Array[Split] = {
+    if (inputSplits == null) {
+      inputSplits = readBuilder
+        .newScan()
+        .plan()
+        .splits()
+        .asScala
+        .toArray
+    }
+    inputSplits
+  }
+
+  override def toBatch: Batch = {
+    PaimonBatch(getInputPartitions(getOriginSplits), readBuilder, metadataColumns)
+  }
+
+  override def supportedCustomMetrics: Array[CustomMetric] = {
+    Array(
+      PaimonNumSplitMetric(),
+      PaimonSplitSizeMetric(),
+      PaimonAvgSplitSizeMetric(),
+      PaimonResultedTableFilesMetric()
+    )
+  }
+
+  override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+    val filesCount = getOriginSplits.length
+    Array(
+      PaimonResultedTableFilesTaskMetric(filesCount)
+    )
+  }
+
+  override def description(): String = {
+    val pushedFiltersStr = if (filters.nonEmpty) {
+      ", PushedFilters: [" + filters.mkString(",") + "]"
+    } else {
+      ""
+    }
+    s"PaimonFormatTableScan: [${table.name}]" + pushedFiltersStr
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0ceb36d240..d3ff6d1a40 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.spark.commands.BucketExpression.quote
-import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 8d802ca61d..fdeae66490 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -31,8 +31,6 @@ import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPred
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.sources.Filter
 
-import java.util.Collections
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 46516d64c8..5167c4c1bd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -18,11 +18,17 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkTypeUtils}
+import org.apache.paimon.table.FormatTable
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -153,6 +159,45 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
   }
 }
 
+case class PaimonFormatTable(
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    schema: StructType,
+    override val partitionSchema_ : StructType,
+    table: FormatTable,
+    identName: String)
+  extends org.apache.spark.sql.connector.catalog.Table
+  with SupportsRead
+  with SupportsWrite
+  with PartitionedFormatTable {
+
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    PaimonFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      Option.apply(schema),
+      partitionSchema())
+  }
+
+  override def name(): String = {
+    identName
+  }
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ)
+  }
+
+  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
+    PaimonFormatTableScanBuilder(table.copy(caseInsensitiveStringMap), schema, Seq.empty)
+  }
+
+  override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
+    throw new UnsupportedOperationException()
+  }
+}
+
 class PartitionedCSVTable(
     name: String,
     sparkSession: SparkSession,
@@ -183,6 +228,11 @@ class PartitionedCSVTable(
       userSpecifiedSchema,
       partitionSchema())
   }
+
+  override def newWriteBuilder(info: _root_.org.apache.spark.sql.connector.write.LogicalWriteInfo)
+      : _root_.org.apache.spark.sql.connector.write.WriteBuilder = {
+    super.newWriteBuilder(info)
+  }
 }
 
 class PartitionedOrcTable(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
new file mode 100644
index 0000000000..dd045ebcfa
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.table
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.Row
+
+class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    sql("USE paimon")
+    sql("CREATE DATABASE IF NOT EXISTS test_db")
+    sql("USE test_db")
+  }
+
+  test("PaimonFormatTableRead table: csv with field-delimiter") {
+    val tableName = "paimon_format_test_csv_options"
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName (f0 INT, f1 INT) USING CSV OPTIONS ('" +
+          s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+          s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
+              .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+      val table =
+        paimonCatalog.getTable(Identifier.create("test_db", 
tableName)).asInstanceOf[FormatTable]
+      val csvFile =
+        new Path(table.location(), "part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+      table.fileIO().writeFile(csvFile, "1|2\n3|4", false)
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, 2), Row(3, 4)))
+    }
+  }
+
+  test("PaimonFormatTableRead: read non-partitioned table") {
+    for {
+      (format, compression) <- Seq(
+        ("csv", "gzip"),
+        ("json", "gzip"),
+        ("parquet", "zstd"),
+        ("orc", "zstd"))
+    } {
+      val tableName = s"format_test_$format"
+      withTable(tableName) {
+        // Create format table using the same pattern as FormatTableTestBase
+        sql(
+          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING 
$format " +
+            s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',', 
'lineSep'='\n')")
+        val path =
+          paimonCatalog.getTable(Identifier.create("test_db", 
tableName)).options().get("path")
+        fileIO.mkdirs(new Path(path))
+        // Insert data using our new write implementation
+        sql(s"INSERT INTO $tableName VALUES (1, 'Alice', 10.5)")
+        sql(s"INSERT INTO $tableName VALUES (2, 'Bob', 20.7)")
+        sql(s"INSERT INTO $tableName VALUES (3, 'Charlie', 30.9)")
+
+        // Test reading all data
+        sql(
+          s"Alter table $tableName SET TBLPROPERTIES 
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
+            s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+        checkAnswer(
+          sql(s"SELECT * FROM $tableName ORDER BY id"),
+          Seq(
+            Row(1, "Alice", 10.5),
+            Row(2, "Bob", 20.7),
+            Row(3, "Charlie", 30.9)
+          )
+        )
+
+        // Test column projection (using our scan builder)
+        checkAnswer(
+          sql(s"SELECT name, value FROM $tableName WHERE id = 2"),
+          Seq(Row("Bob", 20.7))
+        )
+
+        // Test filtering
+        checkAnswer(
+          sql(s"SELECT id FROM $tableName WHERE value > 15.0 ORDER BY id"),
+          Seq(Row(2), Row(3))
+        )
+
+        // Verify this is actually a FormatTable
+        val table = paimonCatalog.getTable(Identifier.create("test_db", 
tableName))
+        assert(
+          table.isInstanceOf[FormatTable],
+          s"Table should be FormatTable but was ${table.getClass}")
+        sql(s"DROP TABLE $tableName")
+      }
+    }
+  }
+
+  test("PaimonFormatTableRead: read partitioned table") {
+    for {
+      (format, compression) <- Seq(
+        ("csv", "gzip"),
+        ("json", "gzip"),
+        ("parquet", "zstd"),
+        ("orc", "zstd"))
+    } {
+      val tableName = s"format_test_partitioned_$format"
+      withTable(tableName) {
+        // Create partitioned format table
+        sql(
+          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE, dept 
STRING) USING $format " +
+            s"PARTITIONED BY (dept) TBLPROPERTIES 
('file.compression'='$compression')")
+        val paimonTable = paimonCatalog.getTable(Identifier.create("test_db", 
tableName))
+        val path =
+          paimonCatalog.getTable(Identifier.create("test_db", 
tableName)).options().get("path")
+        fileIO.mkdirs(new Path(path))
+        // Insert data into different partitions
+        sql(
+          s"INSERT INTO $tableName VALUES (1, 'Alice', 10.5, 'Engineering')," +
+            s" (2, 'Bob', 20.7, 'Engineering')," +
+            s" (3, 'Charlie', 30.9, 'Sales')," +
+            s" (4, 'David', 25.3, 'Sales')," +
+            s" (5, 'Eve', 15.8, 'Marketing')")
+
+        // Test reading all data
+        sql(
+          s"Alter table $tableName SET TBLPROPERTIES 
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
+            s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+        checkAnswer(
+          sql(s"SELECT * FROM $tableName ORDER BY id"),
+          Seq(
+            Row(1, "Alice", 10.5, "Engineering"),
+            Row(2, "Bob", 20.7, "Engineering"),
+            Row(3, "Charlie", 30.9, "Sales"),
+            Row(4, "David", 25.3, "Sales"),
+            Row(5, "Eve", 15.8, "Marketing")
+          )
+        )
+
+        // Test partition filtering
+        checkAnswer(
+          sql(s"SELECT * FROM $tableName WHERE dept = 'Engineering' ORDER BY 
id"),
+          Seq(
+            Row(1, "Alice", 10.5, "Engineering"),
+            Row(2, "Bob", 20.7, "Engineering")
+          )
+        )
+
+        // Test column projection with partition filtering
+        checkAnswer(
+          sql(s"SELECT name, value FROM $tableName WHERE dept = 'Sales' ORDER 
BY id"),
+          Seq(
+            Row("Charlie", 30.9),
+            Row("David", 25.3)
+          )
+        )
+
+        // Test filtering on non-partition columns
+        checkAnswer(
+          sql(s"SELECT id, dept FROM $tableName WHERE value > 20.0 ORDER BY 
id"),
+          Seq(
+            Row(2, "Engineering"),
+            Row(3, "Sales"),
+            Row(4, "Sales")
+          )
+        )
+
+        // Test combined filtering (partition + non-partition columns)
+        checkAnswer(
+          sql(s"SELECT name FROM $tableName WHERE dept = 'Sales' AND value > 
25.0"),
+          Seq(Row("Charlie"), Row("David"))
+        )
+        sql(s"DROP TABLE $tableName")
+      }
+    }
+  }
+}
