Re: Executor memory requirement for reduceByKey
Counterintuitive as it sounds, reduceByKey expects all the values for a given key within a partition to be loaded into memory. So once you increase the number of partitions, the job should be able to run.
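To make the effect of the partition count concrete, here is a minimal sketch in plain Python (not Spark code). It assumes keys hash evenly across partitions, which real data may not do; the 75 million keys and the 40 and 400 partition counts are the figures quoted in this thread, and the function name is only illustrative.

```python
def keys_per_partition(total_keys: int, num_partitions: int) -> float:
    """Average number of distinct keys per partition, assuming an even hash spread."""
    return total_keys / num_partitions


TOTAL_KEYS = 75_000_000  # figure quoted in the thread

for partitions in (40, 400):
    avg = keys_per_partition(TOTAL_KEYS, partitions)
    print(f"{partitions} partitions -> about {avg:,.0f} keys per partition")
```

Going from 40 to 400 partitions cuts the average number of keys each task has to aggregate by a factor of ten, which is consistent with the job succeeding after the change.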
Re: Executor memory requirement for reduceByKey
Ok, so that worked flawlessly after I upped the number of partitions to 400 from 40. Thanks!
Re: Executor memory requirement for reduceByKey
I'll try that, as of now I have a small number of partitions in the order of 20~40. It would be great if there's some documentation on the memory requirement wrt the number of keys and the number of partitions per executor (i.e., the Spark's internal memory requirement outside of the user space). Otherwise, it's like shooting in the dark.
Re: Executor memory requirement for reduceByKey
Have you taken a look at SPARK-11293?

Consider using repartition to increase the number of partitions.

FYI

On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung wrote:
> Hello,
>
> I'm using Spark version 1.6.0 and have trouble with memory when trying to
> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
> following exception when I run the task.
>
> There are 20 workers in the cluster. It is running under the standalone
> mode with 12 GB assigned per executor and 4 cores per worker. The
> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>
> What might be the problem here? Since I'm using the version 1.6.0, this
> doesn't seem to be related to SPARK-12155. This problem always happens
> during the shuffle read phase.
>
> Is there a minimum amount of memory required for executor
> (spark.memory.fraction) for shuffle read?
>
> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>   at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>   at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>   at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
>   at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
>   at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
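On the question of how much memory a task has for shuffle work, a rough back-of-the-envelope estimate can be sketched as below. This is only a sketch under stated assumptions, not an official formula: it assumes the Spark 1.6 unified memory manager sets aside about 300 MB before applying spark.memory.fraction, and that each of N concurrently running tasks can claim between 1/(2N) and 1/N of the resulting pool. The 12 GB heap and the 0.5 fraction come from the message above; 4 concurrent tasks per executor is an assumption based on "4 cores per worker". The JVM usually reports a little less than the configured heap, so real numbers will be somewhat lower.

```python
RESERVED_MB = 300  # assumed reserved memory in the Spark 1.6 unified memory manager


def unified_pool_mb(heap_mb: float, memory_fraction: float) -> float:
    """Memory shared by execution (shuffle, aggregation) and storage."""
    return (heap_mb - RESERVED_MB) * memory_fraction


def per_task_bounds_mb(pool_mb: float, concurrent_tasks: int) -> tuple:
    """Assumed lower and upper share of the pool for one task: 1/(2N) and 1/N."""
    return pool_mb / (2 * concurrent_tasks), pool_mb / concurrent_tasks


pool = unified_pool_mb(heap_mb=12 * 1024, memory_fraction=0.5)
low, high = per_task_bounds_mb(pool, concurrent_tasks=4)  # 4 tasks is an assumption
print(f"pool: {pool:.0f} MB, per task: {low:.0f} to {high:.0f} MB")
```

Under these assumptions each task works with well under 2 GB, so the fewer partitions there are, the more keys have to fit into that share at once.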
Re: Executor memory allocations
It would be the "40%", although it's probably better to think of it as shuffle vs. data cache, with the remainder going to tasks. The documentation for the shuffle memory fraction setting makes clear that it takes memory at the expense of the storage/data cache fraction:

Property: spark.shuffle.memoryFraction
Default:  0.2
Meaning:  Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.

On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet wrote:
> So I've seen in the documentation that (after the overhead memory is
> subtracted), the memory allocations of each executor are as follows (assume
> default settings):
>
> 60% for cache
> 40% for tasks to process data
>
> Reading about how Spark implements shuffling, I've also seen it say "20%
> of executor memory is utilized for shuffles". Does this 20% cut into the 40%
> for tasks to process data or the 60% for the data cache?
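The split described in this reply can be illustrated with a small calculation. This is a simplified sketch: the 0.6 and 0.2 defaults are the values of spark.storage.memoryFraction and spark.shuffle.memoryFraction mentioned in these threads, the 1000 MB heap is a made-up round number, and any additional safety fractions Spark applies internally are ignored.

```python
def legacy_heap_split(heap_mb: float,
                      storage_fraction: float = 0.6,
                      shuffle_fraction: float = 0.2) -> dict:
    """Split an executor heap into data cache, shuffle, and what is left for tasks."""
    storage = heap_mb * storage_fraction
    shuffle = heap_mb * shuffle_fraction
    return {
        "storage": storage,
        "shuffle": shuffle,
        "remainder": heap_mb - storage - shuffle,
    }


# Hypothetical 1000 MB heap, chosen only to make the percentages easy to read.
for name, size in legacy_heap_split(1000).items():
    print(f"{name}: {size:.0f} MB")
```

Raising the shuffle fraction in this model only makes sense if the storage fraction is lowered by a similar amount, otherwise the remainder left for task objects shrinks.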
Re: Executor memory in web UI
This is the fraction available for caching, which is 60% * 90% * total by default.

On Fri, Apr 17, 2015 at 11:30 AM, podioss wrote:
> Hi,
> i am a bit confused with the executor-memory option. I am running
> applications with Standalone cluster manager with 8 workers with 4gb memory
> and 2 cores each and when i submit my application with spark-submit i use
> --executor-memory 1g.
> In the web ui in the completed applications table i see that my application
> was correctly submitted with 1g memory per node as expected but when i check
> the executors tab of the application i see that every executor launched with
> 530mb which is about half the memory of the configuration.
> I would really appreciate an explanation if anyone knew.
>
> Thanks
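The arithmetic behind "60% * 90% * total" can be checked with a few lines of Python. This is a sketch, with the function names chosen here for illustration; the 0.6 and 0.9 factors are the ones given in the reply. The gap between the computed value and the 530 MB shown in the UI is assumed to come from the JVM reporting a maximum heap slightly below the configured 1g.

```python
def cache_capacity_mb(heap_mb: float,
                      storage_fraction: float = 0.6,
                      safety_fraction: float = 0.9) -> float:
    """Memory available for caching: storage fraction times safety fraction times heap."""
    return heap_mb * storage_fraction * safety_fraction


def implied_heap_mb(cache_mb: float,
                    storage_fraction: float = 0.6,
                    safety_fraction: float = 0.9) -> float:
    """Heap size the JVM must be reporting for the UI to show cache_mb."""
    return cache_mb / (storage_fraction * safety_fraction)


print(f"1g heap -> about {cache_capacity_mb(1024):.0f} MB for caching")
print(f"530 MB in the UI -> about {implied_heap_mb(530):.0f} MB reported heap")
```

So roughly 54% of the heap shows up in the executors tab, which matches "about half the memory of the configuration".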
Re: Executor memory
Thanks for the clarifications. I misunderstood what the number on UI meant.
Re: Executor memory
I believe this corresponds to the 0.6 of the whole heap that is allocated for caching partitions. See spark.storage.memoryFraction on http://spark.apache.org/docs/latest/configuration.html. 0.6 of 4GB is about 2.3GB.

The note there is important, that you probably don't want to exceed the JVM old generation size with this parameter.

On Tue, Dec 16, 2014 at 12:53 AM, Pala M Muthaia wrote:
> Hi,
>
> Running Spark 1.0.1 on Yarn 2.5
>
> When i specify --executor-memory 4g, the spark UI shows each executor as
> having only 2.3 GB, and similarly for 8g, only 4.6 GB.
>
> I am guessing that the executor memory corresponds to the container memory,
> and that the task JVM gets only a percentage of the container total memory.
> Is there a yarn or spark parameter to tune this so that my task JVM actually
> gets 6GB out of the 8GB for example?
>
> Thanks.
Re: Executor memory
Hi Pala,

Spark executors only reserve spark.storage.memoryFraction (default 0.6) of their spark.executor.memory for caching RDDs. The spark UI displays this fraction.

spark.executor.memory controls the executor heap size. spark.yarn.executor.memoryOverhead controls the extra that's tacked on for the container memory.

-Sandy
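The relationship between the three settings named above can be sketched as simple arithmetic. This is an illustration only: the 6144 MB heap and 2048 MB overhead are hypothetical values picked to match the "6GB out of the 8GB" example in the question, and YARN may round container requests up to its own allocation increments, which is not modelled here.

```python
def yarn_container_mb(executor_memory_mb: int, memory_overhead_mb: int) -> int:
    """Container size requested: executor heap plus the overhead added on top."""
    return executor_memory_mb + memory_overhead_mb


def cache_share_mb(executor_memory_mb: float, storage_fraction: float = 0.6) -> float:
    """Part of the executor heap reserved for caching RDDs, as displayed in the UI."""
    return executor_memory_mb * storage_fraction


heap_mb = 6144      # hypothetical spark.executor.memory of 6g
overhead_mb = 2048  # hypothetical spark.yarn.executor.memoryOverhead

print(f"container: {yarn_container_mb(heap_mb, overhead_mb)} MB")
print(f"cache share of heap: {cache_share_mb(heap_mb):.0f} MB")
```

In other words, the heap is set directly by spark.executor.memory, and the number in the UI is a fraction of that heap rather than a fraction of the container.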
Re: Executor Memory, Task hangs
Thanks Akhil and Sean.

All three workers are doing the work and tasks stall simultaneously on all three. I think Sean hit on my issue. I've been under the impression that each application has one executor process per worker machine (not per core per machine). Is that incorrect? If an executor is running on each core, that would explain why things are stalling.

Akhil, I'm running 8 cores per machine, and tasks are stalling on all three machines simultaneously. Also, no other Spark contexts are running, so I didn't think this was an issue of spark.executor.memory vs SPARK_WORKER_MEMORY (which is currently at its default).

App UI:

ID                       Name         Cores  Memory per Node  Submitted Time  User  State  Duration
app-20140819101355-0001  Spark shell  24     2.0 GB

Worker UI:

ExecutorID  Cores  State    Memory  Job Details  Logs
2           8      RUNNING  2.0 GB

Tasks when it stalls:

129  129  SUCCESS  NODE_LOCAL  worker01  8/19/14 10:16  0.1 s  1 ms
130  130  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
131  131  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
132  132  SUCCESS  NODE_LOCAL  worker02  8/19/14 10:16  0.1 s  1 ms
133  133  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
134  134  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
135  135  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
136  136  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
137  137  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
138  138  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
139  139  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
140  140  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
141  141  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
142  142  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
143  143  RUNNING  NODE_LOCAL  worker01  8/19/14 10:16  5 s
144  144  RUNNING  NODE_LOCAL  worker03  8/19/14 10:16  5 s
145  145  RUNNING  NODE_LOCAL  worker02  8/19/14 10:16  5 s
Re: Executor Memory, Task hangs
Given a fixed amount of memory allocated to your workers, more memory per executor means fewer executors can execute in parallel. This means it takes longer to finish all of the tasks. Set high enough, and your executors can find no worker with enough memory and so they all are stuck waiting for resources. The reason the tasks seem to take longer is really that they spend time waiting for an executor rather than spend more time running. That's my first guess.

If you want Spark to use more memory on your machines, give workers more memory. It sounds like there is no value in increasing executor memory as it only means you are underutilizing the CPU of your cluster by not running as many tasks in parallel as would be optimal.

> Hi all,
>
> I'm doing some testing on a small dataset (HadoopRDD, 2GB, ~10M records),
> with a cluster of 3 nodes
>
> Simple calculations like count take approximately 5s when using the
> default value of executor.memory (512MB). When I scale this up to 2GB,
> several Tasks take 1m or more (while most still are <1s), and tasks hang
> indefinitely if I set it to 4GB or higher.
>
> While these worker nodes aren't very powerful, they seem to have enough
> RAM to handle this:
>
> Running 'free -m' shows I have >7GB free on each worker.
>
> Any tips on why these jobs would hang when given more available RAM?
>
> Thanks
> Ben
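The reasoning in this reply can be sketched as a simple capacity check. The sketch is only an illustration of the argument: the 4096 MB worker allocation is a hypothetical figure (the thread does not state the actual SPARK_WORKER_MEMORY), and the function name is invented here.

```python
def executors_that_fit(worker_memory_mb: int, executor_memory_mb: int) -> int:
    """How many executors of a given size fit in the memory a worker may hand out."""
    return worker_memory_mb // executor_memory_mb


WORKER_MEMORY_MB = 4096  # hypothetical worker allocation, not taken from the thread

for executor_mb in (512, 2048, 4096, 8192):
    count = executors_that_fit(WORKER_MEMORY_MB, executor_mb)
    print(f"executor memory {executor_mb} MB -> {count} executor(s) per worker")
```

Once the requested executor size exceeds what the worker is allowed to allocate, the count drops to zero and nothing can be launched, which would look like tasks hanging indefinitely even though 'free -m' still reports plenty of RAM on the machine.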
Re: Executor Memory, Task hangs
Looks like 1 worker is doing the job. Can you repartition the RDD? Also, how many cores did you allocate? You can easily check things like this in the worker's web UI (default worker:8081).

Thanks
Best Regards