Two changes I made that appear to be keeping the various errors at bay (a
rough sketch of both changes follows this list):

1) Bumped spark.yarn.executor.memoryOverhead up to 2000, in the spirit of
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E
, even though I couldn't find the same error in my yarn log.

2) Very important: I ran coalesce(1000) on the RDD at the start of the DAG.
I know from past experience with groupByKey that keeping the number of
partitions lower is helpful. I hadn't run this pipeline in a while, so that
rule of thumb was not at the forefront of my mind.
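
For reference, a minimal sketch of both changes; the app name, paths, and
the parse/filter steps are only placeholders for the real job:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("MyPipeline")
      // change 1: extra off-heap headroom per YARN executor container, in MB
      .set("spark.yarn.executor.memoryOverhead", "2000")
    val sc = new SparkContext(conf)

    sc.textFile("hdfs:///path/to/input/*")
      .coalesce(1000)                            // change 2: fewer partitions before the shuffle
      .map(line => (line.split(",")(0), line))   // placeholder key extraction
      .filter { case (k, _) => k.nonEmpty }      // placeholder filter
      .groupByKey()
      .saveAsTextFile("hdfs:///path/to/output")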

On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Looking into the yarn logs for a similar job where an executor was
> associated with the same error, I find:
>
> ...
> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive
> connection to (SERVER), creating a new one.
> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 46 outstanding blocks*
> *java.io.IOException: Failed to connect to (SERVER)*
>     at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>     at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>     at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>     at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>     at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>     at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>     at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
>     at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
>     at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
>     at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> *Caused by: java.net.ConnectException: Connection refused:* (SERVER)
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>     at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>     at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     ... 1 more
>
> ...
>
>
> Not sure if this reveals anything at all.
>
>
> On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> My hunch is that the TaskCommitDenied is perhaps a red herring and the
>> real problem is the groupByKey - but I've also just seen a lot of people
>> get bitten by groupByKey, so that might not be the issue. If you just do
>> a count at the point of the groupByKey, does the pipeline succeed?
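>>
>> Something like this, just as a quick test (the input path and the parsing
>> are placeholders for whatever the real pipeline does):
>>
>>     // same DAG up to the groupByKey, but end with a count instead of the save
>>     val grouped = sc.textFile("hdfs:///path/to/input/*")
>>       .map(line => (line.split(",")(0), line))   // placeholder parsing
>>       .groupByKey()
>>     println(grouped.count())                     // does this stage complete?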
>>
>> On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> Usually the pipeline works; it just failed on this particular input
>>> data. The other data it has run on is of similar size.
>>>
>>> Speculation is enabled.
>>>
>>> I'm using Spark 1.5.0.
>>>
>>> Here is the config. Many of these settings may not be needed anymore;
>>> they are left over from trying to get things working in Spark 1.2 and 1.3.
>>>
>>>         .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>         .set("spark.shuffle.memoryFraction","0.2") // default 0.2
>>>         .set("spark.shuffle.manager","SORT") // preferred setting for
>>> optimized joins
>>>         .set("spark.shuffle.consolidateFiles","true") // helpful for
>>> "too many files open"
>>>         .set("spark.mesos.coarse", "true") // helpful for
>>> MapOutputTracker errors?
>>>         .set("spark.akka.frameSize","300") // helpful when using
>>> consolidateFiles=true
>>>         .set("spark.shuffle.compress","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>         .set("spark.file.transferTo","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>         .set("spark.core.connection.ack.wait.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>         .set("spark.speculation","true")
>>>         .set("spark.worker.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>         .set("spark.akka.timeout","300") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>         .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>         .set("spark.driver.maxResultSize","2048") // in response to
>>> error: Total size of serialized results of 39901 tasks (1024.0 MB) is
>>> bigger than spark.driver.maxResultSize (1024.0 MB)
>>>         .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>         .set("spark.kryo.registrator","------.MyRegistrator")
>>>         .set("spark.kryo.registrationRequired", "true")
>>>         .set("spark.yarn.executor.memoryOverhead","600")
>>>
>>> On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <joshro...@databricks.com>
>>> wrote:
>>>
>>>> Is speculation enabled? This "TaskCommitDenied by driver" error is thrown
>>>> by writers that lost the race to commit an output partition. I don't think
>>>> this has anything to do with key skew etc. Replacing the groupByKey with a
>>>> count will mask this exception, because the commit coordination is only
>>>> triggered by save/write operations.
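>>>>
>>>> One quick way to test that theory is to rerun the job with speculative
>>>> task attempts turned off, e.g.:
>>>>
>>>>         .set("spark.speculation","false") // no duplicate attempts racing to commit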
>>>>
>>>> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> Before we dig too far into this, the thing that most quickly jumps
>>>>> out at me is the groupByKey, which could be causing some problems - what's
>>>>> the distribution of keys like? Try replacing the groupByKey with a count()
>>>>> and see if the pipeline works up until that stage. Also, 1G of driver
>>>>> memory is a bit small for something with 90 executors...
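>>>>>
>>>>> For the key distribution, something along these lines gives a rough
>>>>> picture (the parse step is a placeholder for however the job builds its
>>>>> keyed RDD):
>>>>>
>>>>>     // count records per key, then look at the heaviest keys
>>>>>     val pairs = sc.textFile("hdfs:///path/to/input/*").map(l => (l.split(",")(0), l))
>>>>>     val keyCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
>>>>>     keyCounts.map(_.swap).sortByKey(ascending = false).take(20).foreach(println)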
>>>>>
>>>>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <arun.lut...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop
>>>>>> library for your platform... using builtin-java classes where applicable
>>>>>>
>>>>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler
>>>>>> for source because spark.app.id is not set.
>>>>>>
>>>>>> spark.yarn.driver.memoryOverhead is set but does not apply in client
>>>>>> mode.
>>>>>>
>>>>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local
>>>>>> reads feature cannot be used because libhadoop cannot be loaded.
>>>>>>
>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache
>>>>>> broadcast_4 in memory! (computed 60.2 MB so far)
>>>>>>
>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to
>>>>>> disk instead.
>>>>>>
>>>>>> [Stage 1:====================================================>(2260 +
>>>>>> 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in 
>>>>>> stage
>>>>>> 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job:
>>>>>> 1, partition: 1440, attempt: 4530
>>>>>>
>>>>>> [Stage 1:====================================================>(2260 +
>>>>>> 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in 
>>>>>> stage
>>>>>> 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job:
>>>>>> 1, partition: 1488, attempt: 4531
>>>>>>
>>>>>> [Stage 1:====================================================>(2261 +
>>>>>> 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in 
>>>>>> stage
>>>>>> 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job:
>>>>>> 1, partition: 1982, attempt: 4532
>>>>>>
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0
>>>>>> (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>>>>> partition: 2214, attempt: 4482
>>>>>>
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0
>>>>>> (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>>>>> partition: 2168, attempt: 4436
>>>>>>
>>>>>>
>>>>>> I am running with:
>>>>>>
>>>>>>     spark-submit --class "myclass" \
>>>>>>
>>>>>>       --num-executors 90 \
>>>>>>
>>>>>>       --driver-memory 1g \
>>>>>>
>>>>>>       --executor-memory 60g \
>>>>>>
>>>>>>       --executor-cores 8 \
>>>>>>
>>>>>>       --master yarn-client \
>>>>>>
>>>>>>       --conf "spark.executor.extraJavaOptions=-verbose:gc
>>>>>> -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>>>
>>>>>>       my.jar
>>>>>>
>>>>>>
>>>>>> There are 2262 input files totaling just 98.6G. The DAG is basically
>>>>>> textFile().map().filter().groupByKey().saveAsTextFile().
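>>>>>>
>>>>>> In code, roughly (the paths and the parse/filter logic here are just
>>>>>> placeholders):
>>>>>>
>>>>>>     sc.textFile("hdfs:///path/to/input/*")        // 2262 input files
>>>>>>       .map(line => (line.split(",")(0), line))    // placeholder parse
>>>>>>       .filter { case (k, _) => k.nonEmpty }       // placeholder filter
>>>>>>       .groupByKey()                               // the shuffle
>>>>>>       .saveAsTextFile("hdfs:///path/to/output")   // the TaskCommitDenied warnings come from this write stage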
>>>>>>
>>>>>> On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> Can you post more of your log? How big are the partitions? What is
>>>>>>> the action you are performing?
>>>>>>>
>>>>>>> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <arun.lut...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Example warning:
>>>>>>>>
>>>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage
>>>>>>>> 1.0 (TID 4436, XXXXXXX): TaskCommitDenied (Driver denied task commit) 
>>>>>>>> for
>>>>>>>> job: 1, partition: 2168, attempt: 4436
>>>>>>>>
>>>>>>>>
>>>>>>>> Is there a solution for this? Increase driver memory? I'm using
>>>>>>>> just 1G of driver memory, but ideally I wouldn't have to increase it.
>>>>>>>>
>>>>>>>> The RDD being processed has 2262 partitions.
>>>>>>>>
>>>>>>>> Arun
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
