[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-19 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203914040
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

I see. I agree with you that YARN could have some issues in calculating the 
exact memory usage.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203773818
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

to be honest I don't have perfect understanding of this, but my impression 
is that it is not exactly _lazy_ loading, the OS has a lot of leeway in 
deciding how much to keep in memory, but that it should always release the 
memory under pressure.  this is problematic under yarn, when the container's 
memory use is being monitored independently of the OS.  so the OS thinks its 
fine to put large amounts of data in physical memory, but then the yarn NM 
looks at the memory use of the specific process tree, decides its over the 
limits it has configured, and so kills it.

At least, I've seen cases of yarn killing things for exceeding memory 
limits where I *thought* that was the case, though I did not directly confirm 
it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-18 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203581903
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

> not a java OOM, since its a memory-mapped file

I'm not sure why memory-mapped file will cause too much memory? AFAIK 
memory mapping is a lazy loading mechanism in page-wise, system will only load 
the to-be-accessed file segment to memory page, not the whole file to memory. 
So from my understanding even very small physical memory could map a super 
large file. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203472192
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

Assuming this goes in shortly -- anybody interested in picking up this 
TODO?  maybe @Ngone51 or @NiharS ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203381863
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
--- End diff --

great, thanks for the explanation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203273344
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
+  var remaining = length
+  var pos = offset
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxChunkSize)
+val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, 
chunkSize)
--- End diff --

