[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265045#comment-16265045 ]
Eyal Farago commented on SPARK-22579:
-------------------------------------

[~jerryshao], SPARK-22062 seems to solve the memory footprint issue; I'll take a deeper look at it (and pray for MapR to upgrade their Spark version).

[~srowen], I'm not sure what you mean by reading the data twice. getRemoteValues returns an iterator; if it is called twice on the same block it will fetch the data twice, even with the current implementation. And yes, I agree I need to read the data anyway, but I don't need all of it at once. Executors consume partitions via iterators, which allows for streaming/pagination behind the scenes, something the block manager does not currently implement.

BTW, this kind of streaming/pagination can play nicely with SPARK-22062 and any potential caching mechanism that might be introduced into the block manager in the future.

I'd also like to stress that what we experienced was lengthy tasks. These tasks were 'dead in the water' waiting for the transfer to complete, which is a bit unfortunate as they could have made progress with the part of the transfer that had already completed. I believe streaming/pagination or some other reactive approach can greatly improve this behavior.

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22579
>                 URL: https://issues.apache.org/jira/browse/SPARK-22579
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Eyal Farago
>
> When an RDD partition is cached on an executor but the task requiring it is
> running on another executor (process locality ANY), the cached partition is
> fetched via BlockManager.getRemoteValues, which delegates to
> BlockManager.getRemoteBytes; both calls are blocking.
> In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node
> cluster, cached to disk.
> Rough math shows that the average partition size is 700MB.
> Looking at the Spark UI, it was obvious that tasks running with process locality
> 'ANY' are much slower than local tasks (roughly 40 seconds versus 8-10 minutes). I
> was able to capture thread dumps of executors executing remote tasks and got
> this stack trace:
> {quote}Thread ID  Thread Name  Thread State  Thread Locks
> 1521  Executor task launch worker-1000  WAITING  Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> Digging into the code showed that the block manager first fetches all the bytes
> (getRemoteBytes) and then wraps them with a deserialization stream. This has
> several drawbacks:
> 1. Blocking: the requesting executor is blocked while the remote executor is
> serving the block.
> 2. Potentially large memory footprint on the requesting executor: in my use case,
> 700MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all values at once, as it
> consumes the values via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block
> is cached in deserialized form, the serving executor has to serialize it into
> a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and CPU
> intensive; the memory footprint can be reduced by using a limited buffer for
> serialization, 'spilling' to the response stream.
> I suggest improving this either by implementing a full streaming mechanism or
> some kind of pagination mechanism. In addition, the requesting executor should
> be able to make progress with the data it already has, blocking only when
> the local buffer is exhausted and the remote side hasn't yet delivered the next
> chunk of the stream (or page, in the case of pagination).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
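
To make the pagination idea from the issue description concrete, here is a minimal sketch in plain Scala of an iterator that consumes a remote block page by page. It blocks only when the local page is exhausted and the next page has not arrived yet. Nothing here is Spark API: PagedIterator, fetchPage and the page size are hypothetical names chosen for illustration, and the "remote" side is simulated with an in-memory Vector. An empty page is assumed to signal the end of the block.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

/** Hypothetical sketch, not a Spark class.
  * `fetchPage(i)` asynchronously returns the values of page `i`;
  * an empty page is assumed to mean "no more data". */
final class PagedIterator[T](
    fetchPage: Int => Future[Seq[T]],
    timeout: Duration = 30.seconds) extends Iterator[T] {

  private var nextIndex = 0
  private var current: Iterator[T] = Iterator.empty
  // Request the first page eagerly so the transfer starts right away.
  private var pending: Option[Future[Seq[T]]] = Some(request())

  private def request(): Future[Seq[T]] = {
    val f = fetchPage(nextIndex)
    nextIndex += 1
    f
  }

  override def hasNext: Boolean = {
    // The only blocking point: local page drained, next page still in flight.
    while (!current.hasNext && pending.isDefined) {
      val page = Await.result(pending.get, timeout)
      if (page.isEmpty) {
        pending = None // end of block
      } else {
        current = page.iterator
        pending = Some(request()) // prefetch while the caller consumes `current`
      }
    }
    current.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("block exhausted")
    current.next()
  }
}

object PagedIteratorDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val data = (1 to 10).toVector // stands in for a cached partition
    val pageSize = 4              // arbitrary, for illustration only
    def fetchPage(i: Int): Future[Seq[Int]] =
      Future(data.slice(i * pageSize, (i + 1) * pageSize))

    val values = new PagedIterator[Int](fetchPage)
    println(values.sum)
  }
}
```

With this shape, at most two pages are held on the requesting side at any time (the one being consumed and the one being prefetched), instead of the whole block. A real implementation would also need error handling, back-pressure on the serving side and integration with deserialization streams, none of which is attempted here.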