This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build in repository https://gitbox.apache.org/repos/asf/auron.git
commit 60328bc9e8899dd814714670d7b717f767362432 Author: zhangli20 <[email protected]> AuthorDate: Mon Dec 8 09:19:57 2025 +0000 Fix: keep backward compatibility with celeborn-054. This reverts commit 437180b7506bb56da6ad0a77ba6f6e. KDev_MR_link: https://ksurl.cn/aCmpUcJb --- .../celeborn/BlazeCelebornShuffleReader.scala | 144 +++++++++++++++------ 1 file changed, 104 insertions(+), 40 deletions(-) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala index 569061d3..34f681cc 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala @@ -25,6 +25,7 @@ import scala.reflect.ClassTag import org.apache.celeborn.client.read.CelebornInputStream import org.apache.celeborn.common.CelebornConf +import org.apache.commons.lang3.reflect.FieldUtils import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.shuffle.ShuffleReadMetricsReporter @@ -51,52 +52,115 @@ class BlazeCelebornShuffleReader[K, C]( with Logging { override protected def readBlocks(): Iterator[InputStream] = { - // force disable decompression because compression is skipped in shuffle writer - val reader = new CelebornShuffleReader[K, C]( - handle, - startPartition, - endPartition, - startMapIndex.getOrElse(0), - endMapIndex.getOrElse(Int.MaxValue), - context, - conf, - BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics), - shuffleIdTracker, - false) { - - override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): SerializerInstance = { - new SerializerInstance { - override def serialize[T: ClassTag](t: T): 
ByteBuffer = - throw new UnsupportedOperationException( - "BlazeCelebornShuffleReader.newSerializerInstance") - - override def deserialize[T: ClassTag](bytes: ByteBuffer): T = - throw new UnsupportedOperationException( - "BlazeCelebornShuffleReader.newSerializerInstance") - - override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = - throw new UnsupportedOperationException( - "BlazeCelebornShuffleReader.newSerializerInstance") - - override def serializeStream(s: OutputStream): SerializationStream = - throw new UnsupportedOperationException( - "BlazeCelebornShuffleReader.newSerializerInstance") - - override def deserializeStream(s: InputStream): DeserializationStream = { - new DeserializationStream { - override def asKeyValueIterator: Iterator[(Any, Any)] = Iterator.single((null, s)) - - override def readObject[T: ClassTag](): T = - throw new UnsupportedOperationException() - - override def close(): Unit = s.close() + val reader = { + try { + // for celeborn-060 + new CelebornShuffleReader[K, C]( + handle, + startPartition, + endPartition, + startMapIndex.getOrElse(0), + endMapIndex.getOrElse(Int.MaxValue), + context, + conf, + BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics), + shuffleIdTracker, false) { + + override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): SerializerInstance = { + new SerializerInstance { + override def serialize[T: ClassTag](t: T): ByteBuffer = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def serializeStream(s: OutputStream): SerializationStream = + throw new 
UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserializeStream(s: InputStream): DeserializationStream = { + new DeserializationStream { + override def asKeyValueIterator: Iterator[(Any, Any)] = Iterator.single((null, s)) + + override def readObject[T: ClassTag](): T = + throw new UnsupportedOperationException() + + override def close(): Unit = s.close() + } + } } } } + } catch { + case _: NoSuchMethodError => + // for celeborn-054 + new CelebornShuffleReader[K, C]( + handle, + startPartition, + endPartition, + startMapIndex.getOrElse(0), + endMapIndex.getOrElse(Int.MaxValue), + context, + conf, + BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics), + shuffleIdTracker) { + + override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): SerializerInstance = { + new SerializerInstance { + override def serialize[T: ClassTag](t: T): ByteBuffer = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def serializeStream(s: OutputStream): SerializationStream = + throw new UnsupportedOperationException( + "BlazeCelebornShuffleReader.newSerializerInstance") + + override def deserializeStream(s: InputStream): DeserializationStream = { + new DeserializationStream { + override def asKeyValueIterator: Iterator[(Any, Any)] = Iterator.single((null, s)) + + override def readObject[T: ClassTag](): T = + throw new UnsupportedOperationException() + + override def close(): Unit = s.close() + } + } + } + } + } } } - reader.read().map { kv => kv._2.asInstanceOf[CelebornInputStream] } + reader.read().map { kv 
=> + val celebornInputStream = kv._2.asInstanceOf[CelebornInputStream] + + // force disable decompression because compression is skipped in shuffle writer + try { + FieldUtils.writeField( + celebornInputStream, + "shuffleCompressionEnabled", + Boolean.box(false).asInstanceOf[Object], + true) + } catch { + case _: IllegalAccessException => + // ignore if field not found (likely in celeborn-0.6.0) + } + celebornInputStream + } } }
