[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-28 Thread wypoon
Github user wypoon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r237320541
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -718,13 +718,9 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from remote block managers as serialized bytes.
+   * Get block from remote block managers as a ManagedBuffer.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than reading the file into memory.
-    // Until then, replication can cause the process to use too much memory and get killed
-    // even though we've read the data to disk.
+  def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
--- End diff --

Yes. Changed it to private.


---




[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-28 Thread wypoon
Github user wypoon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r237320424
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -789,21 +785,31 @@ private[spark] class BlockManager(
   }
 
   if (data != null) {
-    // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
-    // ChunkedByteBuffer, to go back to old code-path.  Can be removed post Spark 2.4 if
-    // new path is stable.
-    if (remoteReadNioBufferConversion) {
-      return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
-    } else {
-      return Some(ChunkedByteBuffer.fromManagedBuffer(data))
-    }
+    assert(!data.isInstanceOf[BlockManagerManagedBuffer])
--- End diff --

Added a comment to explain the assert.
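For readers of the archive, the invariant reads best spelled out next to the assert; here is a minimal self-contained analogue of the pattern (the types are illustrative stand-ins, not Spark's):

```scala
// Illustrative stand-ins: a buffer is either fetched over the network or
// wraps a locally stored block.
sealed trait Buffer
case class NetworkBuffer(bytes: Array[Byte]) extends Buffer
case class LocalBuffer(bytes: Array[Byte]) extends Buffer

val data: Buffer = NetworkBuffer(Array[Byte](1, 2, 3))
// The block was fetched remotely, so it can never be a locally managed
// buffer; the assert documents and enforces that invariant.
assert(!data.isInstanceOf[LocalBuffer])
```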


---




[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-26 Thread wypoon
Github user wypoon commented on the issue:

https://github.com/apache/spark/pull/23058
  
Thanks @squito. I updated the testing section of the PR.


---




[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-21 Thread wypoon
Github user wypoon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r235603798
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
--- End diff --

I reran my system tests with a build with the assert. No issues.


---




[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...

2018-11-20 Thread wypoon
Github user wypoon commented on the issue:

https://github.com/apache/spark/pull/23058
  
> can we also make the same change to `TaskResultGetter`?

I had a conversation offline with Imran. Since the task result bytes end up in a 
ByteBuffer before deserialization anyway, this change does not seem worthwhile.
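
A minimal illustration of the constraint (not Spark's actual `TaskResultGetter` code; the deserializer below is a stand-in): when the consuming API takes a `ByteBuffer`, the fetched bytes must be fully materialized regardless, so a streaming fetch saves nothing.

```scala
import java.nio.ByteBuffer

// Stand-in for a deserializer whose entry point takes a ByteBuffer: the
// caller has no choice but to materialize all the bytes up front.
def deserialize(buf: ByteBuffer): String = {
  val arr = new Array[Byte](buf.remaining())
  buf.get(arr)
  new String(arr, "UTF-8")
}

val fetched = "task result".getBytes("UTF-8") // stand-in for remotely fetched bytes
println(deserialize(ByteBuffer.wrap(fetched)))
```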


---




[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-20 Thread wypoon
Github user wypoon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23058#discussion_r235238066
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
--- End diff --

I added an assert in `getRemoteManagedBuffer` itself. There are 3 places in the 
method where a value can be returned, and only one of them returns a non-empty 
value; I added the assert there.
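
As an illustration only (not the actual method body, whose details are in the diff above), the shape being described is a method with several exit points where the assert guards the single non-empty return:

```scala
// Hypothetical shape: three return sites, only one of which is non-empty.
def fetchBlock(id: Int): Option[String] = {
  if (id < 0) return None                        // exit 1: invalid id
  val data = if (id == 0) null else s"block-$id"
  if (data == null) return None                  // exit 2: block not found
  assert(data.startsWith("block-"))              // guard the only non-empty exit
  Some(data)                                     // exit 3
}
```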


---




[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...

2018-11-16 Thread wypoon
GitHub user wypoon opened a pull request:

https://github.com/apache/spark/pull/23058

[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion 
to a ChunkedByteBuffer

## What changes were proposed in this pull request?

In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling 
`getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in 
turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`.
Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can 
just get this `ManagedBuffer` and use its `InputStream`.
When reading a remote cache block from disk, this reduces heap memory usage 
significantly.
Retain `getRemoteBytes` for other callers.
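
To make the memory effect concrete, here is a self-contained sketch of the underlying idea (illustrative only; `readFully` and `openStream` are stand-ins, not Spark APIs): streaming from a file-backed buffer keeps heap usage bounded, while materializing the whole block as bytes does not.

```scala
import java.io.{BufferedInputStream, FileInputStream, InputStream}
import java.nio.file.{Files, Paths}

// Heavy path (analogous to getRemoteBytes + ChunkedByteBuffer): the whole
// block is pulled on-heap before any deserialization can start.
def readFully(path: String): Array[Byte] =
  Files.readAllBytes(Paths.get(path))

// Light path (analogous to getRemoteManagedBuffer + createInputStream()):
// the consumer deserializes incrementally from the on-disk block.
def openStream(path: String): InputStream =
  new BufferedInputStream(new FileInputStream(path))
```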

## How was this patch tested?

Imran Rashid wrote an application 
(https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala)
 that, among other things, tests reading remote cache blocks. I ran this 
application, using 2500MB blocks, to test reading a cache block on disk. 
Without this change, with `--executor-memory 5g`, the test fails with 
`java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes 
with `--executor-memory 2g`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wypoon/spark SPARK-25905

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23058.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23058


commit 2516ec61d9395a2ed36185affc4018a4a4f9b7ca
Author: Wing Yew Poon 
Date:   2018-11-16T02:47:16Z

[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion 
to a ChunkedByteBuffer

In BlockManager, getRemoteValues gets a ChunkedByteBuffer (by calling
getRemoteBytes) and creates an InputStream from it. getRemoteBytes, in
turn, gets a ManagedBuffer and converts it to a ChunkedByteBuffer.
Instead, expose a getRemoteManagedBuffer method so getRemoteValues can
just get this ManagedBuffer and use its InputStream.
When reading a remote cache block from disk, this reduces heap memory
usage significantly.
Retain getRemoteBytes for other callers.




---




[GitHub] spark issue #22818: [SPARK-25827][CORE] Allocate arrays smaller than Int.Max...

2018-10-24 Thread wypoon
Github user wypoon commented on the issue:

https://github.com/apache/spark/pull/22818
  
Looks good to me. I reran the test that encountered this issue on a secure 
cluster after deploying a build with this change, and it now passes.


---




[GitHub] spark pull request #19703: [SPARK-22403][SS] Add optional checkpointLocation...

2017-11-09 Thread wypoon
Github user wypoon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19703#discussion_r150030572
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala ---
@@ -46,11 +51,13 @@ object StructuredKafkaWordCount {
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
       System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
-        "<subscribe-type> <topics>")
+        "<subscribe-type> <topics> [<checkpoint-location>]")
       System.exit(1)
     }
 
-    val Array(bootstrapServers, subscribeType, topics) = args
+    val Array(bootstrapServers, subscribeType, topics, _*) = args
+    val checkpointLocation =
+      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString
--- End diff --

This is what the internal default would be if java.io.tmpdir is "/tmp"; in YARN 
cluster mode, however, java.io.tmpdir points somewhere else (the underlying 
problem). Supplying this default here just eases the user experience: users get 
the same result in YARN cluster mode as in client mode, without supplying an 
explicit checkpoint location.
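
For reference, roughly what the internal temporary-checkpoint default resolves to (a simplified sketch, assuming it creates a directory under java.io.tmpdir; the exact helper Spark uses may differ):

```scala
import java.io.File
import java.util.UUID

// Simplified stand-in for Spark's temporary checkpoint default: it lands
// under java.io.tmpdir, which YARN sets to a node-local directory, so the
// resulting path typically does not exist in the distributed filesystem.
val internalDefault =
  new File(System.getProperty("java.io.tmpdir"), "temporary-" + UUID.randomUUID)
println(internalDefault.getCanonicalPath)
```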


---




[GitHub] spark issue #19703: [SPARK-22403][SS] Add optional checkpointLocation argume...

2017-11-09 Thread wypoon
Github user wypoon commented on the issue:

https://github.com/apache/spark/pull/19703
  
@srowen This change is indeed just a workaround for an underlying problem, as 
explained in the JIRA. @zsxwing suggested improving the StructuredKafkaWordCount 
example as such a workaround; he did not have a proposal for how best to address 
the underlying problem itself.


---




[GitHub] spark pull request #19703: [SPARK-22403][SS] Add optional checkpointLocation...

2017-11-08 Thread wypoon
GitHub user wypoon opened a pull request:

https://github.com/apache/spark/pull/19703

[SPARK-22403][SS] Add optional checkpointLocation argument to 
StructuredKafkaWordCount example

## What changes were proposed in this pull request?

When run in YARN cluster mode, the StructuredKafkaWordCount example fails 
because Spark tries to create a temporary checkpoint location in a subdirectory 
of the path given by java.io.tmpdir, and YARN sets java.io.tmpdir to a path in 
the local filesystem that usually does not correspond to an existing path in 
the distributed filesystem.
Add an optional checkpointLocation argument to the StructuredKafkaWordCount 
example so that users can specify the checkpoint location and avoid this issue.
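
A minimal sketch of the optional-argument handling this adds (mirroring the diff discussed in the review thread, with a hard-coded `args` for demonstration):

```scala
import java.util.UUID

val args = Array("broker:9092", "subscribe", "topic1") // demo input
// The first three arguments are required; a fourth, if present, overrides
// the default checkpoint location.
val Array(bootstrapServers, subscribeType, topics, _*) = args
val checkpointLocation =
  if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString
println(checkpointLocation)
```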

## How was this patch tested?

Built and ran the example manually in YARN client and cluster modes.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wypoon/spark SPARK-22403

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19703.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19703


commit 171496a424ed23ebadafe29ff74de72f3db5a49f
Author: Wing Yew Poon <wyp...@cloudera.com>
Date:   2017-11-09T04:06:48Z

Add optional checkpointLocation argument to StructuredKafkaWordCount example




---
