This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 373155fb99 [spark] Fix CSV Format Table with field-delimiter (#5845)
373155fb99 is described below
commit 373155fb9988de82669baf427155ecc2346d11b5
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 7 13:20:01 2025 +0800
[spark] Fix CSV Format Table with field-delimiter (#5845)
---
.../apache/spark/sql/execution/PaimonFormatTable.scala | 16 ++++++++++++++--
.../apache/paimon/spark/sql/FormatTableTestBase.scala | 12 ++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
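
Why the option was lost: Spark's stock CSVTable.newScanBuilder builds each scan only from the options handed in at scan time, so table-level options such as 'field-delimiter' never reached the CSV reader. The patch below overrides newScanBuilder in PartitionedCSVTable to merge the table's own options with the scan-time options first. A minimal standalone sketch of that merge, assuming only Spark's CaseInsensitiveStringMap (object and key names here are illustrative, not part of the commit):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object OptionMergeSketch {
      // Scan-time entries (right-hand side of ++) win on key collisions;
      // table-level entries survive wherever the scan does not override them.
      def merge(
          tableOptions: CaseInsensitiveStringMap,
          scanOptions: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
        val merged =
          tableOptions.asCaseSensitiveMap().asScala ++ scanOptions.asCaseSensitiveMap().asScala
        new CaseInsensitiveStringMap(merged.asJava)
      }

      def main(args: Array[String]): Unit = {
        val table = new CaseInsensitiveStringMap(Map("field-delimiter" -> ";").asJava)
        val scan = new CaseInsensitiveStringMap(Map("path" -> "/tmp/t").asJava)
        val merged = merge(table, scan)
        assert(merged.get("field-delimiter") == ";") // table option now reaches the scan
        assert(merged.get("path") == "/tmp/t")
      }
    }
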
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 8e6eea0127..46516d64c8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
+import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
@@ -48,7 +48,7 @@ object PaimonFormatTable {
def globPaths: Boolean = {
val entry = options.get(DataSource.GLOB_PATHS_KEY)
- Option(entry).map(_ == "true").getOrElse(true)
+ Option(entry).forall(_ == "true")
}
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
@@ -163,6 +163,18 @@ class PartitionedCSVTable(
override val partitionSchema_ : StructType)
   extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
with PartitionedFormatTable {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder = {
+    val mergedOptions =
+      this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
+ CSVScanBuilder(
+ sparkSession,
+ fileIndex,
+ schema,
+ dataSchema,
+ new CaseInsensitiveStringMap(mergedOptions.asJava))
+ }
+
override lazy val fileIndex: PartitioningAwareFileIndex = {
PaimonFormatTable.createFileIndex(
options,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index a296c2ce90..c899853dd1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -33,6 +33,18 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
sql(s"USE $hiveDbName")
}
+ test("Format table: csv with field-delimiter") {
+ withTable("t") {
+    sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('field-delimiter' ';')")
+      val table =
+        paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable]
+      val csvFile =
+        new Path(table.location(), "part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c000.csv")
+ table.fileIO().writeFile(csvFile, "1;2\n3;4", false)
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 2), Row(3, 4)))
+ }
+ }
+
test("Format table: write partitioned table") {
for (format <- Seq("csv", "orc", "parquet", "json")) {
withTable("t") {