This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-decimal-cast
in repository https://gitbox.apache.org/repos/asf/auron.git

commit d446a83983ef383b1622554ecdba933e824d2b65
Author: zhuangxian <[email protected]>
AuthorDate: Tue Dec 9 02:31:20 2025 +0000

    adapt to celeborn-0.6.0 with some special patches
    
    KDev_MR_link:https://ksurl.cn/cMld7QJI
---
 pom.xml                                            |  4 ++--
 .../shuffle/celeborn/CelebornPartitionWriter.scala | 22 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index b455cdac..57140bf3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
     <arrowVersion>16.0.0</arrowVersion>
     <protobufVersion>3.21.9</protobufVersion>
     <paimonVersion>1.1.1</paimonVersion>
-    <celebornVersion>0.6.0</celebornVersion>
+    <celebornVersion>0.6.2</celebornVersion>
   </properties>
 
   <dependencyManagement>
@@ -416,7 +416,7 @@
       <id>spark-3.5</id>
       <properties>
         <shimName>spark-3.5</shimName>
-        <pkgSuffix>-kwai</pkgSuffix>
+        <pkgSuffix>-kwai-adapt-cele060</pkgSuffix>
         <shimPkg>spark-extension-shims-spark3</shimPkg>
         <javaVersion>1.8</javaVersion>
         <scalaVersion>2.12</scalaVersion>
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala
index 88e2eb6f..a60ec70e 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala
@@ -64,7 +64,27 @@ class CelebornPartitionWriter(
   override def close(success: Boolean): Unit = {
     val waitStartTime = System.nanoTime()
     if (success) {
-      shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers)
+      try {
+        // Try new API with numPartitions parameter (Celeborn 0.6+)
+        shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers, numPartitions)
+      } catch {
+        case _: NoSuchMethodError =>
+          // Fallback to old API without numPartitions parameter (Celeborn 0.5.x)
+          // Use reflection to avoid compilation error with new version
+          try {
+            val method = shuffleClient.getClass.getMethod(
+              "mapperEnd",
+              classOf[Int], classOf[Int], classOf[Int], classOf[Int])
+            method.invoke(shuffleClient,
+              Int.box(shuffleId),
+              Int.box(mapId),
+              Int.box(encodedAttemptId),
+              Int.box(numMappers))
+          } catch {
+            case e: Exception =>
+              logWarning(s"mapperEnd failed with error: ${e.getMessage}", e)
+          }
+      }
     }
     shuffleClient.cleanup(shuffleId, mapId, encodedAttemptId)
     metrics.incWriteTime(System.nanoTime() - waitStartTime)

Reply via email to