GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21440#discussion_r203244832
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     
     }
     
    +object ChunkedByteBuffer {
    +  // TODO eliminate this method if we switch BlockManager to getting InputStreams
    +  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
    +    data match {
    +      case f: FileSegmentManagedBuffer =>
    +        map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
    +      case other =>
    +        new ChunkedByteBuffer(other.nioByteBuffer())
    +    }
    +  }
    +
    +  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
    +    Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel =>
    +      var remaining = length
    +      var pos = offset
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, maxChunkSize)
    +        val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
    --- End diff --
    
    I think your concern is that when we send data that is backed by a file, 
e.g. a remote read of an RDD cached on disk, we should be able to send it with 
something more efficient than memory mapping the entire file.  Is that 
correct?
    
    That actually isn't a problem.  This `map()` method isn't called when 
sending disk-cached RDDs.  That case is already handled correctly by 
`FileSegmentManagedBuffer.convertToNetty()`, which uses the `DefaultFileRegion` 
you had in mind.  The `map()` method is only used on the receiving end, after 
the data has already been transferred, just to pass the data on to other Spark 
code locally in the executor.  (And that path will avoid `map()` entirely once 
the TODO above is addressed.)
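
To make the receiving-side mapping concrete, here is a minimal standalone sketch of mapping a file segment in bounded chunks. It is not the PR's code: `ChunkedMapSketch` and `mapInChunks` are hypothetical names, it returns plain `ByteBuffer`s rather than a `ChunkedByteBuffer`, and it uses a plain `try`/`finally` instead of `Utils.tryWithResource`.

```scala
import java.io.{File, FileInputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import scala.collection.mutable.ListBuffer

object ChunkedMapSketch {
  // Hypothetical helper (not part of Spark): maps the byte range
  // [offset, offset + length) of `file` into read-only buffers of at most
  // `maxChunkSize` bytes each.
  def mapInChunks(file: File, maxChunkSize: Int, offset: Long, length: Long): Seq[ByteBuffer] = {
    require(maxChunkSize > 0, "maxChunkSize must be positive")
    val channel = new FileInputStream(file).getChannel()
    try {
      var remaining = length
      var pos = offset
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        // Never map more than maxChunkSize bytes in a single buffer.
        val chunkSize = math.min(remaining, maxChunkSize.toLong)
        chunks += channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
        pos += chunkSize
        remaining -= chunkSize
      }
      chunks.toList
    } finally {
      // A mapping stays valid after the channel that created it is closed.
      channel.close()
    }
  }
}
```

Each chunk is capped at `maxChunkSize`, so no single mapped buffer exceeds the `Int` limit on `ByteBuffer` size even when the segment itself is larger.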
    
    I needed to add `ChunkedByteBufferFileRegion` for data that is already in 
memory as a `ChunkedByteBuffer`, e.g. for memory-cached RDDs.
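
As a rough illustration of what sending in-memory chunks involves, the sketch below writes a sequence of buffers to a channel starting from a logical byte position, similar in spirit to a file region's transfer step. It does not use Netty or Spark: `ChunkTransferSketch` and `transferChunks` are hypothetical names, and the real `ChunkedByteBufferFileRegion` may be structured quite differently.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object ChunkTransferSketch {
  // Hypothetical helper (not part of Spark or Netty): writes the bytes of
  // `chunks`, starting at logical offset `position` across all chunks, to
  // `target`. Returns the number of bytes actually written.
  def transferChunks(chunks: Seq[ByteBuffer], target: WritableByteChannel, position: Long): Long = {
    var skip = position
    var written = 0L
    var blocked = false
    val it = chunks.iterator
    while (it.hasNext && !blocked) {
      // Work on a duplicate so the caller's buffer positions are untouched.
      val chunk = it.next().duplicate()
      if (skip >= chunk.remaining()) {
        // This whole chunk lies before the requested position.
        skip -= chunk.remaining()
      } else {
        chunk.position(chunk.position() + skip.toInt)
        skip = 0
        written += target.write(chunk)
        // A non-blocking channel may accept only part of the chunk; stop here
        // and let the caller retry from position + written.
        blocked = chunk.hasRemaining
      }
    }
    written
  }
}
```

The caller is expected to invoke the helper again with an advanced position until every byte has been written, which is why a partial write ends the loop early instead of spinning.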

