Correction: I have to use spark.yarn.am.memoryOverhead because I'm in YARN
client mode. I set it to 13% of the executor memory.
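
For anyone wanting to reproduce the arithmetic, here is a rough sketch of how a "13% of executor memory" overhead value could be worked out. It assumes the 60g executor memory from the original spark-submit command (the later, increased value is not stated in this thread), and the object and helper names (OverheadSketch, overheadMb) are made up for illustration. The only real setting name used is spark.yarn.am.memoryOverhead, which in Spark 1.x is given as a plain number of megabytes.

```scala
object OverheadSketch {
  // Hypothetical helper: overhead in MB as a whole-number percentage of a
  // memory size given in GB, rounded up. Integer math avoids float surprises.
  def overheadMb(memoryGb: Int, percent: Int): Int =
    (memoryGb * 1024 * percent + 99) / 100

  def main(args: Array[String]): Unit = {
    val executorMemoryGb = 60 // assumption: value from the original spark-submit
    val mb = overheadMb(executorMemoryGb, 13)
    // Print the flag that could be passed to spark-submit.
    println(s"--conf spark.yarn.am.memoryOverhead=$mb")
  }
}
```

With the assumed 60g this prints `--conf spark.yarn.am.memoryOverhead=7988`; the same number could instead be passed to SparkConf.set as a string.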

Increasing the overall executor memory also helped quite a bit.

It will be great when the Tungsten enhancements make their way into RDDs.

Thanks!

Arun

On Thu, Jan 21, 2016 at 6:19 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Two changes I made that appear to be keeping various errors at bay:
>
> 1) bumped up spark.yarn.executor.memoryOverhead to 2000, in the spirit of
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E
> even though I couldn't find the same error in my YARN log.
>
> 2) very important: I ran coalesce(1000) on the RDD at the start of the
> DAG. I know keeping the # of partitions lower is helpful, based on past
> experience with groupByKey. I haven't run this pipeline in a while, so that
> rule of thumb was not at the forefront of my mind.
>
> On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> Looking into the yarn logs for a similar job where an executor was
>> associated with the same error, I find:
>>
>> ...
>> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive connection to (SERVER), creating a new one.
>> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 46 outstanding blocks*
>> *java.io.IOException: Failed to connect to (SERVER)*
>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>     at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
>>     at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> *Caused by: java.net.ConnectException: Connection refused:* (SERVER)
>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>     at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>     ... 1 more
>>
>> ...
>>
>>
>> Not sure if this reveals anything at all.
>>
>>
>> On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> My hunch is that the TaskCommitDenied is perhaps a red herring and the
>>> problem is groupByKey - but I've also just seen a lot of people be bitten
>>> by it so that might not be the issue. If you just do a count at the point of
>>> the groupByKey, does the pipeline succeed?
>>>
>>> On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> Usually the pipeline works, it just failed on this particular input
>>>> data. The other data it has run on is of similar size.
>>>>
>>>> Speculation is enabled.
>>>>
>>>> I'm using Spark 1.5.0.
>>>>
>>>> Here is the config. Many of these settings may not be needed anymore;
>>>> they are left over from getting things working in Spark 1.2 and 1.3.
>>>>
>>>>         .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>>         .set("spark.shuffle.memoryFraction","0.2") // default 0.2
>>>>         .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>>>>         .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>>>>         .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>>>>         .set("spark.akka.frameSize","300") // helpful when using consolidateFiles=true
>>>>         .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>         .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>         .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>         .set("spark.speculation","true")
>>>>         .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>         .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>         .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>>         .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>         .set("spark.kryo.registrator","------.MyRegistrator")
>>>>         .set("spark.kryo.registrationRequired", "true")
>>>>         .set("spark.yarn.executor.memoryOverhead","600")
>>>>
>>>> On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <joshro...@databricks.com>
>>>> wrote:
>>>>
>>>>> Is speculation enabled? This TaskCommitDenied by driver error is
>>>>> thrown by writers who lost the race to commit an output partition. I don't
>>>>> think this has anything to do with key skew etc. Replacing the groupByKey
>>>>> with a count will mask this exception because the coordination does not
>>>>> get triggered in non save/write operations.
>>>>>
>>>>> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Before we dig too far into this, the thing which most quickly jumps
>>>>>> out to me is groupByKey, which could be causing some problems. What's the
>>>>>> distribution of keys like? Try replacing the groupByKey with a count()
>>>>>> and see if the pipeline works up until that stage. Also, 1G of driver
>>>>>> memory is a bit small for something with 90 executors...
>>>>>>
>>>>>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <arun.lut...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>>>>>>
>>>>>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
>>>>>>>
>>>>>>> spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
>>>>>>>
>>>>>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>>>>
>>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache broadcast_4 in memory! (computed 60.2 MB so far)
>>>>>>>
>>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk instead.
>>>>>>>
>>>>>>> [Stage 1:====================================================>(2260 + 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1440, attempt: 4530
>>>>>>>
>>>>>>> [Stage 1:====================================================>(2260 + 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1488, attempt: 4531
>>>>>>>
>>>>>>> [Stage 1:====================================================>(2261 + 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1982, attempt: 4532
>>>>>>>
>>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0 (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2214, attempt: 4482
>>>>>>>
>>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>>>>
>>>>>>>
>>>>>>> I am running with:
>>>>>>>
>>>>>>>     spark-submit --class "myclass" \
>>>>>>>       --num-executors 90 \
>>>>>>>       --driver-memory 1g \
>>>>>>>       --executor-memory 60g \
>>>>>>>       --executor-cores 8 \
>>>>>>>       --master yarn-client \
>>>>>>>       --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>>>>       my.jar
>>>>>>>
>>>>>>>
>>>>>>> There are 2262 input files totaling just 98.6G. The DAG is basically
>>>>>>> textFile().map().filter().groupByKey().saveAsTextFile().
>>>>>>>
>>>>>>> On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you post more of your log? How big are the partitions? What is
>>>>>>>> the action you are performing?
>>>>>>>>
>>>>>>>> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <arun.lut...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Example warning:
>>>>>>>>>
>>>>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, XXXXXXX): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Is there a solution for this? Increase driver memory? I'm using
>>>>>>>>> just 1G driver memory but ideally I won't have to increase it.
>>>>>>>>>
>>>>>>>>> The RDD being processed has 2262 partitions.
>>>>>>>>>
>>>>>>>>> Arun
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cell : 425-233-8271
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>
