This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 27150c87c4 [VL] Celeborn shuffle reader OOM with many empty input stream (#9221)
27150c87c4 is described below
commit 27150c87c447ef6e23a70c07a496429122954dde
Author: Rong Ma <[email protected]>
AuthorDate: Fri Apr 4 09:19:31 2025 +0100
[VL] Celeborn shuffle reader OOM with many empty input stream (#9221)
---
.../VeloxCelebornColumnarBatchSerializer.scala | 30 ++++++++++++++--------
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index dc314ba44a..fbf1c67303 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -16,8 +16,9 @@
*/
package org.apache.spark.shuffle
-import org.apache.gluten.config.ReservedKeys.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, GLUTEN_SORT_SHUFFLE_WRITER}
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.ReservedKeys.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, GLUTEN_SORT_SHUFFLE_WRITER}
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil
@@ -37,7 +38,6 @@ import org.apache.spark.task.{TaskResource, TaskResources}
import org.apache.arrow.c.ArrowSchema
import org.apache.arrow.memory.BufferAllocator
import org.apache.celeborn.client.read.CelebornInputStream
-import org.apache.gluten.config.GlutenConfig
import java.io._
import java.nio.ByteBuffer
@@ -120,12 +120,8 @@ private class CelebornColumnarBatchSerializerInstance(
private class TaskDeserializationStream(in: InputStream)
extends DeserializationStream
with TaskResource {
- private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
- private val wrappedOut: ColumnarBatchOutIterator = new ColumnarBatchOutIterator(
- runtime,
- ShuffleReaderJniWrapper
- .create(runtime)
- .readStream(shuffleReaderHandle, byteIn))
+ private var byteIn: JniByteInputStream = _
+ private var wrappedOut: ColumnarBatchOutIterator = _
private var cb: ColumnarBatch = _
@@ -191,6 +187,7 @@ private class CelebornColumnarBatchSerializerInstance(
@throws(classOf[EOFException])
override def readValue[T: ClassTag](): T = {
+ initStream();
if (cb != null) {
cb.close()
cb = null
@@ -245,13 +242,26 @@ private class CelebornColumnarBatchSerializerInstance(
readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
}
numOutputRows += numRowsTotal
- wrappedOut.close()
- byteIn.close()
+ if (byteIn != null) {
+ wrappedOut.close()
+ byteIn.close()
+ }
if (cb != null) {
cb.close()
}
}
+ private def initStream(): Unit = {
+ if (byteIn == null) {
+ byteIn = JniByteInputStreams.create(in)
+ wrappedOut = new ColumnarBatchOutIterator(
+ runtime,
+ ShuffleReaderJniWrapper
+ .create(runtime)
+ .readStream(shuffleReaderHandle, byteIn))
+ }
+ }
+
override def resourceName(): String = getClass.getName
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]