[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18855





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133385095
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +149,64 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+minMemoryMapBytes : Long,
+maxMemoryMapBytes : Long,
+file: File,
+blockSize: Long) extends BlockData {
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxMemoryMapBytes)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+// I chose to leave the original error message here
+// since users are unfamiliar with the configuration key
+// controlling maxMemoryMapBytes for tests
+require(blockSize < maxMemoryMapBytes,
+  s"can't create a byte buffer of size $blockSize" +
+  s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
--- End diff --

or call `Utils.bytesToString` to make it more readable.
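
A sketch of the message with both suggestions folded in (the `$maxMemoryMapBytes` 
comparison from the related comment below, plus `Utils.bytesToString` for 
readability):

    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size ${Utils.bytesToString(blockSize)}" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")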





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133384946
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +149,64 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+minMemoryMapBytes : Long,
+maxMemoryMapBytes : Long,
+file: File,
+blockSize: Long) extends BlockData {
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxMemoryMapBytes)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+// I chose to leave the original error message here
+// since users are unfamiliar with the configuration key
+// controlling maxMemoryMapBytes for tests
+require(blockSize < maxMemoryMapBytes,
+  s"can't create a byte buffer of size $blockSize" +
+  s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
--- End diff --

`since it exceeds $maxMemoryMapBytes` is more accurate.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133384556
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +149,64 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+minMemoryMapBytes : Long,
+maxMemoryMapBytes : Long,
+file: File,
+blockSize: Long) extends BlockData {
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxMemoryMapBytes)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+// I chose to leave the original error message here
+// since users are unfamiliar with the configuration key
+// controlling maxMemoryMapBytes for tests
+require(blockSize < maxMemoryMapBytes,
--- End diff --

oh this is to verify the bug fix.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133384047
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +149,64 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+minMemoryMapBytes : Long,
+maxMemoryMapBytes : Long,
+file: File,
+blockSize: Long) extends BlockData {
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, maxMemoryMapBytes)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+// I chose to leave the original error message here
+// since users are unfamiliar with the configuration key
+// controlling maxMemoryMapBytes for tests
+require(blockSize < maxMemoryMapBytes,
--- End diff --

why do we need this? I think we can still see the original error message if we 
don't have this check and just go down the memory-map code path.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133383513
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +149,64 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+minMemoryMapBytes : Long,
--- End diff --

nit: no space before `:`





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133382511
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -47,6 +47,8 @@ private[spark] class DiskStore(
 securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val maxMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
+s"${Int.MaxValue}b")
--- End diff --

nit: just `Int.MaxValue.toString`
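
Applied, the line would read roughly as follows (a sketch; `getSizeAsBytes` also 
accepts a plain string default):

    private val maxMemoryMapBytes =
      conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString)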





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133317789
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,45 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+  .set("spark.storage.memoryMapLimitForTests", "10k" )
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](1024)
+  for {
+_ <- 0 until 20
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
+assert(blockData.size == 20 * 1024)
+
+val chunkedByteBuffer = 
blockData.toChunkedByteBuffer(ByteBuffer.allocate)
+val chunks = chunkedByteBuffer.chunks
+assert(chunks.size === 2)
+for( chunk <- chunks ) {
--- End diff --

nit: `for (chunk...) {`





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133317809
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,45 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+  .set("spark.storage.memoryMapLimitForTests", "10k" )
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
--- End diff --

nit: remove this empty line





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133318267
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,45 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+  .set("spark.storage.memoryMapLimitForTests", "10k" )
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](1024)
+  for {
+_ <- 0 until 20
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
+assert(blockData.size == 20 * 1024)
+
+val chunkedByteBuffer = 
blockData.toChunkedByteBuffer(ByteBuffer.allocate)
+val chunks = chunkedByteBuffer.chunks
+assert(chunks.size === 2)
+for( chunk <- chunks ) {
+  assert(chunk.limit === 10 * 1024)
+}
+
+val e = intercept[IllegalArgumentException]{
+  blockData.toByteBuffer()
+}
+
+assert(e.getMessage ==
--- End diff --

nit: `===`





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133232857
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

It would probably be easier to propagate the chunk size as an undocumented 
`SparkConf` entry. But that's up to you guys.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133180495
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

yes, currently working on:
1. parameterizing `DiskStore` and `DiskStoreSuite`
2. reverting the tests in `BlockManagerSuite`
3. reverting the 6gb change in sbt





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133166421
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

shall we do them together in a follow-up PR? I think the test case in 
`DiskStoreSuite` is enough.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133144224
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@cloud-fan, @vanzin,
taking the 'parameterized approach', I'd remove most of the tests from 
`BlockManagerSuite`, as they'd require propagating this parameter to too many 
subsystems.
So I'm going to modify `DiskStore` and `DiskStoreSuite` to use such a 
parameter. I'm not sure about leaving a test case in `BlockManagerSuite` that 
tests `DISK_ONLY` persistence; what do you guys think?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133141988
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@cloud-fan I know,
it gets even worse when using the `===` operator.

I'm currently exploring the second direction pointed out by @vanzin: 
introducing a test-only configuration key to configure the max page size.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133136857
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

7-25 seconds is a really long time for a unit test...





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133135659
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@vanzin,
I've measured: test case times range from 7 to 25 seconds on my laptop.
Point well taken :sunglasses:





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133007549
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getResult = store.get(RDDBlockId(42, 0))
+withClue(getResult) {
+  assert(getResult.isDefined)
+  assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
+withClue(getBlockRes) {
+  try {
+assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
+Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
+  val iter = store
+.serializerManager
+.dataDeserializeStream(RDDBlockId(42, 0)
+  , inpStrm)(implicitly[ClassTag[Array[Byte]]])
--- End diff --

Comma goes on the previous line. `inpStrm` is kind of an ugly variable 
name; pick one: `is`, `in`, `inputStream`.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133009803
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

Have you measured how long these tests take? I've seen this tried before in 
other changes related to 2g limits, and this kind of test was always 
ridiculously slow.

You can avoid this kind of test by making the chunk size configurable, e.g. 
in this line you're adding above:

val chunkSize = math.min(remaining, Int.MaxValue)

Then your test can run fast and not use a lot of memory. You just need to 
add extra checks that the data is being chunked properly, instead of relying on 
the JVM not throwing errors at you.
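
A minimal sketch of that idea, assuming an undocumented, test-only entry (the 
key name `spark.storage.memoryMapLimitForTests` is the one the PR ended up 
using):

    // DiskStore side: clamp each chunk to a configurable limit instead of Int.MaxValue
    private val maxMemoryMapBytes =
      conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", s"${Int.MaxValue}b")

    val chunkSize = math.min(remaining, maxMemoryMapBytes)

    // test side: shrink the limit so chunking is exercised with only a few KB of data
    val conf = new SparkConf().set("spark.storage.memoryMapLimitForTests", "10k")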





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133007010
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
--- End diff --

Even if `===` does not work, you have `Arrays.equals`, which is null-safe.
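
A null-safe version of the element check using `java.util.Arrays.equals`, as a 
sketch, in place of the manual null guards:

    assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
      case (a, b) =>
        java.util.Arrays.equals(a.asInstanceOf[Array[Byte]], b.asInstanceOf[Array[Byte]])
    })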





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133005835
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+require( blockSize < Int.MaxValue
--- End diff --

no space after `(`; comma should be on this line.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r132982799
  
--- Diff: project/SparkBuild.scala ---
@@ -790,7 +790,7 @@ object TestSettings {
 javaOptions in Test ++= 
System.getProperties.asScala.filter(_._1.startsWith("spark"))
   .map { case (k,v) => s"-D$k=$v" }.toSeq,
 javaOptions in Test += "-ea",
-javaOptions in Test ++= "-Xmx3g -Xss4096k"
+javaOptions in Test ++= "-Xmx6g -Xss4096k"
--- End diff --

I am +1 for separating it if this can be done. Let's get the changes we are 
sure of into the code base first.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r132978316
  
--- Diff: project/SparkBuild.scala ---
@@ -790,7 +790,7 @@ object TestSettings {
 javaOptions in Test ++= 
System.getProperties.asScala.filter(_._1.startsWith("spark"))
   .map { case (k,v) => s"-D$k=$v" }.toSeq,
 javaOptions in Test += "-ea",
-javaOptions in Test ++= "-Xmx3g -Xss4096k"
+javaOptions in Test ++= "-Xmx6g -Xss4096k"
--- End diff --

@cloud-fan, let's wait a few hours and see what the other folks CCed on this 
(the last ones to edit the build) have to say. If they are also worried, or do 
not comment, I'll revert this.

I must say I'm reluctant to revert these tests, as I personally believe that the 
lack of such tests contributed to Spark's 2GB issues, including this one.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r132964212
  
--- Diff: project/SparkBuild.scala ---
@@ -790,7 +790,7 @@ object TestSettings {
 javaOptions in Test ++= 
System.getProperties.asScala.filter(_._1.startsWith("spark"))
   .map { case (k,v) => s"-D$k=$v" }.toSeq,
 javaOptions in Test += "-ea",
-javaOptions in Test ++= "-Xmx3g -Xss4096k"
+javaOptions in Test ++= "-Xmx6g -Xss4096k"
--- End diff --

I'm a little worried about this change. Since the change to 
`BlockManagerSuite` is not very related to this PR, can we revert it and revisit 
it in a follow-up PR? Then we can unblock this PR.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131925431
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
--- End diff --

`===` is a helper method in ScalaTest and should be able to compare arrays.
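
For example, this passes under ScalaTest even though plain `==` on arrays is 
reference equality:

    assert(Array[Byte](1, 2, 3) === Array[Byte](1, 2, 3))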





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131914883
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+val mb = 1024 * 1024
+val gb = 1024L * mb
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](mb)
+  for {
+_ <- 0 until 2048
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
+assert(blockData.size == 2 * gb)
--- End diff --

possible, will fix.
I guess I aimed for the lowest possible failing value





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131913940
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getResult = store.get(RDDBlockId(42, 0))
+withClue(getResult) {
+  assert(getResult.isDefined)
+  assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
+withClue(getBlockRes) {
+  try {
+assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
+Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
+  val iter = store
+.serializerManager
+.dataDeserializeStream(RDDBlockId(42, 0)
+  , inpStrm)(implicitly[ClassTag[Array[Byte]]])
+  assert(iter.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+  } finally {
+getBlockRes.release()
+  }
+}
+  }
+
+  test("getOrElseUpdate > 2gb, storage level = disk only") {
--- End diff --

these tests cover more than just the `DISK_ONLY` storage level; they were 
crafted when I had bigger ambitions of solving the entire 2GB issue 
:sunglasses: , before I saw some ~100-file pull requests being abandoned or 
rejected.
Aside from that, these tests also exercise the entire orchestration done by 
`BlockManager` when an `RDD` requests a cached partition; notice that they 
intentionally make two calls to the BlockManager in order to simulate both 
code paths (cache hit and cache miss).





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131912886
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
--- End diff --

You can't compare Arrays directly: you get identity equality, which is usually 
not what you want. Hence the `.seq`, which forces them to be wrapped in a Seq.
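
A tiny illustration of the point:

    val x = Array[Byte](1, 2, 3)
    val y = Array[Byte](1, 2, 3)
    x == y          // false: == on arrays is reference (identity) equality
    x.seq == y.seq  // true: the Seq wrappers compare element by element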





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131912438
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
--- End diff --

I think it'd print an Either whose left side is a case class with these members: 
an iterator (which prints as an empty/non-empty iterator), an enum, and a number 
of bytes. The right side is an iterator, which again prints as an 
empty/non-empty iterator.





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131890999
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+val mb = 1024 * 1024
+val gb = 1024L * mb
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](mb)
+  for {
+_ <- 0 until 2048
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
+assert(blockData.size == 2 * gb)
--- End diff --

test with 3gb to be more explicit that it's larger than 2gb?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131890565
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getResult = store.get(RDDBlockId(42, 0))
+withClue(getResult) {
+  assert(getResult.isDefined)
+  assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
+withClue(getBlockRes) {
+  try {
+assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
+Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
+  val iter = store
+.serializerManager
+.dataDeserializeStream(RDDBlockId(42, 0)
+  , inpStrm)(implicitly[ClassTag[Array[Byte]]])
+  assert(iter.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+  } finally {
+getBlockRes.release()
+  }
+}
+  }
+
+  test("getOrElseUpdate > 2gb, storage level = disk only") {
--- End diff --

oh, we already have one; then why do we have these tests?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131890389
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getResult = store.get(RDDBlockId(42, 0))
+withClue(getResult) {
+  assert(getResult.isDefined)
+  assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
+withClue(getBlockRes) {
+  try {
+assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
+Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
+  val iter = store
+.serializerManager
+.dataDeserializeStream(RDDBlockId(42, 0)
+  , inpStrm)(implicitly[ClassTag[Array[Byte]]])
+  assert(iter.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+  } finally {
+getBlockRes.release()
+  }
+}
+  }
+
+  test("getOrElseUpdate > 2gb, storage level = disk only") {
--- End diff --

shall we just write a test in `DiskStoreSuite`?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131890077
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
--- End diff --

just `a === b`?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131889921
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
--- End diff --

does `res1` have a reasonable string representation?





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131889556
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
--- End diff --

nit: `storageLevel: StorageLevel`





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131889407
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
--- End diff --

we can pass in `minMemoryMapBytes` directly.
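
That is, roughly (a sketch of the suggested signature only; the body stays as 
before):

    private class DiskBlockData(
        minMemoryMapBytes: Long,
        file: File,
        blockSize: Long) extends BlockData {
      // methods unchanged
    }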





[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-07 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131758308
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
--- End diff --

@cloud-fan
it took me roughly 4 hours, but I looked both at the shuffle code path and 
at `BlockManager.getRemoteBytes`:
the first seems robust to large blocks because it uses Netty's streaming 
capabilities;
the latter seems to be broken, as it does not use Netty's streaming 
capabilities and actually tries to copy the result buffer into a heap-based 
buffer. I think this deserves its own JIRA/PR.
I think these two places plus the external shuffle server cover most of the 
relevant use cases (aside from local caching, which I believe this PR makes 
2GB-proof).

