Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207968882
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), 
level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we 
should just write it directly
    +    // to the final location, but that would require a deeper refactor of 
this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that 
file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new 
FileOutputStream(tmpFile))
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        // Read the contents of the downloaded file as a buffer to put 
into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as 
it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO Even if we're only going to write the data to disk after 
this, we end up using a lot
    +        // of memory here.  We won't get a jvm OOM, but might get killed 
by the OS / cluster
    --- End diff --
    
    Filed SPARK-25035 to track this TODO.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to