This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d2c0ff6f2 [spark] format table: support reading it via Paimon in Spark (#6296)
5d2c0ff6f2 is described below
commit 5d2c0ff6f25a42b51031163a36f858792f1dae4a
Author: jerry <[email protected]>
AuthorDate: Fri Sep 26 17:03:15 2025 +0800
[spark] format table: support reading it via Paimon in Spark (#6296)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 35 ++++
.../paimon/table/format/FormatReadBuilder.java | 9 +-
.../apache/paimon/flink/FormatCatalogTable.java | 2 +-
.../org/apache/paimon/format/csv/CsvOptions.java | 3 +-
.../scala/org/apache/paimon/spark/PaimonScan.scala | 2 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 30 +++-
.../paimon/spark/ColumnPruningAndPushDown.scala | 4 +-
.../spark/PaimonFormatTableScanBuilder.scala | 91 ++++++++++
.../scala/org/apache/paimon/spark/PaimonScan.scala | 2 +-
.../apache/paimon/spark/PaimonScanBuilder.scala | 2 -
.../spark/sql/execution/PaimonFormatTable.scala | 52 +++++-
.../paimon/spark/table/PaimonFormatTableTest.scala | 190 +++++++++++++++++++++
13 files changed, 415 insertions(+), 13 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9c243fa163..6cc752a1bf 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -506,6 +506,12 @@ under the License.
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
+ <tr>
+ <td><h5>format-table.implementation</h5></td>
+ <td style="word-wrap: break-word;">engine</td>
+ <td><p>Enum</p></td>
+            <td>Format table uses paimon or engine.<br /><br />Possible values:<ul><li>"paimon": Paimon format table implementation.</li><li>"engine": Engine format table implementation.</li></ul></td>
+ </tr>
<tr>
<td><h5>format-table.partition-path-only-value</h5></td>
<td style="word-wrap: break-word;">false</td>
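The new table option can also be toggled after creation. A minimal usage sketch (not part of this commit; it assumes an active Spark session `spark` and an existing format table named `t`), mirroring the ALTER TABLE pattern used in the new test below:

    // Switch an existing format table to the Paimon read implementation
    // (the default remains "engine"), then read it back.
    spark.sql("ALTER TABLE t SET TBLPROPERTIES ('format-table.implementation' = 'paimon')")
    spark.sql("SELECT * FROM t").show()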
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e0a5b5fcb6..7407388449 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1971,6 +1971,12 @@ public class CoreOptions implements Serializable {
.defaultValue(1024)
                        .withDescription("Threshold for merging records to binary buffer in lookup.");
+    public static final ConfigOption<FormatTableImplementation> FORMAT_TABLE_IMPLEMENTATION =
+ key("format-table.implementation")
+ .enumType(FormatTableImplementation.class)
+ .defaultValue(FormatTableImplementation.ENGINE)
+ .withDescription("Format table uses paimon or engine.");
+
    public static final ConfigOption<Boolean> FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH =
ConfigOptions.key("format-table.partition-path-only-value")
.booleanType()
@@ -3035,6 +3041,10 @@ public class CoreOptions implements Serializable {
return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
}
+ public boolean formatTableImplementationIsPaimon() {
+        return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
+ }
+
public boolean formatTablePartitionOnlyValueInPath() {
return options.get(FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH);
}
@@ -3828,4 +3838,29 @@ public class CoreOptions implements Serializable {
HASH
// TODO : Supports range-partition strategy.
}
+
+ /** Specifies the implementation of format table. */
+ public enum FormatTableImplementation implements DescribedEnum {
+ PAIMON("paimon", "Paimon format table implementation."),
+ ENGINE("engine", "Engine format table implementation.");
+
+ private final String value;
+
+ private final String description;
+
+ FormatTableImplementation(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
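A minimal sketch (not part of this commit; the option map is hypothetical) of how the new option is resolved through CoreOptions:

    import org.apache.paimon.CoreOptions
    import scala.collection.JavaConverters._

    // "paimon" and "engine" are the two accepted values; "engine" is the default.
    val conf = Map(CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key() -> "paimon").asJava
    val coreOptions = CoreOptions.fromMap(conf)
    // The new helper returns true only for the Paimon implementation.
    assert(coreOptions.formatTableImplementationIsPaimon())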
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 18670e3179..d538c82e19 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -44,11 +44,13 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
@@ -155,11 +157,14 @@ public class FormatReadBuilder implements ReadBuilder {
Path filePath = dataSplit.dataPath();
FormatReaderContext formatReaderContext =
                new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null);
+ // Skip pushing down partition filters to reader.
+ List<Predicate> readFilters =
+ excludePredicateWithFields(
+                        PredicateBuilder.splitAnd(filter), new HashSet<>(table.partitionKeys()));
FormatReaderFactory readerFactory =
FileFormatDiscover.of(options)
.discover(options.formatType())
-                        .createReaderFactory(
-                                table.rowType(), readType(), PredicateBuilder.splitAnd(filter));
+                        .createReaderFactory(table.rowType(), readType(), readFilters);
Pair<int[], RowType> partitionMapping =
PartitionUtils.getPartitionMapping(
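A minimal sketch (not part of this commit; field names and types are hypothetical) of the filter split above: predicates on partition keys are resolved from the partition path, so only the remaining predicates are handed to the format reader.

    import org.apache.paimon.predicate.PredicateBuilder
    import org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields
    import org.apache.paimon.types.{DataTypes, RowType}
    import scala.collection.JavaConverters._

    val rowType = RowType.of(DataTypes.INT(), DataTypes.INT()) // fields f0, f1
    val builder = new PredicateBuilder(rowType)
    val filter = PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2))
    // Treat f1 as a partition key: its predicate is excluded from the reader filters,
    // leaving only the f0 predicate to be pushed into the format reader.
    val readFilters = excludePredicateWithFields(PredicateBuilder.splitAnd(filter), Set("f1").asJava)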
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 7ee09b27ec..1c7520621c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -98,7 +98,7 @@ public class FormatCatalogTable implements CatalogTable {
}
});
if (options.containsKey("field-delimiter")) {
- cachedOptions.put("csv.field-delimiter", "field-delimiter");
+            cachedOptions.put("csv.field-delimiter", options.get("field-delimiter"));
}
cachedOptions.put(CONNECTOR.key(), "filesystem");
cachedOptions.put(PATH.key(), table.location());
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
index 6a212a41cc..18bff8a474 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -29,13 +29,14 @@ public class CsvOptions {
ConfigOptions.key("csv.field-delimiter")
.stringType()
.defaultValue(",")
- .withFallbackKeys("field-delimiter")
+ .withFallbackKeys("field-delimiter", "seq")
                        .withDescription("The field delimiter for CSV or TXT format");
public static final ConfigOption<String> LINE_DELIMITER =
ConfigOptions.key("csv.line-delimiter")
.stringType()
.defaultValue("\n")
+ .withFallbackKeys("lineSep")
.withDescription("The line delimiter for CSV format");
public static final ConfigOption<String> QUOTE_CHARACTER =
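A minimal sketch (not part of this commit; the option map is hypothetical) of how the added fallback keys resolve: the Spark-style 'seq' and 'lineSep' keys used in the new tests now map onto the csv.* delimiter options.

    import org.apache.paimon.format.csv.CsvOptions
    import org.apache.paimon.options.Options
    import scala.collection.JavaConverters._

    val options = Options.fromMap(Map("seq" -> "|", "lineSep" -> "\n").asJava)
    // "csv.field-delimiter" falls back to "field-delimiter" and then "seq".
    assert(options.get(CsvOptions.FIELD_DELIMITER) == "|")
    // "csv.line-delimiter" now falls back to "lineSep".
    assert(options.get(CsvOptions.LINE_DELIMITER) == "\n")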
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index e8fe9a9c40..8edd7e5952 100644
---
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.predicate.{Predicate, TopN}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.table.{InnerTable, Table}
import org.apache.spark.sql.PaimonUtils.fieldReference
import org.apache.spark.sql.connector.expressions.NamedReference
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index db7281de92..a7bda93290 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -60,6 +60,7 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.PaimonFormatTable;
import org.apache.spark.sql.execution.PartitionedCSVTable;
import org.apache.spark.sql.execution.PartitionedJsonTable;
import org.apache.spark.sql.execution.PartitionedOrcTable;
@@ -642,7 +644,7 @@ public class SparkCatalog extends SparkBaseCatalog
try {
org.apache.paimon.table.Table paimonTable =
catalog.getTable(toIdentifier(ident));
if (paimonTable instanceof FormatTable) {
- return convertToFileTable(ident, (FormatTable) paimonTable);
+ return toSparkFormatTable(ident, (FormatTable) paimonTable);
} else {
return new SparkTable(
copyWithSQLConf(
@@ -653,7 +655,7 @@ public class SparkCatalog extends SparkBaseCatalog
}
}
-    private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
+    private static Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
SparkSession spark = PaimonSparkSession$.MODULE$.active();
StructType schema =
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
StructType partitionSchema =
@@ -661,7 +663,31 @@ public class SparkCatalog extends SparkBaseCatalog
TypeUtils.project(formatTable.rowType(),
formatTable.partitionKeys()));
List<String> pathList = new ArrayList<>();
pathList.add(formatTable.location());
+ Map<String, String> optionsMap = formatTable.options();
+ CoreOptions coreOptions = new CoreOptions(optionsMap);
+ if (coreOptions.formatTableImplementationIsPaimon()) {
+ return new PaimonFormatTable(
+ spark,
+ new CaseInsensitiveStringMap(optionsMap),
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ schema,
+ partitionSchema,
+ formatTable,
+ ident.name());
+ }
Options options = Options.fromMap(formatTable.options());
+ return convertToFileTable(
+                formatTable, ident, pathList, options, spark, schema, partitionSchema);
+ }
+
+ private static FileTable convertToFileTable(
+ FormatTable formatTable,
+ Identifier ident,
+ List<String> pathList,
+ Options options,
+ SparkSession spark,
+ StructType schema,
+ StructType partitionSchema) {
        CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
if (formatTable.format() == FormatTable.Format.CSV) {
options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
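A minimal sketch (not part of this commit; database and table names are hypothetical) of the dispatch above: when the property is set to paimon, loadTable goes through toSparkFormatTable and returns the new PaimonFormatTable instead of a Spark FileTable, as exercised by the CSV test added below.

    spark.sql(
      """CREATE TABLE test_db.fmt_csv (f0 INT, f1 INT) USING CSV
        |OPTIONS ('format-table.implementation' = 'paimon')""".stripMargin)
    // Reads now use the Paimon format-table read path.
    spark.sql("SELECT * FROM test_db.fmt_csv").show()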
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index db28b4f4ef..9cdebd1033 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder, TopN}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.{InnerTable, SpecialFields}
+import org.apache.paimon.table.{SpecialFields, Table}
import org.apache.paimon.table.source.ReadBuilder
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.Preconditions.checkState
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType
trait ColumnPruningAndPushDown extends Scan with Logging {
- def table: InnerTable
+ def table: Table
def requiredSchema: StructType
def filters: Seq[Predicate]
def pushDownLimit: Option[Int] = None
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala
new file mode 100644
index 0000000000..0e1de5c8ea
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScanBuilder.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.source.Split
+
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+/** A ScanBuilder implementation for {@link FormatTable} that supports basic scan operations. */
+case class PaimonFormatTableScanBuilder(
+ table: FormatTable,
+ requiredSchema: StructType,
+ filters: Seq[Predicate])
+ extends ScanBuilder {
+ override def build() = PaimonFormatTableScan(table, requiredSchema, filters)
+}
+
+case class PaimonFormatTableScan(
+ table: FormatTable,
+ requiredSchema: StructType,
+ filters: Seq[Predicate])
+ extends ColumnPruningAndPushDown
+ with ScanHelper {
+
+ override val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+ protected var inputSplits: Array[Split] = _
+
+ def getOriginSplits: Array[Split] = {
+ if (inputSplits == null) {
+ inputSplits = readBuilder
+ .newScan()
+ .plan()
+ .splits()
+ .asScala
+ .toArray
+ }
+ inputSplits
+ }
+
+ override def toBatch: Batch = {
+    PaimonBatch(getInputPartitions(getOriginSplits), readBuilder, metadataColumns)
+ }
+
+ override def supportedCustomMetrics: Array[CustomMetric] = {
+ Array(
+ PaimonNumSplitMetric(),
+ PaimonSplitSizeMetric(),
+ PaimonAvgSplitSizeMetric(),
+ PaimonResultedTableFilesMetric()
+ )
+ }
+
+ override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+ val filesCount = getOriginSplits.length
+ Array(
+ PaimonResultedTableFilesTaskMetric(filesCount)
+ )
+ }
+
+ override def description(): String = {
+ val pushedFiltersStr = if (filters.nonEmpty) {
+ ", PushedFilters: [" + filters.mkString(",") + "]"
+ } else {
+ ""
+ }
+ s"PaimonFormatTableScan: [${table.name}]" + pushedFiltersStr
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0ceb36d240..d3ff6d1a40 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.spark.commands.BucketExpression.quote
-import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.PaimonUtils.fieldReference
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 8d802ca61d..fdeae66490 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -31,8 +31,6 @@ import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPred
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.sources.Filter
-import java.util.Collections
-
import scala.collection.JavaConverters._
import scala.collection.mutable
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 46516d64c8..5167c4c1bd 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -18,11 +18,17 @@
package org.apache.spark.sql.execution
+import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkTypeUtils}
+import org.apache.paimon.table.FormatTable
+
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -153,6 +159,45 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
}
}
+case class PaimonFormatTable(
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ schema: StructType,
+ override val partitionSchema_ : StructType,
+ table: FormatTable,
+ identName: String)
+ extends org.apache.spark.sql.connector.catalog.Table
+ with SupportsRead
+ with SupportsWrite
+ with PartitionedFormatTable {
+
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ PaimonFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ Option.apply(schema),
+ partitionSchema())
+ }
+
+ override def name(): String = {
+ identName
+ }
+
+ override def capabilities(): util.Set[TableCapability] = {
+ util.EnumSet.of(BATCH_READ)
+ }
+
+  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
+    PaimonFormatTableScanBuilder(table.copy(caseInsensitiveStringMap), schema, Seq.empty)
+ }
+
+  override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
+ throw new UnsupportedOperationException()
+ }
+}
+
class PartitionedCSVTable(
name: String,
sparkSession: SparkSession,
@@ -183,6 +228,11 @@ class PartitionedCSVTable(
userSpecifiedSchema,
partitionSchema())
}
+
+  override def newWriteBuilder(info: _root_.org.apache.spark.sql.connector.write.LogicalWriteInfo)
+      : _root_.org.apache.spark.sql.connector.write.WriteBuilder = {
+    super.newWriteBuilder(info)
+  }
}
class PartitionedOrcTable(
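PaimonFormatTable advertises only BATCH_READ and its newWriteBuilder throws, so writes still go through the engine implementation. A minimal sketch (not part of this commit; names are hypothetical) of the resulting usage pattern, in the same order the new tests follow:

    // Write while the table still uses the default "engine" implementation ...
    spark.sql("INSERT INTO test_db.fmt_tbl VALUES (1, 'Alice', 10.5)")
    // ... then switch reads over to the Paimon implementation.
    spark.sql(
      "ALTER TABLE test_db.fmt_tbl SET TBLPROPERTIES ('format-table.implementation' = 'paimon')")
    spark.sql("SELECT * FROM test_db.fmt_tbl WHERE id = 1").show()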
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
new file mode 100644
index 0000000000..dd045ebcfa
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.table
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.Row
+
+class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ sql("USE paimon")
+ sql("CREATE DATABASE IF NOT EXISTS test_db")
+ sql("USE test_db")
+ }
+
+ test("PaimonFormatTableRead table: csv with field-delimiter") {
+ val tableName = "paimon_format_test_csv_options"
+ withTable(tableName) {
+ sql(
+ s"CREATE TABLE $tableName (f0 INT, f1 INT) USING CSV OPTIONS ('" +
+ s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
+            .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+ val table =
+        paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+ val csvFile =
+        new Path(table.location(), "part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+ table.fileIO().writeFile(csvFile, "1|2\n3|4", false)
+ checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, 2), Row(3, 4)))
+ }
+ }
+
+ test("PaimonFormatTableRead: read non-partitioned table") {
+ for {
+ (format, compression) <- Seq(
+ ("csv", "gzip"),
+ ("json", "gzip"),
+ ("parquet", "zstd"),
+ ("orc", "zstd"))
+ } {
+ val tableName = s"format_test_$format"
+ withTable(tableName) {
+ // Create format table using the same pattern as FormatTableTestBase
+ sql(
+          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING $format " +
+            s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',', 'lineSep'='\n')")
+ val path =
+          paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
+ fileIO.mkdirs(new Path(path))
+ // Insert data using our new write implementation
+ sql(s"INSERT INTO $tableName VALUES (1, 'Alice', 10.5)")
+ sql(s"INSERT INTO $tableName VALUES (2, 'Bob', 20.7)")
+ sql(s"INSERT INTO $tableName VALUES (3, 'Charlie', 30.9)")
+
+ // Test reading all data
+ sql(
+          s"Alter table $tableName SET TBLPROPERTIES ('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
+ s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName ORDER BY id"),
+ Seq(
+ Row(1, "Alice", 10.5),
+ Row(2, "Bob", 20.7),
+ Row(3, "Charlie", 30.9)
+ )
+ )
+
+ // Test column projection (using our scan builder)
+ checkAnswer(
+ sql(s"SELECT name, value FROM $tableName WHERE id = 2"),
+ Seq(Row("Bob", 20.7))
+ )
+
+ // Test filtering
+ checkAnswer(
+ sql(s"SELECT id FROM $tableName WHERE value > 15.0 ORDER BY id"),
+ Seq(Row(2), Row(3))
+ )
+
+ // Verify this is actually a FormatTable
+        val table = paimonCatalog.getTable(Identifier.create("test_db", tableName))
+ assert(
+          table.isInstanceOf[FormatTable],
+          s"Table should be FormatTable but was ${table.getClass}")
+ sql(s"DROP TABLE $tableName")
+ }
+ }
+ }
+
+ test("PaimonFormatTableRead: read partitioned table") {
+ for {
+ (format, compression) <- Seq(
+ ("csv", "gzip"),
+ ("json", "gzip"),
+ ("parquet", "zstd"),
+ ("orc", "zstd"))
+ } {
+ val tableName = s"format_test_partitioned_$format"
+ withTable(tableName) {
+ // Create partitioned format table
+ sql(
+          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE, dept STRING) USING $format " +
+            s"PARTITIONED BY (dept) TBLPROPERTIES ('file.compression'='$compression')")
+        val paimonTable = paimonCatalog.getTable(Identifier.create("test_db", tableName))
+ val path =
+          paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
+ fileIO.mkdirs(new Path(path))
+ // Insert data into different partitions
+ sql(
+ s"INSERT INTO $tableName VALUES (1, 'Alice', 10.5, 'Engineering')," +
+ s" (2, 'Bob', 20.7, 'Engineering')," +
+ s" (3, 'Charlie', 30.9, 'Sales')," +
+ s" (4, 'David', 25.3, 'Sales')," +
+ s" (5, 'Eve', 15.8, 'Marketing')")
+
+ // Test reading all data
+ sql(
+          s"Alter table $tableName SET TBLPROPERTIES ('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
+ s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName ORDER BY id"),
+ Seq(
+ Row(1, "Alice", 10.5, "Engineering"),
+ Row(2, "Bob", 20.7, "Engineering"),
+ Row(3, "Charlie", 30.9, "Sales"),
+ Row(4, "David", 25.3, "Sales"),
+ Row(5, "Eve", 15.8, "Marketing")
+ )
+ )
+
+ // Test partition filtering
+ checkAnswer(
+          sql(s"SELECT * FROM $tableName WHERE dept = 'Engineering' ORDER BY id"),
+ Seq(
+ Row(1, "Alice", 10.5, "Engineering"),
+ Row(2, "Bob", 20.7, "Engineering")
+ )
+ )
+
+ // Test column projection with partition filtering
+ checkAnswer(
+          sql(s"SELECT name, value FROM $tableName WHERE dept = 'Sales' ORDER BY id"),
+ Seq(
+ Row("Charlie", 30.9),
+ Row("David", 25.3)
+ )
+ )
+
+ // Test filtering on non-partition columns
+ checkAnswer(
+          sql(s"SELECT id, dept FROM $tableName WHERE value > 20.0 ORDER BY id"),
+ Seq(
+ Row(2, "Engineering"),
+ Row(3, "Sales"),
+ Row(4, "Sales")
+ )
+ )
+
+ // Test combined filtering (partition + non-partition columns)
+ checkAnswer(
+          sql(s"SELECT name FROM $tableName WHERE dept = 'Sales' AND value > 25.0"),
+ Seq(Row("Charlie"), Row("David"))
+ )
+ sql(s"DROP TABLE $tableName")
+ }
+ }
+ }
+}