Perfect, thanks for clarifying !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203251175
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.  This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+private val chunkedByteBuffer: ChunkedByteBuffer,
+private val ioChunkSize: Int) extends AbstractFileRegion {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val size = chunks.foldLeft(0L) { _ + _.remaining() }
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+while (keepGoing) {
+  while (currentChunk.hasRemaining && keepGoing) {
+val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
+val originalLimit = currentChunk.limit()
+currentChunk.limit(currentChunk.position() + ioSize)
+val thisWriteSize = target.write(currentChunk)
+currentChunk.limit(originalLimit)
+written += thisWriteSize
+if (thisWriteSize < ioSize) {
--- End diff --

I see, thanks for explain.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203250619
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
--- End diff --

I've already updated some of them in SPARK-21475 in shuffle related code 
path, but not all of them which are not so critical.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r20324
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.  This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+private val chunkedByteBuffer: ChunkedByteBuffer,
+private val ioChunkSize: Int) extends AbstractFileRegion {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val size = chunks.foldLeft(0L) { _ + _.remaining() }
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+while (keepGoing) {
+  while (currentChunk.hasRemaining && keepGoing) {
+val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
+val originalLimit = currentChunk.limit()
+currentChunk.limit(currentChunk.position() + ioSize)
+val thisWriteSize = target.write(currentChunk)
+currentChunk.limit(originalLimit)
+written += thisWriteSize
+if (thisWriteSize < ioSize) {
--- End diff --

actually this is a totally normal condition, it just means the channel is 
not currently ready to accept anymore data.  This is something netty expects, 
and it will make sure the rest of the data is put on the channel eventually 
(it'll get called the next time with the correct `position` argument indicating 
how far along it is).

The added unit tests cover this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203245221
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
--- End diff --

I wasn't aware of that issue, thanks for sharing that, I'll update this.  
Should we also update other uses?  Seems there are a lot of other cases, eg. 
`UnsafeShuffleWriter`, `DiskBlockObjectWriter`, etc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203244832
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
+  var remaining = length
+  var pos = offset
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxChunkSize)
+val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, 
chunkSize)
--- End diff --

I think your concern is that when we are going to send data that is backed 
by a file, eg. a remote read of an RDD cached on disk, we should be able to 
send it using something more efficient than memory mapping the entire file.  Is 
that correct?

That actually isn't a problem.  This `map()` method isn't called for 
sending disk-cached RDDs.  That is already handled correctly with 
`FileSegmentManagedBuffer.convertToNetty()`, which uses the `DefaultFileRegion` 
you had in mind.  The `map` method is only used on the receiving end, after the 
data has already been transferred, and just to pass the data on to other spark 
code locally in the executor.  (And that will avoid the `map()` entirely after 
the TODO above.)

I needed to add `ChunkedByteBufferFileRegion` for data that is already in 
memory as a ChunkedByteBuffer, eg. for memory-cached RDDs. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203237484
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.  This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+private val chunkedByteBuffer: ChunkedByteBuffer,
+private val ioChunkSize: Int) extends AbstractFileRegion {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val size = chunks.foldLeft(0L) { _ + _.remaining() }
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+while (keepGoing) {
+  while (currentChunk.hasRemaining && keepGoing) {
+val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
+val originalLimit = currentChunk.limit()
+currentChunk.limit(currentChunk.position() + ioSize)
+val thisWriteSize = target.write(currentChunk)
+currentChunk.limit(originalLimit)
+written += thisWriteSize
+if (thisWriteSize < ioSize) {
--- End diff --

What will be happened if `thisWriteSize` is smaller than `ioSize`, will 
Spark throw an exception or something else?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203236014
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
--- End diff --

Can we please use `FileChannel#open` instead, 
FileInputStream/FileOutputStream has some issues 
(https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203235292
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -17,17 +17,21 @@
 
 package org.apache.spark.util.io
 
-import java.io.InputStream
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.WritableByteChannel
+import java.nio.channels.{FileChannel, WritableByteChannel}
+
+import scala.collection.mutable.ListBuffer
 
 import com.google.common.primitives.UnsignedBytes
-import io.netty.buffer.{ByteBuf, Unpooled}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.storage.StorageUtils
+import org.apache.spark.util.Utils
+
--- End diff --

nit. This blank line seems not necessary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203182507
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
+  var remaining = length
+  var pos = offset
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxChunkSize)
+val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, 
chunkSize)
--- End diff --

I was thinking of `DefaultFileRegion` .. but any other zero copy impl 
should be fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r203097734
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
+  var remaining = length
+  var pos = offset
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxChunkSize)
+val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, 
chunkSize)
--- End diff --

I'm not sure I understand.  What `FileRegion` are you referring to -- the 
only one I know of is netty's interface.  Do you mean implement another 
`FileRegion` for each chunk?, and then have `ChunkedByteBufferFileRegion` 
delegate to that?

We could do that, but I don't think it would be any better.  
`ChunkedByteBufferFileRegion.transferTo` would be about as complex as now.  
Also it may be worth noting that this particular method really should disappear 
-- we shouldn't be mapping this at all, we should be using an input stream (see 
the TODO above), but I want to do that separately.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-12 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r202150730
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,34 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): 
ChunkedByteBuffer = {
+Utils.tryWithResource(new FileInputStream(file).getChannel()) { 
channel =>
+  var remaining = length
+  var pos = offset
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxChunkSize)
+val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, 
chunkSize)
--- End diff --

Wondering if we could make these FileRegion's instead : and use 
`transferTo` instead of `write` in `ChunkedByteBufferFileRegion` ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-07-12 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r202149408
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -723,7 +728,9 @@ private[spark] class BlockManager(
   }
 
   if (data != null) {
-return Some(new ChunkedByteBuffer(data))
+val chunkSize =
+  conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", 
Int.MaxValue.toString).toInt
--- End diff --

nit: Make `chunkSize` as a `private` field in `BlockManager` instead of 
recomputing it each time ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r199291252
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.  This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+private val chunkedByteBuffer: ChunkedByteBuffer,
+private val ioChunkSize: Int) extends AbstractFileRegion {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val size = chunks.foldLeft(0) { _ + _.remaining() }
--- End diff --

`0L`? Otherwise this will overflow for > 2G right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198668278
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+private val chunkedByteBuffer: ChunkedByteBuffer,
+private val ioChunkSize: Int) extends AbstractFileRegion {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val size = chunks.foldLeft(0) { _ + _.remaining()}
--- End diff --

space before `}`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198668704
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkedByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkedByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+ 

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198668294
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.AbstractFileRegion
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
--- End diff --

3 spaces


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198655159
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  override def transfered(): Long = _transferred
+
+  override def touch(): ChunkedByteBufferFileRegion = this
+
+  override def touch(hint: Object): ChunkedByteBufferFileRegion = this
+
+  override def retain(): FileRegion = {
+super.retain()
+this
+  }
+
+  override def retain(increment: Int): FileRegion = {
+super.retain(increment)
+this
+  }
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+while (keepGoing) {
+  while (currentChunk.hasRemaining && keepGoing) {
+val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
+val originalPos = currentChunk.position()
--- End diff --

sorry bunch of leftover bits from earlier debugging.  all cleaned up now


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198651582
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
--- End diff --

this version isn't used till the other PR. I can pull it out there

the other version of `map` is used in this pr from 
`BlockManager.getRemoteBytes() -> ChunkedByteBuffer.fromManagedBuffer() -> 
ChunkedByteBuffer.map`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198630017
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -166,6 +170,38 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
 
 }
 
+object ChunkedByteBuffer {
+  // TODO eliminate this method if we switch BlockManager to getting 
InputStreams
+  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): 
ChunkedByteBuffer = {
+data match {
+  case f: FileSegmentManagedBuffer =>
+map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+  case other =>
+new ChunkedByteBuffer(other.nioByteBuffer())
+}
+  }
+
+  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
--- End diff --

Is this used anywhere? Couldn't find a reference.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198641717
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  override def transfered(): Long = _transferred
+
+  override def touch(): ChunkedByteBufferFileRegion = this
+
+  override def touch(hint: Object): ChunkedByteBufferFileRegion = this
+
+  override def retain(): FileRegion = {
+super.retain()
+this
+  }
+
+  override def retain(increment: Int): FileRegion = {
+super.retain(increment)
+this
+  }
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+while (keepGoing) {
+  while (currentChunk.hasRemaining && keepGoing) {
+val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
+val originalPos = currentChunk.position()
--- End diff --

Unused.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198632416
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
--- End diff --

Extend `AbstractFileRegion`?

Do the fields need to be public?

You don't seem to need `Logging`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r198636259
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
--- End diff --

Use `foldLeft(0) { blah }` + avoid the intermediate `val`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191623277
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  file

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191476566
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can cause the process to use too much 
memory and get killed
+// by the OS / cluster manager (not a java OOM, since its a 
memory-mapped file) even though
+// we've read the data to disk.
--- End diff --

btw this fix is such low-hanging fruit that I would try to do this 
immediately afterwards.  (I haven't filed a jira yet just because there are 
already so many defunct jira related to this, I was going to wait till my 
changes got some traction).

I think its OK to get it in like this first, as this makes the behavior for 
2.01 gb basically the same as it was for 1.99 gb.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191472949
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  fileR

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191472540
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  fileR

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191471289
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
--- End diff --

no difference, `count()` is just to satisfy an interface.  My mistake for 
having them look different, I'll make them the same


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191178697
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  file

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191182696
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  file

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191176828
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
--- End diff --

nit: generateChunk**ed**ByteBuffer


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175242
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
+
+  // this is the "start position" of the overall Data in the backing file, 
not our current position
+  override def position(): Long = 0
+
+  override def transferred(): Long = _transferred
+
+  override def transfered(): Long = _transferred
+
+  override def touch(): ChunkedByteBufferFileRegion = this
+
+  override def touch(hint: Object): ChunkedByteBufferFileRegion = this
+
+  override def retain(): FileRegion = {
+super.retain()
+this
+  }
+
+  override def retain(increment: Int): FileRegion = {
+super.retain(increment)
+this
+  }
+
+  private var currentChunkIdx = 0
+
+  def transferTo(target: WritableByteChannel, position: Long): Long = {
+assert(position == _transferred)
+if (position == size) return 0L
+var keepGoing = true
+var written = 0L
+var currentChunk = chunks(currentChunkIdx)
+var originalLimit = currentChunk.limit()
--- End diff --

Seems it is unused.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175890
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  file

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191117686
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
+
+  protected def deallocate: Unit = {}
+
+  override def count(): Long = chunkedByteBuffer.size
--- End diff --

What's the difference between `size` and `count`? Should `count` indicates 
the rest data's size can be transfered ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191175960
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  file

[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-28 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191104760
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util.io
+
+import java.nio.channels.WritableByteChannel
+
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.internal.Logging
+
+
+/**
+ * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow 
sending > 2gb in one netty
+ * message.   This is because netty cannot send a ByteBuf > 2g, but it can 
send a large FileRegion,
+ * even though the data is not backed by a file.
+ */
+private[io] class ChunkedByteBufferFileRegion(
+val chunkedByteBuffer: ChunkedByteBuffer,
+val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion 
with Logging {
+
+  private var _transferred: Long = 0
+  // this duplicates the original chunks, so we're free to modify the 
position, limit, etc.
+  private val chunks = chunkedByteBuffer.getChunks()
+  private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
+  private val size = cumLength.last
+  // Chunk size in bytes
--- End diff --

Should this comment be moved above last line ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-26 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191063339
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -659,6 +659,11 @@ private[spark] class BlockManager(
* Get block from remote block managers as serialized bytes.
*/
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+// TODO if we change this method to return the ManagedBuffer, then 
getRemoteValues
+// could just use the inputStream on the temp file, rather than 
memory-mapping the file.
+// Until then, replication can go cause the process to use too much 
memory and get killed
--- End diff --

grammar


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-26 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21440

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Ran the randomized test a couple of hundred times on my 
laptop.  Tests cover the equivalent of SPARK-24107 for the 
ChunkedByteBufferFileRegion.  Also tested on a cluster with remote cache reads 
>2gb (in memory and on disk).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark chunked_bb_file_region

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21440


commit 4373e27c2ec96b77a2311f5c5997ae5ca84bf6c5
Author: Imran Rashid 
Date:   2018-05-23T03:59:40Z

[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Also tested on a cluster with remote cache reads >
2gb (in memory and on disk).




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org