This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-decimal-cast
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 10d7d0bbe144f05a145ea63d8a2f3463038849c1
Author: zhuangxian <[email protected]>
AuthorDate: Wed Dec 10 13:26:34 2025 +0000

    Fix partition data sizes (incl. empty partitions) reported by the Celeborn shuffle writer
    
    KDev_MR_link:https://ksurl.cn/yez9gIJf
---
 pom.xml                                              |  2 +-
 .../celeborn/BlazeCelebornShuffleWriter.scala        | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 57140bf3..0301c25f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -416,7 +416,7 @@
       <id>spark-3.5</id>
       <properties>
         <shimName>spark-3.5</shimName>
-        <pkgSuffix>-kwai-adapt-cele060</pkgSuffix>
+        <pkgSuffix>-kwai-adapt-cele060-fix-non-partition-len</pkgSuffix>
         <shimPkg>spark-extension-shims-spark3</shimPkg>
         <javaVersion>1.8</javaVersion>
         <scalaVersion>2.12</scalaVersion>
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala
index 2c636307..c5c4df76 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala
@@ -16,7 +16,7 @@
 package org.apache.spark.sql.execution.blaze.shuffle.celeborn
 
 import org.apache.celeborn.client.ShuffleClient
-import org.apache.spark.TaskContext
+import org.apache.spark.{TaskContext, SparkEnv}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.ShuffleHandle
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
@@ -24,6 +24,7 @@ import org.apache.spark.shuffle.ShuffleWriter
 import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
 import org.apache.spark.shuffle.celeborn.ExecutorShuffleIdTracker
 import org.apache.spark.shuffle.celeborn.SparkUtils
+import org.apache.spark.sql.blaze.Shims
 import org.apache.spark.sql.execution.blaze.shuffle.BlazeRssShuffleWriterBase
 import org.apache.spark.sql.execution.blaze.shuffle.RssPartitionWriterBase
 import org.blaze.sparkver
@@ -68,4 +69,19 @@ class BlazeCelebornShuffleWriter[K, V](
     celebornShuffleWriter.write(Iterator.empty) // force flush
     celebornShuffleWriter.stop(success)
   }
-}
+
+  // Build MapStatus directly from celebornPartitionWriter's partition length map rather than
+  // rssStop's mapStatus: the celeborn writer misreports partition sizes for native writes.
+  override def stop(success: Boolean): Option[MapStatus] = {
+    rssStop(success) // still flush and stop the celeborn writer, on success and on failure
+    if (success) {
+      val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
+      Some(Shims.get.getMapStatus(
+        blockManagerId,
+        celebornPartitionWriter.getPartitionLengthMap,
+        taskContext.partitionId())) // NOTE(review): confirm this is the expected map id
+    } else {
+      None
+    }
+  }
+}

Reply via email to