[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2408 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55801829

Thanks. Merging in master.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55801517

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20409/consoleFull) for PR 2408 at commit [`074781d`](https://github.com/apache/spark/commit/074781d220f37fa3edaa22ecce7312d0ca22596a).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55791275

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20409/consoleFull) for PR 2408 at commit [`074781d`](https://github.com/apache/spark/commit/074781d220f37fa3edaa22ecce7312d0ca22596a).
* This patch merges cleanly.
Github user sarutak commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55790211

test this please.
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55788663

Jenkins, test this please.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17619322

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,10 +112,18 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+              } finally {
+                if (is != null) {
+                  is.close()
--- End diff --

@rxin Exactly, it doesn't make sense, and I noticed the InputStream is closed via NextIterator#close. So I reverted this part of the change.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17617628

--- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
@@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
   override def size: Long = length

   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
--- End diff --

This part looks good to me.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17617004

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,10 +112,18 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+              } finally {
+                if (is != null) {
+                  is.close()
--- End diff --

Doesn't this close the InputStream prematurely? Note that the 3rd argument is passed in as a closure, so it is lazy. BTW, in my new refactoring of this, there is a place where we should explicitly close the streams: https://github.com/apache/spark/pull/2330/files#diff-27109eb30a77542d377c936e0d134420R295
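The laziness issue rxin describes can be reproduced outside Spark. The sketch below is hypothetical (a plain FileInputStream stands in for the ManagedBuffer stream, and `readBlock` for the FetchResult closure): because the read is deferred into a closure, an eager `close()` in a `finally` block runs before any consumer ever touches the stream, so the later read fails.

```scala
import java.io.{File, FileInputStream, FileOutputStream, IOException, InputStream}

object PrematureCloseDemo {
  // Returns "failed" when the lazy read hits an already-closed stream.
  def run(): String = {
    val f = File.createTempFile("block", ".bin")
    val out = new FileOutputStream(f)
    out.write(42)
    out.close()

    val is: InputStream = new FileInputStream(f)
    // As with FetchResult, deserialization is wrapped in a closure:
    // nothing touches the stream until a consumer invokes it later.
    val readBlock: () => Int = () => is.read()

    is.close() // an eager close in a finally block would happen here, before any read
    try {
      readBlock() // the lazy closure now reads a closed stream
      "succeeded"
    } catch {
      case _: IOException => "failed"
    }
  }

  def main(args: Array[String]): Unit = println(s"lazy read ${run()}")
}
```

This is why the `try`/`finally` around `results.put` was reverted: the stream must stay open until the iterator built from the closure is consumed (and is then closed via NextIterator#close).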
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55769853

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20391/consoleFull) for PR 2408 at commit [`5f63f67`](https://github.com/apache/spark/commit/5f63f67fb8e1dc85788436581f49adc2cb8b32bc).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55758516

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20391/consoleFull) for PR 2408 at commit [`5f63f67`](https://github.com/apache/spark/commit/5f63f67fb8e1dc85788436581f49adc2cb8b32bc).
* This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55751983

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20384/consoleFull) for PR 2408 at commit [`b37231a`](https://github.com/apache/spark/commit/b37231a7285836d540a21b263f812953e7c6d800).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor`
  * `class SCCallSiteSync(object):`
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17601336

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
-              shuffleMetrics.remoteBytesRead += data.size
-              shuffleMetrics.remoteBlocksFetched += 1
-              logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+                shuffleMetrics.remoteBytesRead += data.size
--- End diff --

Ah, do you mean lines 122-124 should be outside the try block? I agree with that.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55742800

[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20383/consoleFull) for PR 2408 at commit [`bf29d4a`](https://github.com/apache/spark/commit/bf29d4a1d8e332941caba337286f26f2238095d4).
* This patch **passes** unit tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2408#issuecomment-55741767

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20384/consoleFull) for PR 2408 at commit [`b37231a`](https://github.com/apache/spark/commit/b37231a7285836d540a21b263f812953e7c6d800).
* This patch merges cleanly.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17600258

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
-              shuffleMetrics.remoteBytesRead += data.size
-              shuffleMetrics.remoteBlocksFetched += 1
-              logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+                shuffleMetrics.remoteBytesRead += data.size
--- End diff --

These three lines do not need to be within the `try` block, right? I figure it's best to complete the `try` block and have `is` closed before moving on to further operations.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17600041

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
-              shuffleMetrics.remoteBytesRead += data.size
-              shuffleMetrics.remoteBlocksFetched += 1
-              logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+                shuffleMetrics.remoteBytesRead += data.size
--- End diff --

Sorry, what do you mean? I didn't get it.
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17600021

--- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
@@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
   override def size: Long = length

   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      channel.close()
--- End diff --

Originally I was going to check whether channel is null, but I forgot in the previous PR. Now I've fixed it.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17598955

--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-              results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-                () => serializer.newInstance().deserializeStream(
-                  blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-              ))
-              shuffleMetrics.remoteBytesRead += data.size
-              shuffleMetrics.remoteBlocksFetched += 1
-              logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+              var is: InputStream = null
+              try {
+                is = data.inputStream()
+                results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+                  () => serializer.newInstance().deserializeStream(
+                    blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+                ))
+                shuffleMetrics.remoteBytesRead += data.size
--- End diff --

Can these three lines follow the `finally` block? The stream can be closed at that point, I think.
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2408#discussion_r17598920

--- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
@@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
   override def size: Long = length

   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      channel.close()
--- End diff --

This would throw an NPE if an error occurred in `new RandomAccessFile` or `getChannel`. I suppose you could move `new RandomAccessFile(file, "r").getChannel` before the `try` block. Before that call returns successfully, there is no `FileChannel` that was opened and therefore needs closing.
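The pattern srowen describes can be sketched as follows. This is a hypothetical `mapFile` helper standing in for `nioByteBuffer`, not the actual ManagedBuffer code: the channel is acquired before the `try`, so the `finally` only runs when there is an open channel to close and no null check (or NPE) is possible. Note that a memory-mapped buffer remains valid after its channel is closed, so returning the mapping from inside the `try` is safe.

```scala
import java.io.{File, FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object SafeChannelDemo {
  def mapFile(file: File, offset: Long, length: Long): ByteBuffer = {
    // Acquire before the try: if this line throws, no channel was opened,
    // so there is nothing to close and nothing for a finally to NPE on.
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      channel.map(MapMode.READ_ONLY, offset, length)
    } finally {
      channel.close() // safe: channel is guaranteed non-null here
    }
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("segment", ".bin")
    val out = new FileOutputStream(f)
    out.write(Array[Byte](1, 2, 3, 4))
    out.close()
    val buf = mapFile(f, 1, 2)
    println(buf.remaining())
  }
}
```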