[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964281#comment-16964281 ]

Imran Rashid commented on SPARK-22579:
--------------------------------------

Sorry, I had not noticed this issue before. I agree that there is an inefficiency here: if the fetch were streamed, you could pipeline fetching the data with computing on the data. The existing changes you point to address the memory footprint by fetching to disk, but they don't actually pipeline the computation.

That said, this isn't easy to fix. You need to touch a lot of core code in the network layers, and as you said, it gets trickier when handling failures (you have to throw out all partial work in the current task). You'll probably still see a discrepancy between runtimes when running locally vs. remotely. Best case, you'd get a 2x speedup with this change; in your use case, that would still be roughly 40 seconds (local) vs. 4 minutes (remote).

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22579
>                 URL: https://issues.apache.org/jira/browse/SPARK-22579
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Eyal Farago
>            Priority: Major
>
> When an RDD partition is cached on one executor but the task requiring it is running on another executor (process locality ANY), the cached partition is fetched via BlockManager.getRemoteValues, which delegates to BlockManager.getRemoteBytes; both calls are blocking.
> In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node cluster, cached to disk. Rough math shows an average partition size of ~700MB.
> Looking at the Spark UI, it was obvious that tasks running with process locality 'ANY' were much slower than local tasks (~40 seconds vs. 8-10 minutes). I was able to capture thread dumps of executors executing remote tasks and got this stack trace:
> {quote}
> Thread ID: 1521, Thread Name: Executor task launch worker-1000, Thread State: WAITING, Thread Locks: Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978)
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> {quote}
> Digging into the code showed that the block manager first fetches all bytes (getRemoteBytes) and only then wraps them with a deserialization stream. This has several drawbacks:
> 1. Blocking: the requesting executor is blocked while the remote executor is serving the block.
> 2. Potentially large memory footprint on the requesting executor: in my use case, ~700MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all values at once, since it consumes the values via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block is cached in deserialized form, the serving executor has to serialize it into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU intensive; the memory footprint could be reduced by using a limited buffer for serialization, 'spilling' to the response stream.
> I suggest improving this by implementing either a full streaming mechanism or some kind of pagination mechanism. In addition, the requesting executor should be able to make progress with the data it already has, blocking only when the local buffer is exhausted and the remote side hasn't yet delivered the next chunk of the stream (or the next page, in the case of pagination).
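To make the streaming suggestion concrete, here is a minimal, hypothetical sketch of the reading side. The StreamingBlockFetcher trait and its fetchBlockStream method below are stand-ins that do not exist in Spark; only the SerializerInstance / DeserializationStream API is real. The point is that wrapping a network-backed InputStream in a lazy deserialization iterator would let the task pipeline computation with the transfer instead of blocking until the whole block has arrived.

{code:scala}
import java.io.InputStream

import org.apache.spark.serializer.SerializerInstance

// Hypothetical stand-in for a streaming fetch API: instead of materializing the
// whole block in a ChunkedByteBuffer, the transfer layer hands back an InputStream
// that is filled as chunks arrive over the network.
trait StreamingBlockFetcher {
  def fetchBlockStream(blockId: String): InputStream
}

// Sketch of the reading side: wrap the network stream directly in a
// deserialization stream, so the task consumes values while bytes are still
// in flight instead of waiting until the full 700MB block has landed.
class StreamingBlockReader(fetcher: StreamingBlockFetcher, serializer: SerializerInstance) {

  def getRemoteValuesStreamed[T](blockId: String): Iterator[T] = {
    val in = fetcher.fetchBlockStream(blockId)
    // deserializeStream(...).asIterator is lazy: each next() pulls only enough
    // bytes from the stream to decode one record.
    serializer.deserializeStream(in).asIterator.asInstanceOf[Iterator[T]]
  }
}
{code}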
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730301#comment-16730301 ]

Eyal Farago commented on SPARK-22579:
--------------------------------------

I glanced over this on my phone; it seems SPARK-25905 only addresses the reading side. When the executor serving the block holds all of its values in memory, a similar issue occurs on that (serving) executor as well.
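A rough sketch of what the serving side could look like for drawback #4, assuming an OutputStream backed by the network response is available (the `response` parameter below is a hypothetical placeholder, not an existing Spark API): records are serialized through a small fixed-size buffer that spills to the response stream as it fills, instead of being materialized into a ChunkedByteBuffer.

{code:scala}
import java.io.{BufferedOutputStream, OutputStream}

import org.apache.spark.serializer.SerializerInstance

// Sketch of the serving side: when the block is cached in deserialized form,
// serialize straight into the response instead of building a ChunkedByteBuffer
// that holds the whole serialized block in memory.
object StreamingBlockServer {

  // `response` is a hypothetical OutputStream backed by the network response;
  // `values` is the iterator over the locally cached (deserialized) block.
  def serveValues(values: Iterator[Any],
                  serializer: SerializerInstance,
                  response: OutputStream,
                  bufferSize: Int = 64 * 1024): Unit = {
    // The BufferedOutputStream is the "limited buffer": at most `bufferSize`
    // serialized bytes are held before being spilled to the response stream.
    val buffered = new BufferedOutputStream(response, bufferSize)
    val serStream = serializer.serializeStream(buffered)
    try {
      serStream.writeAll(values) // writes record by record through the bounded buffer
    } finally {
      serStream.close()          // flushes remaining bytes and closes the chain
    }
  }
}
{code}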
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730007#comment-16730007 ]

Andrey Siunov commented on SPARK-22579:
---------------------------------------

Is it true that this issue has been resolved by https://issues.apache.org/jira/browse/SPARK-25905 (PR: https://github.com/apache/spark/pull/23058)?
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266102#comment-16266102 ]

Eyal Farago commented on SPARK-22579:
--------------------------------------

[~srowen], I couldn't find a place where this data is stored for later use. Even in the 'big fetch' scenario the file isn't reused: org.apache.spark.storage.BlockManager#remoteBlockTempFileManager is only used to stream a big file once, and it's never consulted to figure out whether the download can be avoided. Furthermore, these blocks are not registered with the block manager, even though they could serve as a 'DISK_ONLY' cache on the requesting executor and potentially even for other executors residing on the same node.

[~jerryshao], it seems the 'big files' code path actually uses streaming, which makes me think it could be slightly modified to accept a stream handler (the current behavior could be achieved by passing in a download-to-file stream handler). One thing that does have the potential for trouble is error recovery when more than one executor can serve the block: the current implementation simply moves on to the next one, whereas a streaming-based approach would have to request the rest of the block from another executor (assuming the replicas are identical).
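To illustrate the error-recovery concern, here is a purely hypothetical sketch (none of these types exist in Spark) of a resumable stream that, on failure, requests the remainder of the block from another replica starting at the offset already received. It assumes the serving side can honor range reads over the serialized block, which is itself an open question.

{code:scala}
import java.io.{IOException, InputStream}

// Hypothetical fetch API: request a block starting at `offset` from a given executor.
trait ReplicaStreamFetcher {
  def openStream(executorId: String, blockId: String, offset: Long): InputStream
}

// Sketch: if a streamed fetch dies mid-transfer, resume from the next replica
// at the byte offset already consumed, rather than throwing away partial work.
class ResumableBlockStream(fetcher: ReplicaStreamFetcher,
                           blockId: String,
                           replicas: Seq[String]) extends InputStream {
  private var bytesRead = 0L
  private var remaining = replicas
  private var current: InputStream = openNext()

  private def openNext(): InputStream = {
    val executor = remaining.head
    remaining = remaining.tail
    fetcher.openStream(executor, blockId, bytesRead)
  }

  override def read(): Int = {
    try {
      val b = current.read()
      if (b >= 0) bytesRead += 1
      b
    } catch {
      case _: IOException if remaining.nonEmpty =>
        try current.close() catch { case _: IOException => () }
        current = openNext() // resume from the same offset on the next replica
        read()
    }
  }
}
{code}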
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265234#comment-16265234 ]

Sean Owen commented on SPARK-22579:
-----------------------------------

Yes, the problem is reading the data across the network twice. I don't know the code or a proposed change well enough to say whether it will be an issue in practice, but I can imagine accidentally modifying the code in a way that means it now copies the data N times instead of reading one in-memory block N times. It's just not obvious that loading the block into memory is "for nothing".
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265045#comment-16265045 ]

Eyal Farago commented on SPARK-22579:
--------------------------------------

[~jerryshao], SPARK-22062 seems to address the memory-footprint issue; I'll take a deeper look at it (and pray for MapR to upgrade their Spark version).

[~srowen], I'm not sure what you mean by reading the data twice: getRemoteValues returns an iterator, and if it's called twice on the same block it will fetch the data twice even with the current implementation. And yes, I agree I need to read the data anyway, but I don't need all of it at once. Executors consume partitions via iterators, which allows for streaming/pagination behind the scenes; the block manager currently doesn't implement this. By the way, this kind of streaming/pagination can play nicely with SPARK-22062 and with any caching mechanism that might be introduced into the block manager in the future.

I'd also like to stress that what we experienced was lengthy tasks. Given that these tasks were 'dead in the water' waiting for the transfer to complete, it's unfortunate that they couldn't make progress with the part of the transfer that had already completed (see the sketch below). I believe streaming/pagination, or some other reactive approach, can greatly improve this behavior.
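As an illustration of "making progress with the part of the transfer that already completed", a minimal sketch (not Spark code; PipelinedIterator is invented for this example) of a bounded producer/consumer iterator: a fetch thread pushes decoded records as chunks arrive, and the task consumes them, blocking only when the queue is empty.

{code:scala}
import java.util.concurrent.ArrayBlockingQueue

// Sketch only: the fetch/decode thread calls put() for each record as it arrives
// and finish() once the block is done; the task iterates normally and blocks
// only when nothing has arrived yet and the queue is empty.
class PipelinedIterator[T](capacity: Int = 1024) extends Iterator[T] {
  private val queue = new ArrayBlockingQueue[AnyRef](capacity)
  private object EndOfStream
  private var nextElem: AnyRef = _

  def put(record: T): Unit = queue.put(record.asInstanceOf[AnyRef])
  def finish(): Unit = queue.put(EndOfStream)

  override def hasNext: Boolean = {
    if (nextElem == null) nextElem = queue.take() // blocks only when the local buffer is exhausted
    nextElem ne EndOfStream
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    val elem = nextElem
    nextElem = null
    elem.asInstanceOf[T]
  }
}
{code}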
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265021#comment-16265021 ]

Saisai Shao commented on SPARK-22579:
-------------------------------------

I think this issue should already be fixed by SPARK-22062 and its PR (https://github.com/apache/spark/pull/19476); what you need to do is set a proper size threshold for large blocks.
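For reference, a minimal sketch of the tuning referred to above. The configuration key is assumed to be the one introduced by SPARK-22062 / PR #19476 (spark.maxRemoteBlockSizeFetchToMem); verify the exact name and default against the documentation of the Spark version in use. Note that this only streams large blocks to disk; it does not pipeline deserialization with the transfer.

{code:scala}
import org.apache.spark.SparkConf

object FetchToDiskTuning {
  // Remote blocks larger than this threshold are fetched to disk rather than
  // buffered fully in memory. The key name is an assumption based on
  // SPARK-22062 / PR #19476; check it for your Spark version.
  val conf: SparkConf = new SparkConf()
    .setAppName("remote-block-fetch-tuning")
    .set("spark.maxRemoteBlockSizeFetchToMem", (200L * 1024 * 1024).toString) // 200MB
}
{code}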
[jira] [Commented] (SPARK-22579) BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262120#comment-16262120 ]

Eyal Farago commented on SPARK-22579:
--------------------------------------

CC: [~hvanhovell] (we've discussed this privately), [~joshrosen], [~jerryshao2015] (git annotations point at you guys :-) )