This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73061c5f1e [core] format table: insert overwrite when data is empty need init partition path (#6583)
73061c5f1e is described below
commit 73061c5f1ee95c621190c1adc8efe9f6ee795e33
Author: jerry <[email protected]>
AuthorDate: Tue Nov 11 22:37:22 2025 +0800
[core] format table: insert overwrite when data is empty need init partition path (#6583)
---
.../paimon/table/format/FormatTableCommit.java | 12 +++++--
.../paimon/spark/format/PaimonFormatTable.scala | 1 -
.../paimon/spark/table/PaimonFormatTableTest.scala | 40 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index 235dbd4310..5ff58d8b9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -80,14 +80,21 @@ public class FormatTableCommit implements BatchTableCommit {
+ commitMessage.getClass().getName());
}
}
- if (overwrite && staticPartitions != null &&
!staticPartitions.isEmpty()) {
+
+ if (staticPartitions != null && !staticPartitions.isEmpty()) {
Path partitionPath =
buildPartitionPath(
location,
staticPartitions,
formatTablePartitionOnlyValueInPath,
partitionKeys);
- deletePreviousDataFile(partitionPath);
+
+ if (overwrite) {
+ deletePreviousDataFile(partitionPath);
+ }
+ if (!fileIO.exists(partitionPath)) {
+ fileIO.mkdirs(partitionPath);
+ }
} else if (overwrite) {
Set<Path> partitionPaths = new HashSet<>();
for (TwoPhaseOutputStream.Committer c : committers) {
@@ -97,6 +104,7 @@ public class FormatTableCommit implements BatchTableCommit {
deletePreviousDataFile(p);
}
}
+
for (TwoPhaseOutputStream.Committer committer : committers) {
committer.commit(this.fileIO);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 449346e456..75f7588185 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -18,7 +18,6 @@
package org.apache.paimon.spark.format
-import org.apache.paimon.CoreOptions
import org.apache.paimon.format.csv.CsvOptions
import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder,
SparkInternalRowWrapper}
import org.apache.paimon.spark.write.BaseV2WriteBuilder
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index fc65cc28fe..71580156fd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -34,6 +34,46 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
sql("USE test_db")
}
+ test("PaimonFormatTable: partition path validate when insert overwrite empty
data") {
+ val tableName = "paimon_format_test_partition_path"
+ val readTableName = s"${tableName}_read"
+ withTable(tableName) {
+ sql(
+ s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV
TBLPROPERTIES (" +
+ s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ "'format-table.implementation'='paimon') PARTITIONED BY (`ds`
bigint)")
+ sql(
+ s"CREATE TABLE $readTableName (f0 INT, f1 string, f2 INT) USING CSV
TBLPROPERTIES (" +
+ s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ "'format-table.implementation'='paimon') PARTITIONED BY (`ds`
bigint)")
+ val table =
+ paimonCatalog.getTable(Identifier.create("test_db",
tableName)).asInstanceOf[FormatTable]
+ val readTable =
+ paimonCatalog
+ .getTable(Identifier.create("test_db", s"$readTableName"))
+ .asInstanceOf[FormatTable]
+
+ table.fileIO().mkdirs(new Path(table.location()))
+ readTable.fileIO().mkdirs(new Path(readTable.location()))
+
+ val partition = 20250920
+ val partitionPath = new Path(table.location(), s"ds=$partition")
+ checkAnswer(
+ sql(s"SELECT * FROM $readTableName where ds = $partition"),
+ Nil
+ )
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName where ds = $partition"),
+ Nil
+ )
+ spark.sql(
+ s"INSERT OVERWRITE $tableName PARTITION (ds = $partition) select `f0`,
`f1`, `f2` from $readTableName where ds = $partition")
+ assert(
+ table.fileIO().exists(partitionPath),
+ s"Partition directory should exist after empty insert: $partitionPath")
+ }
+ }
+
test("PaimonFormatTableRead table: csv mode") {
val tableName = "paimon_format_test_csv_malformed"
withTable(tableName) {