This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-decimal-cast in repository https://gitbox.apache.org/repos/asf/auron.git
commit d446a83983ef383b1622554ecdba933e824d2b65 Author: zhuangxian <[email protected]> AuthorDate: Tue Dec 9 02:31:20 2025 +0000 adapt to celeborn-0.6.0 with some special patchs KDev_MR_linkļ¼https://ksurl.cn/cMld7QJI --- pom.xml | 4 ++-- .../shuffle/celeborn/CelebornPartitionWriter.scala | 22 +++++++++++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index b455cdac..57140bf3 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ <arrowVersion>16.0.0</arrowVersion> <protobufVersion>3.21.9</protobufVersion> <paimonVersion>1.1.1</paimonVersion> - <celebornVersion>0.6.0</celebornVersion> + <celebornVersion>0.6.2</celebornVersion> </properties> <dependencyManagement> @@ -416,7 +416,7 @@ <id>spark-3.5</id> <properties> <shimName>spark-3.5</shimName> - <pkgSuffix>-kwai</pkgSuffix> + <pkgSuffix>-kwai-adapt-cele060</pkgSuffix> <shimPkg>spark-extension-shims-spark3</shimPkg> <javaVersion>1.8</javaVersion> <scalaVersion>2.12</scalaVersion> diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala index 88e2eb6f..a60ec70e 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/CelebornPartitionWriter.scala @@ -64,7 +64,27 @@ class CelebornPartitionWriter( override def close(success: Boolean): Unit = { val waitStartTime = System.nanoTime() if (success) { - shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers) + try { + // Try new API with numPartitions parameter (Celeborn 0.6+) + shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers, numPartitions) + } catch { + case _: NoSuchMethodError => + // Fallback to old API without numPartitions parameter (Celeborn 0.5.x) + // Use reflection to avoid compilation error with new version + try { + val method = shuffleClient.getClass.getMethod( + "mapperEnd", + classOf[Int], classOf[Int], classOf[Int], classOf[Int]) + method.invoke(shuffleClient, + Int.box(shuffleId), + Int.box(mapId), + Int.box(encodedAttemptId), + Int.box(numMappers)) + } catch { + case e: Exception => + logWarning(s"mapperEnd failed with error: ${e.getMessage}", e) + } + } } shuffleClient.cleanup(shuffleId, mapId, encodedAttemptId) metrics.incWriteTime(System.nanoTime() - waitStartTime)
