[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...
Github user wypoon commented on a diff in the pull request: https://github.com/apache/spark/pull/23058#discussion_r237320541

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -718,13 +718,9 @@ private[spark] class BlockManager(
   }

   /**
-   * Get block from remote block managers as serialized bytes.
+   * Get block from remote block managers as a ManagedBuffer.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than reading the file into memory.
-    // Until then, replication can cause the process to use too much memory and get killed
-    // even though we've read the data to disk.
+  def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer] = {
--- End diff --

Yes. Changed it to private.
[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...
Github user wypoon commented on a diff in the pull request: https://github.com/apache/spark/pull/23058#discussion_r237320424

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -789,21 +785,31 @@ private[spark] class BlockManager(
     }

     if (data != null) {
-      // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
-      // ChunkedByteBuffer, to go back to old code-path. Can be removed post Spark 2.4 if
-      // new path is stable.
-      if (remoteReadNioBufferConversion) {
-        return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
-      } else {
-        return Some(ChunkedByteBuffer.fromManagedBuffer(data))
-      }
+      assert(!data.isInstanceOf[BlockManagerManagedBuffer])
--- End diff --

Added a comment to explain the assert.
[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...
Github user wypoon commented on the issue: https://github.com/apache/spark/pull/23058

Thanks @squito. I updated the testing section of the PR.
[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...
Github user wypoon commented on a diff in the pull request: https://github.com/apache/spark/pull/23058#discussion_r235603798

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
--- End diff --

I reran my system tests with a build with the assert. No issues.
[GitHub] spark issue #23058: [SPARK-25905][CORE] When getting a remote block, avoid f...
Github user wypoon commented on the issue: https://github.com/apache/spark/pull/23058

> can we also make the same change to `TaskResultGetter`?

I had a conversation off-line with Imran. As we end up deserializing the value of the task result into a ByteBuffer anyway, this change does not seem worthwhile.
[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...
Github user wypoon commented on a diff in the pull request: https://github.com/apache/spark/pull/23058#discussion_r235238066

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -693,9 +693,9 @@ private[spark] class BlockManager(
    */
   private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val ct = implicitly[ClassTag[T]]
-    getRemoteBytes(blockId).map { data =>
+    getRemoteManagedBuffer(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
+        serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
--- End diff --

I added an assert in `getRemoteManagedBuffer` itself. There are 3 places where a value can be returned in the method, and only one of them is nonEmpty. I added the assert there.
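To make the placement concrete, here is a minimal toy sketch, not the actual `BlockManager` code: the type and method names (`NettyFetchedBuffer`, `fetchOrNull`, the simplified signatures) are hypothetical, but it shows the structure being described, with three return points of which only the successful fetch is nonEmpty:

```scala
object GetRemoteSketch {
  trait ManagedBuffer
  final class NettyFetchedBuffer extends ManagedBuffer        // stand-in for a fetched remote block
  final class BlockManagerManagedBuffer extends ManagedBuffer // local buffer type that must never appear here

  def getRemoteManagedBuffer(blockId: String, locations: Seq[String]): Option[ManagedBuffer] = {
    if (locations.isEmpty) {
      return None                          // return point 1: no known replica holds the block
    }
    val data = fetchOrNull(blockId, locations)
    if (data == null) {
      None                                 // return point 2: every fetch attempt failed
    } else {
      // Return point 3, the only nonEmpty one, so the invariant is checked here:
      // a remote fetch can never hand back this node's own BlockManagerManagedBuffer.
      assert(!data.isInstanceOf[BlockManagerManagedBuffer])
      Some(data)
    }
  }

  // Toy stand-in for the network round-trip with retries.
  private def fetchOrNull(blockId: String, locations: Seq[String]): ManagedBuffer =
    new NettyFetchedBuffer

  def main(args: Array[String]): Unit =
    println(getRemoteManagedBuffer("rdd_0_0", Seq("executor-1")))
}
```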
[GitHub] spark pull request #23058: [SPARK-25905][CORE] When getting a remote block, ...
GitHub user wypoon opened a pull request: https://github.com/apache/spark/pull/23058

[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion to a ChunkedByteBuffer

## What changes were proposed in this pull request?

In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling `getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`. Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can just get this `ManagedBuffer` and use its `InputStream`. When reading a remote cache block from disk, this reduces heap memory usage significantly. Retain `getRemoteBytes` for other callers.

## How was this patch tested?

Imran Rashid wrote an application (https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala) that, among other things, tests reading remote cache blocks. I ran this application, using 2500MB blocks, to test reading a cache block on disk. Without this change, with `--executor-memory 5g`, the test fails with `java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes with `--executor-memory 2g`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wypoon/spark SPARK-25905

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23058.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23058

commit 2516ec61d9395a2ed36185affc4018a4a4f9b7ca
Author: Wing Yew Poon
Date: 2018-11-16T02:47:16Z

    [SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion to a ChunkedByteBuffer

    In BlockManager, getRemoteValues gets a ChunkedByteBuffer (by calling getRemoteBytes) and creates an InputStream from it. getRemoteBytes, in turn, gets a ManagedBuffer and converts it to a ChunkedByteBuffer. Instead, expose a getRemoteManagedBuffer method so getRemoteValues can just get this ManagedBuffer and use its InputStream. When reading a remote cache block from disk, this reduces heap memory usage significantly. Retain getRemoteBytes for other callers.
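To illustrate the shape of the change, here is a minimal, runnable sketch. The toy types (`InMemoryBuffer`, the simplified `getRemoteValues` signature) are hypothetical; the point is that the real `ManagedBuffer` can be file-backed, so deserializing from its stream never forces the whole block onto the heap:

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Toy stand-in for Spark's ManagedBuffer; the real one may be file-backed,
// in which case createInputStream() reads from disk instead of heap memory.
trait ManagedBuffer {
  def createInputStream(): InputStream
}

final class InMemoryBuffer(bytes: Array[Byte]) extends ManagedBuffer {
  def createInputStream(): InputStream = new ByteArrayInputStream(bytes)
}

object RemoteReadSketch {
  // Stand-in for the network fetch of a remote block.
  def getRemoteManagedBuffer(blockId: String): Option[ManagedBuffer] =
    Some(new InMemoryBuffer("remote block contents".getBytes("UTF-8")))

  // The shape of the new getRemoteValues: deserialize directly from the
  // buffer's InputStream rather than first copying every byte into a
  // ChunkedByteBuffer (which costs heap proportional to the block size).
  def getRemoteValues(blockId: String): Option[Iterator[Int]] =
    getRemoteManagedBuffer(blockId).map { data =>
      val in = data.createInputStream()
      Iterator.continually(in.read()).takeWhile(_ != -1)
    }

  def main(args: Array[String]): Unit =
    println(getRemoteValues("rdd_0_0").map(_.length))
}
```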
[GitHub] spark issue #22818: [SPARK-25827][CORE] Allocate arrays smaller than Int.Max...
Github user wypoon commented on the issue: https://github.com/apache/spark/pull/22818

Looks good to me. I reran the test that encountered this issue on a secure cluster after deploying a build with this change, and now it passes.
[GitHub] spark pull request #19703: [SPARK-22403][SS] Add optional checkpointLocation...
Github user wypoon commented on a diff in the pull request: https://github.com/apache/spark/pull/19703#discussion_r150030572

--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala ---
@@ -46,11 +51,13 @@ object StructuredKafkaWordCount {
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
       System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
-        "<subscribe-type> <topics>")
+        "<subscribe-type> <topics> [<checkpoint-location>]")
       System.exit(1)
     }

-    val Array(bootstrapServers, subscribeType, topics) = args
+    val Array(bootstrapServers, subscribeType, topics, _*) = args
+    val checkpointLocation =
+      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString
--- End diff --

This is what the internal default would be if java.io.tmpdir is "/tmp", but in YARN cluster mode, java.io.tmpdir is something else (the underlying problem). Supplying this default here just eases the user experience: users get the same result running in YARN cluster mode or client mode, without supplying an explicit checkpoint location.
[GitHub] spark issue #19703: [SPARK-22403][SS] Add optional checkpointLocation argume...
Github user wypoon commented on the issue: https://github.com/apache/spark/pull/19703

@srowen This change is indeed just a workaround for an underlying problem, as explained in the JIRA. @zsxwing suggested improving the StructuredKafkaWordCount example as a workaround. He did not have a proposal on how best to address the underlying problem.
[GitHub] spark pull request #19703: [SPARK-22403][SS] Add optional checkpointLocation...
GitHub user wypoon opened a pull request: https://github.com/apache/spark/pull/19703

[SPARK-22403][SS] Add optional checkpointLocation argument to StructuredKafkaWordCount example

## What changes were proposed in this pull request?

When run in YARN cluster mode, the StructuredKafkaWordCount example fails because Spark tries to create a temporary checkpoint location in a subdirectory of the path given by java.io.tmpdir, and YARN sets java.io.tmpdir to a path in the local filesystem that usually does not correspond to an existing path in the distributed filesystem. Add an optional checkpointLocation argument to the StructuredKafkaWordCount example so that users can specify the checkpoint location and avoid this issue.

## How was this patch tested?

Built and ran the example manually in YARN client and cluster mode.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wypoon/spark SPARK-22403

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19703.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19703

commit 171496a424ed23ebadafe29ff74de72f3db5a49f
Author: Wing Yew Poon <wyp...@cloudera.com>
Date: 2017-11-09T04:06:48Z

    Add optional checkpointLocation argument to StructuredKafkaWordCount example
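Extracted into a minimal standalone sketch, the new argument handling looks like this. Only the names appearing in the diff discussed above (`bootstrapServers`, `subscribeType`, `topics`, `checkpointLocation`) come from the PR; the surrounding object and the trailing `println` are toy scaffolding, and the real example goes on to build a SparkSession and a Kafka source:

```scala
import java.util.UUID

object CheckpointArgSketch {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics> [<checkpoint-location>]")
      System.exit(1)
    }
    // The first three arguments are required; _* absorbs any extras.
    val Array(bootstrapServers, subscribeType, topics, _*) = args

    // The fourth argument is optional: fall back to a random /tmp path,
    // matching what the internal default would be when java.io.tmpdir is /tmp.
    // In YARN cluster mode, pass an explicit distributed-filesystem path to
    // avoid the java.io.tmpdir mismatch described above.
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

    println(s"servers=$bootstrapServers type=$subscribeType topics=$topics " +
      s"checkpoint=$checkpointLocation")
  }
}
```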