This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 5625e47e48dd6a26002c7b20d1dad4a26f97a7fb
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Oct 29 10:26:11 2025 +0800

    [spark] Fix write non-pk dv table with external paths (#6487)
---
 .../spark/commands/PaimonRowLevelCommand.scala     | 31 ++++++--------------
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  5 ++--
 .../paimon/spark/commands/SparkDataFileMeta.scala  | 13 +++++----
 .../spark/commands/SparkDeletionVector.scala       | 23 ++++-----------
 .../spark/commands/UpdatePaimonTableCommand.scala  |  7 ++---
 .../paimon/spark/sql/DeletionVectorTest.scala      | 33 +++++++++++++++++++++-
 6 files changed, 60 insertions(+), 52 deletions(-)
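For context, the heart of the change is how a data file's path is resolved when 'data-file.external-paths' is set: deletion-vector bookkeeping now keys on the full file path instead of a path relative to the table location. A rough standalone Scala sketch of that resolution (DataFileRef and its fields are illustrative stand-ins, not the patch's actual types):

    // Illustrative only: mirrors the new SparkDataFileMeta.filePath() in the diff below.
    case class DataFileRef(bucketPath: String, fileName: String, externalPath: Option[String]) {
      // Prefer the absolute external path when the file was written to an external path;
      // otherwise fall back to the bucket directory plus the file name.
      def filePath: String = externalPath.getOrElse(bucketPath + "/" + fileName)
    }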

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index 300ca20ff8..11dd947009 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -71,12 +71,6 @@ trait PaimonRowLevelCommand
     }
   }
 
-  /** Gets a relative path against the table path. */
-  protected def relativePath(absolutePath: String): String = {
-    val location = table.location().toUri
-    location.relativize(new URI(absolutePath)).toString
-  }
-
   protected def findCandidateDataSplits(
       condition: Expression,
       output: Seq[Attribute]): Seq[DataSplit] = {
@@ -121,7 +115,6 @@ trait PaimonRowLevelCommand
       .distinct()
       .as[String]
       .collect()
-      .map(relativePath)
   }
 
   protected def extractFilesAndCreateNewScan(
@@ -136,15 +129,14 @@ trait PaimonRowLevelCommand
     (files, newRelation)
   }
 
-  /** Notice that, the key is a relative path, not just the file name. */
+  /** Notice that, the key is a file path, not just the file name. */
   protected def candidateFileMap(
       candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
     val totalBuckets = coreOptions.bucket()
     val candidateDataFiles = candidateDataSplits
      .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
-    val fileStorePathFactory = fileStore.pathFactory()
     candidateDataFiles
-      .map(file => (file.relativePath(fileStorePathFactory), file))
+      .map(file => (file.filePath(), file))
       .toMap
   }
 
@@ -165,11 +157,12 @@ trait PaimonRowLevelCommand
       dataset: Dataset[Row],
       sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
     import sparkSession.implicits._
-    val dataFileToPartitionAndBucket =
-      dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray
+    // convert to a serializable map
+    val dataFileToPartitionAndBucket = dataFilePathToMeta.map {
+      case (k, v) => k -> (v.bucketPath, v.partition, v.bucket)
+    }
 
     val my_table = table
-    val location = my_table.location
     val dvBitmap64 = my_table.coreOptions().deletionVectorBitmap64()
     dataset
       .select(DV_META_COLUMNS.map(col): _*)
@@ -183,18 +176,12 @@ trait PaimonRowLevelCommand
             dv.delete(iter.next()._2)
           }
 
-          val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString
-          val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
-          val pathFactory = my_table.store().pathFactory()
-          val relativeBucketPath = pathFactory
-            .relativeBucketPath(partition, bucket)
-            .toString
-
+          val (bucketPath, partition, bucket) = dataFileToPartitionAndBucket.apply(filePath)
           SparkDeletionVector(
-            relativeBucketPath,
+            bucketPath,
             SerializationUtils.serializeBinaryRow(partition),
             bucket,
-            new Path(filePath).getName,
+            filePath,
             DeletionVector.serializeToBytes(dv)
           )
       }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 6d0563b364..ee3104e9b3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
 import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
+import org.apache.paimon.fs.Path
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement}
 import org.apache.paimon.manifest.FileKind
@@ -310,7 +311,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
     val serializedCommits = deletionVectors
-      .groupByKey(_.partitionAndBucket)
+      .groupByKey(_.bucketPath)
       .mapGroups {
         (_, iter: Iterator[SparkDeletionVector]) =>
           val indexHandler = table.store().newIndexFileHandler()
@@ -334,7 +335,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
             }
 
             dvIndexFileMaintainer.notifyNewDeletionVector(
-              sdv.dataFileName,
+              new Path(sdv.dataFilePath).getName,
               DeletionVector.deserializeFromBytes(sdv.deletionVector))
           }
           val indexEntries = dvIndexFileMaintainer.persist()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 569a84a74c..921d2e4735 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -27,17 +27,19 @@ import org.apache.paimon.utils.FileStorePathFactory
 import scala.collection.JavaConverters._
 
 case class SparkDataFileMeta(
+    bucketPath: String,
     partition: BinaryRow,
     bucket: Int,
     totalBuckets: Int,
     dataFileMeta: DataFileMeta,
     deletionFile: Option[DeletionFile] = None) {
 
-  def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
-    fileStorePathFactory
-      .relativeBucketPath(partition, bucket)
-      .toUri
-      .toString + "/" + dataFileMeta.fileName()
+  def filePath(): String = {
+    if (dataFileMeta.externalPath().isPresent) {
+      dataFileMeta.externalPath().get()
+    } else {
+      bucketPath + "/" + dataFileMeta.fileName()
+    }
   }
 }
 
@@ -52,6 +54,7 @@ object SparkDataFileMeta {
     dataSplit.dataFiles().asScala.map {
       file =>
         SparkDataFileMeta(
+          dataSplit.bucketPath(),
           dataSplit.partition,
           dataSplit.bucket,
           totalBuckets,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
index 9fc7fdadcb..53453af913 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
@@ -18,9 +18,8 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.fs.Path
 import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.utils.{FileStorePathFactory, SerializationUtils}
+import org.apache.paimon.utils.SerializationUtils
 
 import scala.collection.JavaConverters._
 
@@ -29,31 +28,21 @@ import scala.collection.JavaConverters._
  * or DeletionVector.
  */
 case class SparkDeletionVector(
-    partitionAndBucket: String,
+    bucketPath: String,
     partition: Array[Byte],
     bucket: Int,
-    dataFileName: String,
+    dataFilePath: String,
     deletionVector: Array[Byte]
-) {
-  def relativePath(pathFactory: FileStorePathFactory): String = {
-    val prefix = pathFactory
-      .relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
-      .toUri
-      .toString + "/"
-    prefix + dataFileName
-  }
-}
+)
 
 object SparkDeletionVector {
   def toDataSplit(
       deletionVector: SparkDeletionVector,
-      root: Path,
-      pathFactory: FileStorePathFactory,
       dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
-    val meta = dataFilePathToMeta(deletionVector.relativePath(pathFactory))
+    val meta = dataFilePathToMeta(deletionVector.dataFilePath)
     DataSplit
       .builder()
-      .withBucketPath(root + "/" + deletionVector.partitionAndBucket)
+      .withBucketPath(deletionVector.bucketPath)
       .withPartition(SerializationUtils.deserializeBinaryRow(deletionVector.partition))
       .withBucket(deletionVector.bucket)
       .withTotalBuckets(meta.totalBuckets)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 8839d5c8ac..1258ebc449 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -81,8 +81,6 @@ case class UpdatePaimonTableCommand(
       logDebug("No file need to rewrote. It's an empty Commit.")
       Seq.empty[CommitMessage]
     } else {
-      val pathFactory = fileStore.pathFactory()
-
       if (deletionVectorsEnabled) {
         // Step2: collect all the deletion vectors that marks the deleted rows.
         val deletionVectors = collectDeletionVectors(
@@ -95,9 +93,8 @@ case class UpdatePaimonTableCommand(
         deletionVectors.cache()
         try {
           // Step3: write these updated data
-          val touchedDataSplits = deletionVectors.collect().map {
-            SparkDeletionVector.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
-          }
+          val touchedDataSplits =
+            deletionVectors.collect().map(SparkDeletionVector.toDataSplit(_, dataFilePathToMeta))
           val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
 
           // Step4: write these deletion vectors.
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 68e741fb13..e3a5896ab1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -660,7 +660,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     assert(dvMeta.cardinality() == 334)
   }
 
-  test("Paimon deletionVector: delete from non-pk table with data file path") {
+  test("Paimon deletionVector: delete from non-pk table with data file 
directory") {
     sql(s"""
            |CREATE TABLE T (id INT)
            |TBLPROPERTIES (
@@ -677,6 +677,37 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
   }
 
+  test("Paimon deletionVector: delete from non-pk table with data file 
external paths") {
+    withTempDir {
+      tmpDir =>
+        {
+          sql(s"""
+                 |CREATE TABLE T (id INT, v INT)
+                 |TBLPROPERTIES (
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
+                 | 'bucket-key' = 'id',
+                 | 'bucket' = '1',
+                 | 'data-file.external-paths' = 'file://${tmpDir.getCanonicalPath}',
+                 | 'data-file.external-paths.strategy' = 'round-robin'
+                 |)
+                 |""".stripMargin)
+          sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id FROM range 
(1, 50000)")
+          sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882315L))
+
+          sql("UPDATE T SET v = v + 1 WHERE id >= 555 and id <= 666")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882427L))
+
+          sql("UPDATE T SET v = v + 1 WHERE id >= 600 and id <= 800")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882628L))
+        }
+    }
+  }
+
   test("Paimon deletionVector: work v1 with v2") {
     sql(s"""
            |CREATE TABLE T (id INT)
