[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203914040 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. --- End diff -- I see. I agree with you that YARN could have some issues in calculating the exact memory usage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
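For readers following the TODO in the diff above, a minimal sketch of the direction it points at: hand callers the `ManagedBuffer` and consume it through its `InputStream`, so a disk-backed block is streamed from its temp file instead of being memory-mapped. The object and helper names here are illustrative, not code from the PR.

```scala
import java.io.InputStream

import org.apache.spark.network.buffer.ManagedBuffer

object RemoteBlockStreamingSketch {
  // Reads a ManagedBuffer through its InputStream. For a FileSegmentManagedBuffer this
  // streams from the temp file on disk, so nothing needs to be mapped or held resident.
  def consume(data: ManagedBuffer)(handleChunk: (Array[Byte], Int) => Unit): Long = {
    val in: InputStream = data.createInputStream()
    try {
      val buf = new Array[Byte](64 * 1024)
      var total = 0L
      var read = in.read(buf)
      while (read != -1) {
        handleChunk(buf, read)
        total += read
        read = in.read(buf)
      }
      total
    } finally {
      in.close()
    }
  }
}
```

In `getRemoteValues` the handler would presumably feed the deserializer (e.g. `SerializerManager.dataDeserializeStream`), which is roughly what the TODO describes.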
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203773818 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. --- End diff -- To be honest I don't have a perfect understanding of this, but my impression is that it is not exactly _lazy_ loading: the OS has a lot of leeway in deciding how much to keep in memory, but it should always release that memory under pressure. This is problematic under YARN, where the container's memory use is monitored independently of the OS. The OS thinks it's fine to keep large amounts of data in physical memory, but then the YARN NM looks at the memory use of the specific process tree, decides it's over the limits it has configured, and kills it. At least, I've seen cases of YARN killing things for exceeding memory limits where I *thought* that was the case, though I did not directly confirm it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
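To make the mechanism above concrete, here is a small, self-contained sketch (plain NIO, not Spark code) of what memory-mapping and then reading a block file does: the mapping itself is cheap, but touching the bytes faults pages into physical memory, and those resident pages are what a process-tree monitor such as the YARN NodeManager counts against the container.

```scala
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import java.nio.file.{Paths, StandardOpenOption}

object MmapResidencySketch {
  def main(args: Array[String]): Unit = {
    val channel = FileChannel.open(Paths.get(args(0)), StandardOpenOption.READ)
    try {
      // A single mapping is limited to 2 GB, which is exactly why the PR maps large
      // blocks in chunks.
      val size = math.min(channel.size(), Int.MaxValue.toLong)
      val mapped = channel.map(MapMode.READ_ONLY, 0, size)
      var checksum = 0L
      while (mapped.hasRemaining) {
        checksum += mapped.get() // each read may fault a page into physical memory (RSS)
      }
      println(s"read $size bytes, checksum = $checksum")
    } finally {
      channel.close()
    }
  }
}
```

Whether and when those pages are released again is up to the kernel, so a monitor that samples the process RSS can see the container over its limit even though the data is safely on disk.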
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203581903 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. --- End diff -- > not a java OOM, since its a memory-mapped file I'm not sure why a memory-mapped file would cause too much memory usage. AFAIK memory mapping is a lazy, page-wise loading mechanism: the system only loads the file segments that are actually accessed into memory pages, not the whole file. So from my understanding, even a very small amount of physical memory could map a super large file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203472192 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. --- End diff -- Assuming this goes in shortly -- anybody interested in picking up this TODO? maybe @Ngone51 or @NiharS ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203381863 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => --- End diff -- great, thanks for the explanation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203273344 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => + var remaining = length + var pos = offset + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, maxChunkSize) +val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) --- End diff -- Perfect, thanks for clarifying ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203251175 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0L) { _ + _.remaining() } + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalLimit = currentChunk.limit() +currentChunk.limit(currentChunk.position() + ioSize) +val thisWriteSize = target.write(currentChunk) +currentChunk.limit(originalLimit) +written += thisWriteSize +if (thisWriteSize < ioSize) { --- End diff -- I see, thanks for explain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203250619 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => --- End diff -- I've already updated some of them in SPARK-21475, in the shuffle-related code path, but not all of them; the remaining ones are not so critical. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r20324 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0L) { _ + _.remaining() } + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalLimit = currentChunk.limit() +currentChunk.limit(currentChunk.position() + ioSize) +val thisWriteSize = target.write(currentChunk) +currentChunk.limit(originalLimit) +written += thisWriteSize +if (thisWriteSize < ioSize) { --- End diff -- actually this is a totally normal condition, it just means the channel is not currently ready to accept anymore data. This is something netty expects, and it will make sure the rest of the data is put on the channel eventually (it'll get called the next time with the correct `position` argument indicating how far along it is). The added unit tests cover this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
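A toy caller loop may make the contract clearer: a short write is not an error, the caller (netty, in the real code) simply invokes `transferTo` again later with the updated position until the whole region has been written. This is only an illustration of the protocol, not code from the PR.

```scala
import java.nio.channels.WritableByteChannel

import io.netty.channel.FileRegion

object FileRegionDrainSketch {
  // Very roughly what netty's write loop does when the channel accepts fewer bytes than
  // offered: call transferTo again with the updated position until everything is written.
  // A real event loop waits for the channel to become writable instead of spinning.
  def drain(region: FileRegion, target: WritableByteChannel): Unit = {
    var pos = region.transferred()
    while (pos < region.count()) {
      pos += region.transferTo(target, pos) // may transfer less than what remains
    }
  }
}
```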
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203245221 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => --- End diff -- I wasn't aware of that issue, thanks for sharing that, I'll update this. Should we also update other uses? Seems there are a lot of other cases, eg. `UnsafeShuffleWriter`, `DiskBlockObjectWriter`, etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203244832 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => + var remaining = length + var pos = offset + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, maxChunkSize) +val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) --- End diff -- I think your concern is that when we are going to send data that is backed by a file, eg. a remote read of an RDD cached on disk, we should be able to send it using something more efficient than memory mapping the entire file. Is that correct? That actually isn't a problem. This `map()` method isn't called for sending disk-cached RDDs. That is already handled correctly with `FileSegmentManagedBuffer.convertToNetty()`, which uses the `DefaultFileRegion` you had in mind. The `map` method is only used on the receiving end, after the data has already been transferred, and just to pass the data on to other spark code locally in the executor. (And that will avoid the `map()` entirely after the TODO above.) I needed to add `ChunkedByteBufferFileRegion` for data that is already in memory as a ChunkedByteBuffer, eg. for memory-cached RDDs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
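For reference, the send-side path described here (a disk-backed block handed to netty without any mapping) comes down to wrapping the file segment in a `DefaultFileRegion`, which netty can push with zero-copy `transferTo`. A rough sketch, with an illustrative helper name:

```scala
import java.io.File

import io.netty.channel.DefaultFileRegion

object DiskBlockSendSketch {
  // Roughly what FileSegmentManagedBuffer.convertToNetty() amounts to: no bytes are read
  // or mapped here; netty streams the segment straight from the file when writing it out.
  def toNettyRegion(file: File, offset: Long, length: Long): DefaultFileRegion =
    new DefaultFileRegion(file, offset, length)
}
```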
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203237484 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0L) { _ + _.remaining() } + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalLimit = currentChunk.limit() +currentChunk.limit(currentChunk.position() + ioSize) +val thisWriteSize = target.write(currentChunk) +currentChunk.limit(originalLimit) +written += thisWriteSize +if (thisWriteSize < ioSize) { --- End diff -- What will be happened if `thisWriteSize` is smaller than `ioSize`, will Spark throw an exception or something else? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203236014 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => --- End diff -- Can we please use `FileChannel#open` instead? FileInputStream/FileOutputStream have some issues (https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
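The suggestion, sketched with an illustrative helper: `FileChannel.open` avoids the finalizer that `FileInputStream`/`FileOutputStream` register, which is the GC-pressure issue the linked post describes.

```scala
import java.io.{File, FileInputStream}
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

object OpenChannelSketch {
  // Preferred: no finalizer; the caller closes the channel explicitly (or via tryWithResource).
  def openReadOnly(file: File): FileChannel =
    FileChannel.open(file.toPath, StandardOpenOption.READ)

  // What the diff currently does; FileInputStream registers a finalizer.
  def openReadOnlyViaStream(file: File): FileChannel =
    new FileInputStream(file).getChannel()
}
```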
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203235292 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -17,17 +17,21 @@ package org.apache.spark.util.io -import java.io.InputStream +import java.io.{File, FileInputStream, InputStream} import java.nio.ByteBuffer -import java.nio.channels.WritableByteChannel +import java.nio.channels.{FileChannel, WritableByteChannel} + +import scala.collection.mutable.ListBuffer import com.google.common.primitives.UnsignedBytes -import io.netty.buffer.{ByteBuf, Unpooled} import org.apache.spark.SparkEnv import org.apache.spark.internal.config +import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.storage.StorageUtils +import org.apache.spark.util.Utils + --- End diff -- nit. This blank line seems not necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203182507 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => + var remaining = length + var pos = offset + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, maxChunkSize) +val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) --- End diff -- I was thinking of `DefaultFileRegion` .. but any other zero copy impl should be fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r203097734 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => + var remaining = length + var pos = offset + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, maxChunkSize) +val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) --- End diff -- I'm not sure I understand. What `FileRegion` are you referring to -- the only one I know of is netty's interface. Do you mean implement another `FileRegion` for each chunk?, and then have `ChunkedByteBufferFileRegion` delegate to that? We could do that, but I don't think it would be any better. `ChunkedByteBufferFileRegion.transferTo` would be about as complex as now. Also it may be worth noting that this particular method really should disappear -- we shouldn't be mapping this at all, we should be using an input stream (see the TODO above), but I want to do that separately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r202150730 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = { +Utils.tryWithResource(new FileInputStream(file).getChannel()) { channel => + var remaining = length + var pos = offset + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, maxChunkSize) +val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize) --- End diff -- Wondering if we could make these FileRegion's instead : and use `transferTo` instead of `write` in `ChunkedByteBufferFileRegion` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r202149408 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -723,7 +728,9 @@ private[spark] class BlockManager( } if (data != null) { -return Some(new ChunkedByteBuffer(data)) +val chunkSize = + conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt --- End diff -- nit: Make `chunkSize` as a `private` field in `BlockManager` instead of recomputing it each time ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
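What the nit would look like, roughly; the field name is illustrative, while the config key and default are taken from the diff:

```scala
// Inside BlockManager: compute the chunk size once instead of on every getRemoteBytes() call.
private val memoryMapChunkSize: Int =
  conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
```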
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r199291252 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0) { _ + _.remaining() } --- End diff -- `0L`? Otherwise this will overflow for > 2G right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
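A tiny, self-contained illustration of the overflow being pointed out: the fold's seed fixes the accumulator type, so a literal `0` wraps once the chunk sizes add up past 2 GB.

```scala
val chunkSizes = Seq(Int.MaxValue, 10)          // pretend remaining() values of two chunks
val wrapped = chunkSizes.foldLeft(0) { _ + _ }  // Int accumulator overflows: -2147483639
val correct = chunkSizes.foldLeft(0L) { _ + _ } // Long accumulator: 2147483657
```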
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668278 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +private val chunkedByteBuffer: ChunkedByteBuffer, +private val ioChunkSize: Int) extends AbstractFileRegion { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val size = chunks.foldLeft(0) { _ + _.remaining()} --- End diff -- space before `}` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668704 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkedByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkedByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize +
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198668294 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.AbstractFileRegion + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, --- End diff -- 3 spaces --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198655159 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalPos = currentChunk.position() --- End diff -- sorry bunch of leftover bits from earlier debugging. all cleaned up now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198651582 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { --- End diff -- This version isn't used until the other PR, so I can pull it out and introduce it there. The other version of `map` is used in this PR, via `BlockManager.getRemoteBytes() -> ChunkedByteBuffer.fromManagedBuffer() -> ChunkedByteBuffer.map`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198630017 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } +object ChunkedByteBuffer { + // TODO eliminate this method if we switch BlockManager to getting InputStreams + def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = { +data match { + case f: FileSegmentManagedBuffer => +map(f.getFile, maxChunkSize, f.getOffset, f.getLength) + case other => +new ChunkedByteBuffer(other.nioByteBuffer()) +} + } + + def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = { --- End diff -- Is this used anywhere? Couldn't find a reference. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198641717 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + + protected def deallocate: Unit = {} + + override def count(): Long = size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +while (keepGoing) { + while (currentChunk.hasRemaining && keepGoing) { +val ioSize = Math.min(currentChunk.remaining(), ioChunkSize) +val originalPos = currentChunk.position() --- End diff -- Unused. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198632416 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { --- End diff -- Extend `AbstractFileRegion`? Do the fields need to be public? You don't seem to need `Logging`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
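For context, the shape this review comment asks for (and which the diffs quoted earlier on this page already show) is roughly:

```scala
// Sketch only. AbstractFileRegion is Spark's helper that already supplies the deprecated
// transfered()/touch()/retain() boilerplate, and the constructor fields can stay private
// because nothing outside the class reads them.
private[io] class ChunkedByteBufferFileRegion(
    private val chunkedByteBuffer: ChunkedByteBuffer,
    private val ioChunkSize: Int) extends AbstractFileRegion {
  // ... count / position / transferTo as in the diff ...
}
```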
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r198636259 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} --- End diff -- Use `foldLeft(0) { blah }` + avoid the intermediate `val`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191623277 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191476566 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. --- End diff -- By the way, this fix is such low-hanging fruit that I would try to do it immediately afterwards. (I haven't filed a JIRA yet just because there are already so many defunct JIRAs related to this; I was going to wait until my changes got some traction.) I think it's OK to get it in like this first, as this makes the behavior for 2.01 gb basically the same as it was for 1.99 gb. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
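For context, a minimal Scala sketch of the follow-up the TODO describes, assuming a sibling method that returns the ManagedBuffer itself (the accessor name getRemoteManagedBuffer below is hypothetical, not from the PR); getRemoteValues could then deserialize from ManagedBuffer.createInputStream() over the temp file instead of memory-mapping it:

import java.io.InputStream

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

// Hypothetical helper, not the PR's code: deserialize a remote block from a stream
// over the temp file so the whole block never has to be memory-mapped at once.
def getRemoteValuesFromStream[T](
    blockId: BlockId,
    getRemoteManagedBuffer: BlockId => Option[ManagedBuffer],   // assumed accessor
    deserialize: (BlockId, InputStream) => Iterator[T]): Option[Iterator[T]] = {
  getRemoteManagedBuffer(blockId).map { buf =>
    // createInputStream() streams the data on demand; for a FileSegmentManagedBuffer
    // this reads from the file rather than mapping it into the address space.
    deserialize(blockId, buf.createInputStream())
  }
}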
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191472949 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + fileR
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191472540 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + fileR
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191471289 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes + + protected def deallocate: Unit = {} + + override def count(): Long = chunkedByteBuffer.size --- End diff -- no difference, `count()` is just to satisfy an interface. My mistake for having them look different, I'll make them the same --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
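A tiny sketch of the change hinted at in the comment above, assuming both fields should simply report the buffer's total byte count (field names follow the quoted snippet; this is illustrative, not the merged code):

// Make the two definitions visibly identical: both are the total number of bytes
// this FileRegion will transfer, which is all count() needs to report.
private val size: Long = chunkedByteBuffer.size
override def count(): Long = size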
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191178697 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191182696 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191176828 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { --- End diff -- nit: generateChunk**ed**ByteBuffer --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175242 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes + + protected def deallocate: Unit = {} + + override def count(): Long = chunkedByteBuffer.size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +var originalLimit = currentChunk.limit() --- End diff -- Seems it is unused. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
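If the variable were actually used, the usual pattern looks roughly like the sketch below (an assumption about the intent, not the PR's code): save the chunk's limit, shrink it so one write() moves at most ioChunkSize bytes, then restore it so the rest of the chunk stays writable.

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Illustrative helper: write one chunk to the channel in slices of at most ioChunkSize bytes.
private def writeChunkInSlices(
    target: WritableByteChannel, chunk: ByteBuffer, ioChunkSize: Int): Long = {
  var written = 0L
  while (chunk.hasRemaining) {
    val originalLimit = chunk.limit()
    // temporarily cap the limit so a single write() never exceeds ioChunkSize
    chunk.limit(math.min(originalLimit, chunk.position() + ioChunkSize))
    val n = target.write(chunk)
    chunk.limit(originalLimit)   // restore so the remainder of the chunk stays visible
    if (n == 0) return written   // channel accepted nothing; caller resumes later
    written += n
  }
  written
}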
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175890 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191117686 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes + + protected def deallocate: Unit = {} + + override def count(): Long = chunkedByteBuffer.size --- End diff -- What's the difference between `size` and `count`? Should `count` indicate the size of the remaining data that can still be transferred? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175960 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191104760 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. + private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes --- End diff -- Should this comment be moved above the last line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191063339 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -659,6 +659,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can go cause the process to use too much memory and get killed --- End diff -- grammar --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/21440 [SPARK-24307][CORE] Support reading remote cached partitions > 2gb (1) Netty's ByteBuf cannot support data > 2gb. So to transfer data from a ChunkedByteBuffer over the network, we use a custom version of FileRegion which is backed by the ChunkedByteBuffer. (2) On the receiving end, we need to expose all the data in a FileSegmentManagedBuffer as a ChunkedByteBuffer. We do that by memory mapping the entire file in chunks. Added unit tests. Ran the randomized test a couple of hundred times on my laptop. Tests cover the equivalent of SPARK-24107 for the ChunkedByteBufferFileRegion. Also tested on a cluster with remote cache reads >2gb (in memory and on disk). You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark chunked_bb_file_region Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21440.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21440 commit 4373e27c2ec96b77a2311f5c5997ae5ca84bf6c5 Author: Imran Rashid Date: 2018-05-23T03:59:40Z [SPARK-24307][CORE] Support reading remote cached partitions > 2gb (1) Netty's ByteBuf cannot support data > 2gb. So to transfer data from a ChunkedByteBuffer over the network, we use a custom version of FileRegion which is backed by the ChunkedByteBuffer. (2) On the receiving end, we need to expose all the data in a FileSegmentManagedBuffer as a ChunkedByteBuffer. We do that by memory mapping the entire file in chunks. Added unit tests. Also tested on a cluster with remote cache reads > 2gb (in memory and on disk). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
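As a rough illustration of point (2) in the description above, a hedged Scala sketch of exposing a large on-disk block as a ChunkedByteBuffer by memory-mapping the file in bounded chunks (the helper name and the maxChunkSize parameter are assumptions; the PR's actual code and limits may differ):

import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

import org.apache.spark.util.io.ChunkedByteBuffer

// Illustrative helper: map a (possibly >2gb) file in chunks of at most maxChunkSize
// bytes, so no single MappedByteBuffer has to cover the whole file.
def mapFileInChunks(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try {
    val length = channel.size()
    val chunks: Array[ByteBuffer] =
      (0L until length by maxChunkSize.toLong).map { offset =>
        val chunkLength = math.min(maxChunkSize.toLong, length - offset)
        channel.map(FileChannel.MapMode.READ_ONLY, offset, chunkLength): ByteBuffer
      }.toArray
    new ChunkedByteBuffer(chunks)
  } finally {
    channel.close()
  }
}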