This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a6f3f7e424 [spark] Refactor for spark format table (#6477)
a6f3f7e424 is described below
commit a6f3f7e4242d4099fd6239de8bf0e973d22cd7cc
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 28 08:34:23 2025 +0800
[spark] Refactor for spark format table (#6477)
---
.../java/org/apache/paimon/spark/SparkCatalog.java | 94 --------
.../paimon/spark/catalog/FormatTableCatalog.java | 102 +++++++++
...PaimonFormatTableScan.scala => BaseTable.scala} | 38 +++-
.../paimon/spark/PaimonFormatTableBaseScan.scala | 2 +-
.../paimon/spark/PaimonFormatTableScan.scala | 2 +-
.../paimon/spark/PaimonPartitionManagement.scala | 9 +-
.../apache/paimon/spark/PaimonSparkTableBase.scala | 21 +-
.../paimon/spark/format/PaimonFormatTable.scala | 238 ++++++++++++++++++++
...monFormatTable.scala => SparkFormatTable.scala} | 246 +--------------------
.../paimon/spark/table/PaimonFormatTableTest.scala | 22 ++
10 files changed, 407 insertions(+), 367 deletions(-)
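For readers trying out the new code path, here is a minimal usage sketch based on the test added at the end of this commit (assumption: `spark` is a SparkSession configured with a Paimon catalog that supports format tables; the table name `t` is illustrative):

    // Sketch only: a parquet format table backed by the Paimon-native
    // implementation (PaimonFormatTable) instead of Spark's built-in file sources.
    spark.sql("""
      CREATE TABLE t (id INT, p1 INT, p2 STRING) USING parquet
      PARTITIONED BY (p1, p2)
      TBLPROPERTIES ('format-table.implementation' = 'paimon')
    """)
    spark.sql("INSERT INTO t VALUES (1, 1, '1')")
    // SHOW PARTITIONS is served through the refactored PaimonPartitionManagement trait.
    spark.sql("SHOW PARTITIONS t").show()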
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 26693ed3f7..835756ffff 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.PropertyChange;
-import org.apache.paimon.format.csv.CsvOptions;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.options.Options;
@@ -43,7 +42,6 @@ import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.ExceptionUtils;
-import org.apache.paimon.utils.TypeUtils;
import org.apache.spark.sql.PaimonSparkSession$;
import org.apache.spark.sql.SparkSession;
@@ -61,7 +59,6 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
@@ -69,16 +66,6 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.execution.PaimonFormatTable;
-import org.apache.spark.sql.execution.PartitionedCSVTable;
-import org.apache.spark.sql.execution.PartitionedJsonTable;
-import org.apache.spark.sql.execution.PartitionedOrcTable;
-import org.apache.spark.sql.execution.PartitionedParquetTable;
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
-import org.apache.spark.sql.execution.datasources.v2.FileTable;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -678,87 +665,6 @@ public class SparkCatalog extends SparkBaseCatalog
}
}
- private static Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
- SparkSession spark = PaimonSparkSession$.MODULE$.active();
- StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
- StructType partitionSchema =
- SparkTypeUtils.fromPaimonRowType(
- TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
- List<String> pathList = new ArrayList<>();
- pathList.add(formatTable.location());
- Map<String, String> optionsMap = formatTable.options();
- CoreOptions coreOptions = new CoreOptions(optionsMap);
- if (coreOptions.formatTableImplementationIsPaimon()) {
- return new PaimonFormatTable(
- spark,
- new CaseInsensitiveStringMap(optionsMap),
- scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- schema,
- partitionSchema,
- formatTable,
- ident.name());
- }
- Options options = Options.fromMap(formatTable.options());
- return convertToFileTable(
- formatTable, ident, pathList, options, spark, schema, partitionSchema);
- }
-
- private static FileTable convertToFileTable(
- FormatTable formatTable,
- Identifier ident,
- List<String> pathList,
- Options options,
- SparkSession spark,
- StructType schema,
- StructType partitionSchema) {
- CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
- if (formatTable.format() == FormatTable.Format.CSV) {
- options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
- dsOptions = new CaseInsensitiveStringMap(options.toMap());
- return new PartitionedCSVTable(
- ident.name(),
- spark,
- dsOptions,
- scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- CSVFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.ORC) {
- return new PartitionedOrcTable(
- ident.name(),
- spark,
- dsOptions,
- scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- OrcFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.PARQUET) {
- return new PartitionedParquetTable(
- ident.name(),
- spark,
- dsOptions,
- scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- ParquetFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.JSON) {
- return new PartitionedJsonTable(
- ident.name(),
- spark,
- dsOptions,
- scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- JsonFileFormat.class,
- partitionSchema);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported format table "
- + ident.name()
- + " format "
- + formatTable.format().name());
- }
- }
-
protected List<String> convertPartitionTransforms(Transform[] transforms) {
List<String> partitionColNames = new ArrayList<>(transforms.length);
for (Transform transform : transforms) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
index 551fc54f73..e7fcf19fcf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
@@ -18,14 +18,116 @@
package org.apache.paimon.spark.catalog;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.spark.SparkSource;
+import org.apache.paimon.spark.SparkTypeUtils;
+import org.apache.paimon.spark.format.PaimonFormatTable;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.spark.sql.PaimonSparkSession$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.execution.PartitionedCSVTable;
+import org.apache.spark.sql.execution.PartitionedJsonTable;
+import org.apache.spark.sql.execution.PartitionedOrcTable;
+import org.apache.spark.sql.execution.PartitionedParquetTable;
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
+import org.apache.spark.sql.execution.datasources.v2.FileTable;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
/** Catalog supports format table. */
public interface FormatTableCatalog {
default boolean isFormatTable(@Nullable String provide) {
return provide != null && SparkSource.FORMAT_NAMES().contains(provide.toLowerCase());
}
+
+ default Table toSparkFormatTable(Identifier ident, FormatTable formatTable) {
+ Map<String, String> optionsMap = formatTable.options();
+ CoreOptions coreOptions = new CoreOptions(optionsMap);
+ if (coreOptions.formatTableImplementationIsPaimon()) {
+ return new PaimonFormatTable(formatTable);
+ } else {
+ SparkSession spark = PaimonSparkSession$.MODULE$.active();
+ List<String> pathList = new ArrayList<>();
+ pathList.add(formatTable.location());
+ StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+ StructType partitionSchema =
+ SparkTypeUtils.fromPaimonRowType(
+ TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
+ Options options = Options.fromMap(formatTable.options());
+ return convertToFileTable(
+ formatTable, ident, pathList, options, spark, schema, partitionSchema);
+ }
+ }
+
+ default FileTable convertToFileTable(
+ FormatTable formatTable,
+ Identifier ident,
+ List<String> pathList,
+ Options options,
+ SparkSession spark,
+ StructType schema,
+ StructType partitionSchema) {
+ CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap());
+ if (formatTable.format() == FormatTable.Format.CSV) {
+ options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
+ dsOptions = new CaseInsensitiveStringMap(options.toMap());
+ return new PartitionedCSVTable(
+ ident.name(),
+ spark,
+ dsOptions,
+ scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ CSVFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.ORC) {
+ return new PartitionedOrcTable(
+ ident.name(),
+ spark,
+ dsOptions,
+ scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ OrcFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.PARQUET) {
+ return new PartitionedParquetTable(
+ ident.name(),
+ spark,
+ dsOptions,
+ scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ ParquetFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.JSON) {
+ return new PartitionedJsonTable(
+ ident.name(),
+ spark,
+ dsOptions,
+ scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ JsonFileFormat.class,
+ partitionSchema);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported format table "
+ + ident.name()
+ + " format "
+ + formatTable.format().name());
+ }
+ }
}
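A note on the CSV branch of convertToFileTable above: CSV is the only format whose options are rewritten before the Spark table is built. A minimal sketch of that bridging, mirroring the code in this hunk (assumption: formatTable is an already-loaded CSV FormatTable):

    // Paimon stores the CSV delimiter under CsvOptions.FIELD_DELIMITER, while
    // Spark's CSV source reads it from the "sep" option, so the value is copied
    // across before building the CaseInsensitiveStringMap for PartitionedCSVTable.
    val options = Options.fromMap(formatTable.options())
    options.set("sep", options.get(CsvOptions.FIELD_DELIMITER))
    val dsOptions = new CaseInsensitiveStringMap(options.toMap())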
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
similarity index 51%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
index ab429c82bc..e7521cd409 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/BaseTable.scala
@@ -18,15 +18,35 @@
package org.apache.paimon.spark
-import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.Table
+import org.apache.paimon.utils.StringUtils
+import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.StructType
-/** Scan implementation for {@link FormatTable}. */
-case class PaimonFormatTableScan(
- table: FormatTable,
- requiredSchema: StructType,
- filters: Seq[Predicate],
- override val pushDownLimit: Option[Int])
- extends PaimonFormatTableBaseScan(table, requiredSchema, filters, pushDownLimit) {}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
+abstract class BaseTable
+ extends org.apache.spark.sql.connector.catalog.Table
+ with PaimonPartitionManagement {
+
+ val table: Table
+
+ override def name: String = table.fullName
+
+ override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
+
+ override def partitioning: Array[Transform] = {
+ table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
+ }
+
+ override def properties: JMap[String, String] = {
+ table.options()
+ }
+
+ override def toString: String = {
+ s"${table.getClass.getSimpleName}[${table.fullName()}]"
+ }
+}
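For orientation, a small sketch of what the new BaseTable derives from the underlying Paimon table (the partition keys below are illustrative values, not from this commit):

    import org.apache.paimon.utils.StringUtils
    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

    // Sketch only: partition keys are exposed to Spark as quoted identity
    // transforms; name, schema and properties are likewise derived from the Paimon table.
    val partitionKeys = Seq("p1", "p2")
    val transforms: Array[Transform] =
      partitionKeys.map(p => Expressions.identity(StringUtils.quote(p))).toArray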
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 2088ee50c2..b04e21c99a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
-/** Base Scan implementation for {@link FormatTable}. */
+/** Base Scan implementation for [[FormatTable]]. */
abstract class PaimonFormatTableBaseScan(
table: FormatTable,
requiredSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index ab429c82bc..8fb64999b9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.table.FormatTable
import org.apache.spark.sql.types.StructType
-/** Scan implementation for {@link FormatTable}. */
+/** Scan implementation for [[FormatTable]] */
case class PaimonFormatTableScan(
table: FormatTable,
requiredSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index b40812d4d7..193ec55d3b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -19,9 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
-import org.apache.paimon.operation.FileStoreCommit
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
@@ -31,12 +29,13 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.types.StructType
-import java.util.{Map => JMap, Objects, UUID}
+import java.util.{Map => JMap, Objects}
import scala.collection.JavaConverters._
trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
- self: PaimonSparkTableBase =>
+
+ val table: Table
lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index ff1cfc8705..addaaa0831 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -27,26 +27,21 @@ import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
import org.apache.paimon.table.{Table, _}
import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
-import org.apache.paimon.utils.StringUtils
import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
abstract class PaimonSparkTableBase(val table: Table)
- extends org.apache.spark.sql.connector.catalog.Table
+ extends BaseTable
with SupportsRead
with SupportsWrite
- with SupportsMetadataColumns
- with PaimonPartitionManagement {
+ with SupportsMetadataColumns {
lazy val coreOptions = new CoreOptions(table.options())
@@ -72,14 +67,6 @@ abstract class PaimonSparkTableBase(val table: Table)
def getTable: Table = table
- override def name: String = table.fullName
-
- override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
-
- override def partitioning: Array[Transform] = {
- table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
- }
-
override def properties: JMap[String, String] = {
table match {
case dataTable: DataTable =>
@@ -163,8 +150,4 @@ abstract class PaimonSparkTableBase(val table: Table)
throw new RuntimeException("Only FileStoreTable can be written.")
}
}
-
- override def toString: String = {
- s"${table.getClass.getSimpleName}[${table.fullName()}]"
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
new file mode 100644
index 0000000000..3685e26ee6
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.fs.TwoPhaseOutputStream
+import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper}
+import org.apache.paimon.spark.write.BaseV2WriteBuilder
+import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.format.TwoPhaseCommitMessage
+import org.apache.paimon.table.sink.BatchTableWrite
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.io.FileNotFoundException
+import java.util
+
+import scala.collection.JavaConverters._
+
+case class PaimonFormatTable(table: FormatTable)
+ extends BaseTable
+ with SupportsRead
+ with SupportsWrite {
+
+ override def capabilities(): util.Set[TableCapability] = {
+ util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
+ }
+
+ override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
+ val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
+ scanBuilder.pruneColumns(schema)
+ scanBuilder
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ PaimonFormatTableWriterBuilder(table, info.schema)
+ }
+}
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+ extends BaseV2WriteBuilder(table) {
+
+ override def partitionRowType(): RowType = table.partitionType
+
+ override def build: Write = new Write() {
+ override def toBatch: BatchWrite = {
+ FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)
+ }
+
+ override def toStreaming: StreamingWrite = {
+ throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+ }
+ }
+}
+
+private case class FormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Boolean,
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType)
+ extends BatchWrite
+ with Logging {
+
+ assert(
+ !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
+ "Cannot overwrite dynamically and by filter both")
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+ FormatTableWriterFactory(table, writeSchema)
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to FormatTable ${table.name()}")
+
+ val committers = messages
+ .collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+ case other =>
+ throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+ }
+ .flatten
+ .toSeq
+
+ try {
+ val start = System.currentTimeMillis()
+ if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
+ val child = org.apache.paimon.partition.PartitionUtils
+ .buildPartitionName(overwritePartitions.get.asJava)
+ val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
+ deletePreviousDataFile(partitionPath)
+ } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
+ committers
+ .map(c => c.targetFilePath().getParent)
+ .distinct
+ .foreach(deletePreviousDataFile)
+ }
+ committers.foreach(c => c.commit(table.fileIO()))
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ } catch {
+ case e: Exception =>
+ logError("Failed to commit FormatTable writes", e)
+ throw e
+ }
+ }
+
+ private def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
+ if (table.fileIO().exists(partitionPath)) {
+ val files = table.fileIO().listFiles(partitionPath, true)
+ files
+ .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
+ .foreach(
+ f => {
+ try {
+ table.fileIO().deleteQuietly(f.getPath)
+ } catch {
+ case _: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
+ case other => throw new RuntimeException(other)
+ }
+ })
+ }
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Aborting write to FormatTable ${table.name()}")
+ val committers = messages.collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+ }.flatten
+
+ committers.foreach {
+ committer =>
+ try {
+ committer.discard(table.fileIO())
+ } catch {
+ case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+ }
+ }
+ }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+ extends DataWriterFactory {
+
+ override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+ val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+ new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+ }
+}
+
+private class FormatTableDataWriter(
+ table: FormatTable,
+ formatTableWrite: BatchTableWrite,
+ writeSchema: StructType)
+ extends DataWriter[InternalRow]
+ with Logging {
+
+ private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+ val numFields = writeSchema.fields.length
+ record => {
+ new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+ }
+ }
+
+ override def write(record: InternalRow): Unit = {
+ val paimonRow = rowConverter.apply(record)
+ formatTableWrite.write(paimonRow)
+ }
+
+ override def commit(): WriterCommitMessage = {
+ try {
+ val committers = formatTableWrite
+ .prepareCommit()
+ .asScala
+ .map {
+ case committer: TwoPhaseCommitMessage => committer.getCommitter
+ case other =>
+ throw new IllegalArgumentException(
+ "Unsupported commit message type: " + other.getClass.getSimpleName)
+ }
+ .toSeq
+ FormatTableTaskCommit(committers)
+ } finally {
+ close()
+ }
+ }
+
+ override def abort(): Unit = {
+ logInfo("Aborting FormatTable data writer")
+ close()
+ }
+
+ override def close(): Unit = {
+ try {
+ formatTableWrite.close()
+ } catch {
+ case e: Exception =>
+ logError("Error closing FormatTableDataWriter", e)
+ throw new RuntimeException(e)
+ }
+ }
+}
+
+/** Commit message container for FormatTable writes, holding committers that need to be executed. */
+class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+ extends WriterCommitMessage {
+
+ def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+}
+
+object FormatTableTaskCommit {
+ def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
+ new FormatTableTaskCommit(committers)
+ }
+}
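To summarize the write path this new file implements, a condensed sketch of its two-phase commit flow (names come from the file above; `messages` stands in for the array Spark hands to the driver-side commit):

    // Each task: FormatTableDataWriter.commit() wraps the TwoPhaseOutputStream
    // committers returned by prepareCommit() into a FormatTableTaskCommit.
    // Driver: FormatTableBatchWrite.commit() collects them and publishes the files.
    val committers = messages
      .collect { case tc: FormatTableTaskCommit => tc.committers() }
      .flatten
    committers.foreach(_.commit(table.fileIO()))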
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
similarity index 51%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 5b3aaa4713..0a9c1fcdab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -18,27 +18,15 @@
package org.apache.spark.sql.execution
-import org.apache.paimon.fs.TwoPhaseOutputStream
-import org.apache.paimon.spark.{FormatTableScanBuilder, SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.BaseV2WriteBuilder
-import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.format.TwoPhaseCommitMessage
-import org.apache.paimon.table.sink.BatchTableWrite
-import org.apache.paimon.types.RowType
import org.apache.paimon.utils.StringUtils
import org.apache.hadoop.fs.Path
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, FileStatusCache, InMemoryFileIndex, NoopCache, PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
@@ -47,12 +35,12 @@ import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import java.io.FileNotFoundException
import java.util
import scala.collection.JavaConverters._
-object PaimonFormatTable {
+/** Format Table implementation with spark. */
+object SparkFormatTable {
// Copy from spark and override FileIndex's partitionSchema
def createFileIndex(
@@ -128,7 +116,6 @@ object PaimonFormatTable {
metadataOpsTimeNs)
}
-// Paimon Format Table
trait PartitionedFormatTable extends SupportsPartitionManagement {
val partitionSchema_ : StructType
@@ -173,47 +160,6 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
}
}
-case class PaimonFormatTable(
- sparkSession: SparkSession,
- options: CaseInsensitiveStringMap,
- paths: Seq[String],
- schema: StructType,
- override val partitionSchema_ : StructType,
- table: FormatTable,
- identName: String)
- extends org.apache.spark.sql.connector.catalog.Table
- with SupportsRead
- with SupportsWrite
- with PartitionedFormatTable {
-
- override lazy val fileIndex: PartitioningAwareFileIndex = {
- PaimonFormatTable.createFileIndex(
- options,
- sparkSession,
- paths,
- Option.apply(schema),
- partitionSchema())
- }
-
- override def name(): String = {
- identName
- }
-
- override def capabilities(): util.Set[TableCapability] = {
- util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
- }
-
- override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
- val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
- scanBuilder.pruneColumns(schema)
- scanBuilder
- }
-
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- PaimonFormatTableWriterBuilder(table, info.schema)
- }
-}
-
class PartitionedCSVTable(
name: String,
sparkSession: SparkSession,
@@ -237,7 +183,7 @@ class PartitionedCSVTable(
}
override lazy val fileIndex: PartitioningAwareFileIndex = {
- PaimonFormatTable.createFileIndex(
+ SparkFormatTable.createFileIndex(
options,
sparkSession,
paths,
@@ -263,7 +209,7 @@ class PartitionedOrcTable(
with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
- PaimonFormatTable.createFileIndex(
+ SparkFormatTable.createFileIndex(
options,
sparkSession,
paths,
@@ -284,7 +230,7 @@ class PartitionedParquetTable(
with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
- PaimonFormatTable.createFileIndex(
+ SparkFormatTable.createFileIndex(
options,
sparkSession,
paths,
@@ -305,7 +251,7 @@ class PartitionedJsonTable(
with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
- PaimonFormatTable.createFileIndex(
+ SparkFormatTable.createFileIndex(
options,
sparkSession,
paths,
@@ -313,179 +259,3 @@ class PartitionedJsonTable(
partitionSchema())
}
}
-
-case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
- extends BaseV2WriteBuilder(table) {
-
- override def partitionRowType(): RowType = table.partitionType
-
- override def build: Write = new Write() {
- override def toBatch: BatchWrite = {
- FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)
- }
-
- override def toStreaming: StreamingWrite = {
- throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
- }
- }
-}
-
-private case class FormatTableBatchWrite(
- table: FormatTable,
- overwriteDynamic: Boolean,
- overwritePartitions: Option[Map[String, String]],
- writeSchema: StructType)
- extends BatchWrite
- with Logging {
-
- assert(
- !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
- "Cannot overwrite dynamically and by filter both")
-
- override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
- FormatTableWriterFactory(table, writeSchema)
-
- override def useCommitCoordinator(): Boolean = false
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"Committing to FormatTable ${table.name()}")
-
- val committers = messages
- .collect {
- case taskCommit: FormatTableTaskCommit => taskCommit.committers()
- case other =>
- throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
- }
- .flatten
- .toSeq
-
- try {
- val start = System.currentTimeMillis()
- if (overwritePartitions.isDefined && overwritePartitions.get.nonEmpty) {
- val child = org.apache.paimon.partition.PartitionUtils
- .buildPartitionName(overwritePartitions.get.asJava)
- val partitionPath = new org.apache.paimon.fs.Path(table.location(), child)
- deletePreviousDataFile(partitionPath)
- } else if (overwritePartitions.isDefined && overwritePartitions.get.isEmpty) {
- committers
- .map(c => c.targetFilePath().getParent)
- .distinct
- .foreach(deletePreviousDataFile)
- }
- committers.foreach(c => c.commit(table.fileIO()))
- logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
- } catch {
- case e: Exception =>
- logError("Failed to commit FormatTable writes", e)
- throw e
- }
- }
-
- def deletePreviousDataFile(partitionPath: org.apache.paimon.fs.Path): Unit = {
- if (table.fileIO().exists(partitionPath)) {
- val files = table.fileIO().listFiles(partitionPath, true)
- files
- .filter(f => !f.getPath.getName.startsWith(".") && !f.getPath.getName.startsWith("_"))
- .foreach(
- f => {
- try {
- table.fileIO().deleteQuietly(f.getPath)
- } catch {
- case e: FileNotFoundException => logInfo(s"File ${f.getPath} already deleted")
- case other => throw new RuntimeException(other)
- }
- })
- }
- }
-
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"Aborting write to FormatTable ${table.name()}")
- val committers = messages.collect {
- case taskCommit: FormatTableTaskCommit => taskCommit.committers()
- }.flatten
-
- committers.foreach {
- committer =>
- try {
- committer.discard(table.fileIO())
- } catch {
- case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
- }
- }
- }
-}
-
-private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
- extends DataWriterFactory {
-
- override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
- val formatTableWrite = table.newBatchWriteBuilder().newWrite()
- new FormatTableDataWriter(table, formatTableWrite, writeSchema)
- }
-}
-
-private class FormatTableDataWriter(
- table: FormatTable,
- formatTableWrite: BatchTableWrite,
- writeSchema: StructType)
- extends DataWriter[InternalRow]
- with Logging {
-
- private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
- val numFields = writeSchema.fields.length
- record => {
- new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
- }
- }
-
- override def write(record: InternalRow): Unit = {
- val paimonRow = rowConverter.apply(record)
- formatTableWrite.write(paimonRow)
- }
-
- override def commit(): WriterCommitMessage = {
- try {
- val committers = formatTableWrite
- .prepareCommit()
- .asScala
- .map {
- case committer: TwoPhaseCommitMessage => committer.getCommitter
- case other =>
- throw new IllegalArgumentException(
- "Unsupported commit message type: " + other.getClass.getSimpleName)
- }
- .toSeq
- FormatTableTaskCommit(committers)
- } finally {
- close()
- }
- }
-
- override def abort(): Unit = {
- logInfo("Aborting FormatTable data writer")
- close()
- }
-
- override def close(): Unit = {
- try {
- formatTableWrite.close()
- } catch {
- case e: Exception =>
- logError("Error closing FormatTableDataWriter", e)
- throw new RuntimeException(e)
- }
- }
-}
-
-/** Commit message container for FormatTable writes, holding committers that need to be executed. */
-class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
- extends WriterCommitMessage {
-
- def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
-}
-
-object FormatTableTaskCommit {
- def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
- new FormatTableTaskCommit(committers)
- }
-}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 23e3f5a2eb..44c6abb297 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -287,4 +287,26 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
}
}
}
+
+ test("Paimon format table: show partitions") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id INT, p1 INT, p2 STRING) USING parquet
+ |PARTITIONED BY (p1, p2)
+ |TBLPROPERTIES ('format-table.implementation'='paimon')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 1, '1')")
+ sql("INSERT INTO t VALUES (2, 1, '1')")
+ sql("INSERT INTO t VALUES (3, 2, '1')")
+ sql("INSERT INTO t VALUES (3, 2, '2')")
+
+ checkAnswer(
+ sql("SHOW PARTITIONS t"),
+ Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+ checkAnswer(
+ sql("SHOW PARTITIONS t PARTITION (p1=2)"),
+ Seq(Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+ checkAnswer(sql("SHOW PARTITIONS t PARTITION (p1=2, p2='2')"), Seq(Row("p1=2/p2=2")))
+ }
+ }
}