Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21440#discussion_r203097734

    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
       }

    +object ChunkedByteBuffer {
    +  // TODO eliminate this method if we switch BlockManager to getting InputStreams
    +  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
    +    data match {
    +      case f: FileSegmentManagedBuffer =>
    +        map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
    +      case other =>
    +        new ChunkedByteBuffer(other.nioByteBuffer())
    +    }
    +  }
    +
    +  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
    +    Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel =>
    +      var remaining = length
    +      var pos = offset
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, maxChunkSize)
    +        val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
    --- End diff --

    I'm not sure I understand. What `FileRegion` are you referring to -- the only one I know of is netty's interface. Do you mean implementing another `FileRegion` for each chunk, and then having `ChunkedByteBufferFileRegion` delegate to it? We could do that, but I don't think it would be any better; `ChunkedByteBufferFileRegion.transferTo` would be about as complex as it is now.

    It may also be worth noting that this particular method really should disappear -- we shouldn't be mapping this at all, we should be using an input stream (see the TODO above) -- but I want to do that separately.
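    For context, the quoted diff is cut off at the `channel.map` call, so here is a rough standalone sketch of the chunking loop it describes -- with a hypothetical `mapChunks` name, a plain try/finally in place of Spark's `Utils.tryWithResource`, and the loop bookkeeping (advancing `pos`, decrementing `remaining`) filled in as an assumption, not a quote of the actual PR code:

    ```scala
    import java.io.{File, FileInputStream}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import scala.collection.mutable.ListBuffer

    // Map the byte range [offset, offset + length) of `file` into a sequence of
    // read-only MappedByteBuffers, each at most maxChunkSize bytes (the last
    // chunk may be smaller).
    def mapChunks(file: File, maxChunkSize: Int, offset: Long, length: Long): Array[ByteBuffer] = {
      val channel = new FileInputStream(file).getChannel()
      try {
        var remaining = length
        var pos = offset
        val chunks = new ListBuffer[ByteBuffer]()
        while (remaining > 0) {
          val chunkSize = math.min(remaining, maxChunkSize.toLong)
          chunks += channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
          pos += chunkSize
          remaining -= chunkSize
        }
        chunks.toArray
      } finally {
        channel.close()
      }
    }
    ```

    This is only meant to make the discussion easier to follow; per the TODO quoted above, the mmap-based path is intended to go away in favor of an `InputStream`-based one.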