>From the symptoms you mentioned that one task's shuffle write is much
larger than all the other task, it is quite similar to normal data skew
behavior, I just give some advice based on your descriptions, I think you
need to detect whether data is actually skewed or not.

The shuffle will put data with same partitioner strategy (default is hash
partitioner) into one task, so the same key data will be put into the same
task, but one task not just has only one key.

2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:

> Attached image shows the Spark UI for the job.
>
>
>
>
>
> On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> Four tasks are now failing with
>>
>> IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch TimeDurationGC
>> TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)
>> Errors  0 3771 0 FAILED PROCESS_LOCAL 114 / host1 2015/05/04 01:27:44   /
>>   ExecutorLostFailure (executor 114 lost)  1007 4973 1 FAILED
>> PROCESS_LOCAL 420 / host2 2015/05/04 02:13:14   /   FetchFailed(null,
>> shuffleId=1, mapId=-1, reduceId=1007, message= +details
>>
>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>> location for shuffle 1
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>      at 
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
>>      at 
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
>>      at 
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>      at 
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>      at 
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>>      at 
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
>>      at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>      at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>      at 
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>      at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>> )
>>
>>  371 4972 1 FAILED PROCESS_LOCAL 563 / host3 2015/05/04 02:13:14   /   
>> FetchFailed(null,
>> shuffleId=1, mapId=-1, reduceId=371, message= +details
>>
>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>> location for shuffle 1
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>      at 
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>      at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>      at 
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
>>      at 
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
>>      at 
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>      at 
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>      at 
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>>      at 
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
>>      at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>      at 
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>      at 
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>      at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>      at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>> )
>>
>>  0 4971 1 FAILED PROCESS_LOCAL 45 / host4 2015/05/04 02:13:14 0 ms  2 s
>> 374.3 MB / 19281537  0.0 B 0.0 B
>>
>>
>> On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> One dataset (RDD Pair)
>>>
>>> val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
>>>
>>> Second Dataset (RDDPair)
>>>
>>> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long],
>>> vi) }
>>>
>>> As i want to join based on item Id that is used as first element in the
>>> tuple in both cases and i think thats what is shuffle key.
>>>
>>> listings ==> Data set contains all the unique item ids that are ever
>>> listed on the ecommerce site.
>>>
>>> viEvents ===> List of items viewed by user in last day. This will always
>>> be a subset of the total set.
>>>
>>> So i do not understand what is data skewness. When my long running task
>>> is working on 1591.2 MB / 98,931,767 does that mean 98 million reocrds
>>> contain all the same item ID ? How can millions of user look at the same
>>> item in last day ?
>>>
>>> Or does this dataset contain records across item ids ?
>>>
>>>
>>> Regards,
>>>
>>> Deepak
>>>
>>>
>>>
>>>
>>> On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> Shuffle key is depending on your implementation, I'm not sure if you
>>>> are familiar with MapReduce, the mapper output is a key-value pair, where
>>>> the key is the shuffle key for shuffling, Spark is also the same.
>>>>
>>>> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>>> Hello Shao,
>>>>> Can you talk more about shuffle key or point me to APIs that allow me
>>>>> to change shuffle key. I will try with different keys and see the
>>>>> performance.
>>>>>
>>>>> What is the shuffle key by default ?
>>>>>
>>>>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> IMHO If your data or your algorithm is prone to data skew, I think
>>>>>> you have to fix this from application level, Spark itself cannot overcome
>>>>>> this problem (if one key has large amount of values), you may change your
>>>>>> algorithm to choose another shuffle key, somethings like this to avoid
>>>>>> shuffle on skewed keys.
>>>>>>
>>>>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>
>>>>>>> Hello Dean & Others,
>>>>>>> Thanks for the response.
>>>>>>>
>>>>>>> I tried with 100,200, 400, 600 and 1200 repartitions with
>>>>>>> 100,200,400 and 800 executors. Each time all the tasks of join complete 
>>>>>>> in
>>>>>>> less than a minute except one and that one tasks runs forever. I have a
>>>>>>> huge cluster at my disposal.
>>>>>>>
>>>>>>> The data for each of 1199 tasks is around 40MB/30k records and for 1
>>>>>>> never ending task is 1.5G/98million records. I see that there is data 
>>>>>>> skew
>>>>>>> among tasks. I had observed this a week earlier and i have no clue on 
>>>>>>> how
>>>>>>> to fix it and when someone suggested that repartition might make things
>>>>>>> more parallel, but the problem is still persistent.
>>>>>>>
>>>>>>> Please suggest on how to get the task to complete.
>>>>>>> All i want to do is join two datasets. (dataset1 is in sequence file
>>>>>>> and dataset2 is in avro format).
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Ex:
>>>>>>> Tasks IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch
>>>>>>> TimeDurationGC TimeShuffle Read Size / RecordsShuffle Spill 
>>>>>>> (Memory)Shuffle
>>>>>>> Spill (Disk)Errors  0 3771 0 RUNNING PROCESS_LOCAL 114 / host1 
>>>>>>> 2015/05/04
>>>>>>> 01:27:44 7.3 min  19 s  1591.2 MB / 98931767  0.0 B 0.0 B   1 3772 0
>>>>>>> SUCCESS PROCESS_LOCAL 226 / host2 2015/05/04 01:27:44 28 s  2 s
>>>>>>> 39.2 MB / 29754  0.0 B 0.0 B   2 3773 0 SUCCESS PROCESS_LOCAL 283
>>>>>>> / host3 2015/05/04 01:27:44 26 s  2 s  39.0 MB / 29646  0.0 B 0.0 B
>>>>>>> 5 3776 0 SUCCESS PROCESS_LOCAL 320 / host4 2015/05/04 01:27:44 31 s
>>>>>>> 3 s  38.8 MB / 29512  0.0 B 0.0 B   4 3775 0 SUCCESS PROCESS_LOCAL 203
>>>>>>> / host5 2015/05/04 01:27:44 41 s  3 s  38.4 MB / 29169  0.0 B 0.0 B
>>>>>>> 3 3774 0 SUCCESS PROCESS_LOCAL 84 / host6 2015/05/04 01:27:44 24 s
>>>>>>> 2 s  38.5 MB / 29258  0.0 B 0.0 B   8 3779 0 SUCCESS PROCESS_LOCAL 309
>>>>>>> / host7 2015/05/04 01:27:44 31 s  4 s  39.5 MB / 30008  0.0 B 0.0 B
>>>>>>>
>>>>>>> There are 1200 tasks in total.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I don't know the full context of what you're doing, but
>>>>>>>> serialization errors usually mean you're attempting to serialize 
>>>>>>>> something
>>>>>>>> that can't be serialized, like the SparkContext. Kryo won't help there.
>>>>>>>>
>>>>>>>> The arguments to spark-submit you posted previously look good:
>>>>>>>>
>>>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>>
>>>>>>>> I suspect you aren't getting the parallelism you need. For
>>>>>>>> partitioning, if your data is in HDFS and your block size is 128MB, 
>>>>>>>> then
>>>>>>>> you'll get ~195 partitions anyway. If it takes 7 hours to do a join 
>>>>>>>> over
>>>>>>>> 25GB of data, you have some other serious bottleneck. You should 
>>>>>>>> examine
>>>>>>>> the web console and the logs to determine where all the time is going.
>>>>>>>> Questions you might pursue:
>>>>>>>>
>>>>>>>>    - How long does each task take to complete?
>>>>>>>>    - How many of those 195 partitions/tasks are processed at the
>>>>>>>>    same time? That is, how many "slots" are available?  Maybe you need 
>>>>>>>> more
>>>>>>>>    nodes if the number of slots is too low. Based on your command 
>>>>>>>> arguments,
>>>>>>>>    you should be able to process 1/2 of them at a time, unless the 
>>>>>>>> cluster is
>>>>>>>>    busy.
>>>>>>>>    - Is the cluster swamped with other work?
>>>>>>>>    - How much data does each task process? Is the data roughly the
>>>>>>>>    same from one task to the next? If not, then you might have serious 
>>>>>>>> key
>>>>>>>>    skew?
>>>>>>>>
>>>>>>>> You may also need to research the details of how joins are
>>>>>>>> implemented and some of the common tricks for organizing data to 
>>>>>>>> minimize
>>>>>>>> having to shuffle all N by M records.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>> http://polyglotprogramming.com
>>>>>>>>
>>>>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hello Deam,
>>>>>>>>> If I don;t use Kryo serializer i got Serialization error and hence
>>>>>>>>> am using it.
>>>>>>>>> If I don';t use partitionBy/reparition then the simply join never
>>>>>>>>> completed even after 7 hours and infact as next step i need to run it
>>>>>>>>> against 250G as that is my full dataset size. Someone here suggested 
>>>>>>>>> to me
>>>>>>>>> to use repartition.
>>>>>>>>>
>>>>>>>>> Assuming reparition is mandatory , how do i decide whats the right
>>>>>>>>> number ? When i am using 400 i do not get NullPointerException that i
>>>>>>>>> talked about, which is strange. I never saw that exception against 
>>>>>>>>> small
>>>>>>>>> random dataset but see it with 25G and again with 400 partitions , i 
>>>>>>>>> do not
>>>>>>>>> see it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <
>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> IMHO, you are trying waaay to hard to optimize work on what is
>>>>>>>>>> really a small data set. 25G, even 250G, is not that much data, 
>>>>>>>>>> especially
>>>>>>>>>> if you've spent a month trying to get something to work that should 
>>>>>>>>>> be
>>>>>>>>>> simple. All these errors are from optimization attempts.
>>>>>>>>>>
>>>>>>>>>> Kryo is great, but if it's not working reliably for some reason,
>>>>>>>>>> then don't use it. Rather than force 200 partitions, let Spark try to
>>>>>>>>>> figure out a good-enough number. (If you really need to force a 
>>>>>>>>>> partition
>>>>>>>>>> count, use the repartition method instead, unless you're overriding 
>>>>>>>>>> the
>>>>>>>>>> partitioner.)
>>>>>>>>>>
>>>>>>>>>> So. I recommend that you eliminate all the optimizations: Kryo,
>>>>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work 
>>>>>>>>>> first.
>>>>>>>>>> Then, if it really isn't fast enough, look for actual evidence of
>>>>>>>>>> bottlenecks and optimize those.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>
>>>>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Dean & Others,
>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>> I have two data sets and all i want to do is a simple equi join.
>>>>>>>>>>> I have 10G limit and as my dataset_1 exceeded that it was throwing 
>>>>>>>>>>> OOM
>>>>>>>>>>> error. Hence i switched back to use .join() API instead of map-side
>>>>>>>>>>> broadcast join.
>>>>>>>>>>> I am repartitioning the data with 100,200 and i see a
>>>>>>>>>>> NullPointerException now.
>>>>>>>>>>>
>>>>>>>>>>> When i run against 25G of each input and with .partitionBy(new
>>>>>>>>>>> org.apache.spark.HashPartitioner(200)) , I see
>>>>>>>>>>> NullPointerExveption
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> this trace does not include a line from my code and hence i do
>>>>>>>>>>> not what is causing error ?
>>>>>>>>>>> I do have registered kryo serializer.
>>>>>>>>>>>
>>>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>>>       .setAppName(detail)
>>>>>>>>>>> *      .set("spark.serializer",
>>>>>>>>>>> "org.apache.spark.serializer.KryoSerializer")*
>>>>>>>>>>>       .set("spark.kryoserializer.buffer.mb",
>>>>>>>>>>> arguments.get("buffersize").get)
>>>>>>>>>>>       .set("spark.kryoserializer.buffer.max.mb",
>>>>>>>>>>> arguments.get("maxbuffersize").get)
>>>>>>>>>>>       .set("spark.driver.maxResultSize",
>>>>>>>>>>> arguments.get("maxResultSize").get)
>>>>>>>>>>>       .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>>>> * 
>>>>>>>>>>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLeve*
>>>>>>>>>>> lMetricSum]))
>>>>>>>>>>>     val sc = new SparkContext(conf)
>>>>>>>>>>>
>>>>>>>>>>> I see the exception when this task runs
>>>>>>>>>>>
>>>>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long],
>>>>>>>>>>> vi) }
>>>>>>>>>>>
>>>>>>>>>>> Its a simple mapping of input records to (itemId, record)
>>>>>>>>>>>
>>>>>>>>>>> I found this
>>>>>>>>>>>
>>>>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>>>>
>>>>>>>>>>> Looks like Kryo (2.21v)  changed something to stop using default
>>>>>>>>>>> constructors.
>>>>>>>>>>>
>>>>>>>>>>> (Kryo.DefaultInstantiatorStrategy) 
>>>>>>>>>>> kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new 
>>>>>>>>>>> StdInstantiatorStrategy());
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Please suggest
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Trace:
>>>>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task
>>>>>>>>>>> 110.1 in stage 2.0 (TID 774)
>>>>>>>>>>> com.esotericsoftware.kryo.KryoException:
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>> Serialization trace:
>>>>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>>>>>     at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>>>>>     at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>     at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>     at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>     at
>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>     at
>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>>>>>     at
>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.
>>>>>>>>>>> java:729)Regards,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Any suggestions.
>>>>>>>>>>> I am not able to get this thing to work over a month now, its
>>>>>>>>>>> kind of getting frustrating.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <
>>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How big is the data you're returning to the driver with
>>>>>>>>>>>> collectAsMap? You are probably running out of memory trying to 
>>>>>>>>>>>> copy too
>>>>>>>>>>>> much data back to it.
>>>>>>>>>>>>
>>>>>>>>>>>> If you're trying to force a map-side join, Spark can do that
>>>>>>>>>>>> for you in some cases within the regular DataFrame/RDD context. See
>>>>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>>>>>>>>> and this talk by Michael Armbrust for example,
>>>>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> dean
>>>>>>>>>>>>
>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Full Exception
>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1
>>>>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in 
>>>>>>>>>>>>> 884.087 s*
>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>>>>>>>>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 
>>>>>>>>>>>>> s*
>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>> while getting
>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>> message
>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app
>>>>>>>>>>>>> status: FAILED, exitCode: 15, (reason: User class threw 
>>>>>>>>>>>>> exception: Job
>>>>>>>>>>>>> aborted due to stage failure: Exception while getting task result:
>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>>>> GetLocations(taskresult_112)])
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Code at line 37*
>>>>>>>>>>>>>
>>>>>>>>>>>>> val lstgItemMap = listings.map { lstg =>
>>>>>>>>>>>>> (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>>>>>>>>>>
>>>>>>>>>>>>> Listing data set size is 26G (10 files) and my driver memory
>>>>>>>>>>>>> is 12G (I cant go beyond it). The reason i do collectAsMap is to 
>>>>>>>>>>>>> brodcast
>>>>>>>>>>>>> it and do a map-side join instead of regular join.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest ?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> My Spark Job is failing  and i see
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ==============================
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>>> while getting
>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>>> message
>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out
>>>>>>>>>>>>>> after [30 seconds]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see multiple of these
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
>>>>>>>>>>>>>> timed out after [30 seconds]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And finally i see this
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Solutions
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2)  --num-executors 96 --driver-memory 12g
>>>>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=10G" --executor-memory 12g
>>>>>>>>>>>>>> --executor-cores 4
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond
>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ANY suggestions ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Did not work. Same problem.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You could try increasing your heap space explicitly. like
>>>>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g", its not the correct approach 
>>>>>>>>>>>>>>>> but try.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a SparkApp that runs completes in 45 mins for 5
>>>>>>>>>>>>>>>>> files (5*750MB size) and it takes 16 executors to do so.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type
>>>>>>>>>>>>>>>>> (10*3 files as there are three inputs that are transformed). 
>>>>>>>>>>>>>>>>> [Input1 =
>>>>>>>>>>>>>>>>> 10*750 MB, Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 
>>>>>>>>>>>>>>>>> 32 executors.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I see multiple
>>>>>>>>>>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue
>>>>>>>>>>>>>>>>> communicating with driver in heartbeater
>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message
>>>>>>>>>>>>>>>>> [message = 
>>>>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
>>>>>>>>>>>>>>>>> timed out after [30 seconds]
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When i searched deeper, i found OOM error.
>>>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(17,
>>>>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9,
>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no
>>>>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms
>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(9,
>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in
>>>>>>>>>>>>>>>>> thread task-result-getter-3
>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> LogType: stdout
>>>>>>>>>>>>>>>>> LogLength: 96
>>>>>>>>>>>>>>>>> Log Contents:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Spark Command:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster
>>>>>>>>>>>>>>>>> --driver-class-path
>>>>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>>>>>>>>> --jars
>>>>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 
>>>>>>>>>>>>>>>>> --queue
>>>>>>>>>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>>>>>>>>>> subcommand=viewItem
>>>>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> There is 12G limit on memory that i can use as this Spark
>>>>>>>>>>>>>>>>> is running over YARN.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>>>>>>>> Should i increase the number of executors form 32?
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>

Reply via email to