1) I tried this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents,
    new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map {


It fired two jobs, and I still have one task that never completes.
Index  ID    Attempt  Status   Locality       Executor / Host  Launch Time          Duration  GC Time  Shuffle Read / Records  Spill (Mem)  Spill (Disk)
0      4818  0        RUNNING  PROCESS_LOCAL  5 / host1        2015/05/04 07:24:25  1.1 h     13 min   778.0 MB / 50,314,161   4.5 GB       47.4 MB
955    5773  0        SUCCESS  PROCESS_LOCAL  5 / host2        2015/05/04 07:47:16  2.2 min   1.5 min  106.3 MB / 4,197,539    0.0 B        0.0 B
1199   6017  0        SUCCESS  PROCESS_LOCAL  3 / host3        2015/05/04 07:51:51  48 s      2 s      94.2 MB / 3,618,335     2.8 GB       8.6 MB



2) I tried reversing the datasets in the join:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.join(lstgItem)

This led to the same problem of a long-running task.

3) Next, I am trying this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents, 1200).map {


I have exhausted all my options.
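
If the hot keys really are the problem, the remaining standard workaround is
to salt the join key. A minimal sketch (the name joined and the factor of 10
are illustrative; it replicates the listings side, which is only practical if
the replicated side still fits the cluster):

import scala.util.Random

val SALT = 10 // arbitrary replication factor; tune to the observed skew

// Replicate every listing key SALT ways: (itemId, s) for s in [0, SALT).
val saltedListings = lstgItem.flatMap { case (id, lstg) =>
  (0 until SALT).map(s => ((id, s), lstg))
}
// Tag each view event with a random salt so one hot itemId spreads over SALT tasks.
val saltedEvents = viEvents.map { case (id, vi) =>
  ((id, Random.nextInt(SALT)), vi)
}
// Join on the salted key, then drop the salt to recover (itemId, (listing, event)).
val joined = saltedListings.join(saltedEvents, 1200)
  .map { case ((id, _), (lstg, vi)) => (id, (lstg, vi)) }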


Regards,

Deepak


On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I ran it against one file instead of 10 files, and I see one task still
> running after 33 mins; its shuffle read size is 780 MB / 50 million records.
>
> I did a count of records per itemId for dataset-2 [one FILE], i.e. the
> second dataset (pair RDD):
>
>   val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>
> This is the dataset that contains the list of items viewed by users in one day.
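>
> (A count like the one below can be produced with something along these
> lines; a sketch, with an arbitrary top-30 cutoff:
>
>   viEvents.mapValues(_ => 1L).reduceByKey(_ + _)
>     .top(30)(Ordering.by[(Long, Long), Long](_._2))
>     .foreach { case (id, n) => println(s"$id $n") }
> )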
>
> Item Id                     Count
> 201335783004 537
> 111654496030 353
> 141640703798 287
> 191568402102 258
> 111654479898 217
> 231521843148 211
> 251931716094 201
> 111654493548 181
> 181503913062 181
> 121635453050 152
> 261798565828 140
> 151494617682 139
> 251927181728 127
> 231516683056 119
> 141640492864 117
> 161677270656 117
> 171771073616 113
> 111649942124 109
> 191516989450 97
> 231539161292 94
> 221555628408 88
> 131497785968 87
> 121632233872 84
> 131335379184 83
> 281531363490 83
> 131492727742 79
> 231174157820 79
> 161666914810 77
> 251699753072 77
> 161683664300 76
>
>
> I was assuming data skew would mean the top item (201335783004) had a
> count of 1 million, but it is only a few hundred, so why is Spark skewing
> the join? What should I do so that Spark distributes the records evenly?
>
> In M/R we can change the Partitioner between the mapper and the reducer;
> how can I do that in Spark for a join?
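>
> (For what it is worth, the pair-RDD join API accepts an explicit
> Partitioner, so the M/R-style override would look roughly like this
> sketch, where ItemPartitioner is a made-up name:
>
>   class ItemPartitioner(override val numPartitions: Int)
>       extends org.apache.spark.Partitioner {
>     // Same placement as HashPartitioner; any key-to-partition rule can go here.
>     def getPartition(key: Any): Int = {
>       val h = key.hashCode % numPartitions
>       if (h < 0) h + numPartitions else h
>     }
>   }
>
>   val joined = lstgItem.join(viEvents, new ItemPartitioner(1200))
> )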
>
>
> Index  ID    Attempt  Status   Locality       Executor / Host  Launch Time          Duration  GC Time  Shuffle Read / Records  Spill (Mem)  Spill (Disk)
> 0      3618  0        RUNNING  PROCESS_LOCAL  4 / host1        2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322   4.6 GB       47.5 MB
> 433    4051  0        SUCCESS  PROCESS_LOCAL  1 / host2        2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4,505,143    1282.3 MB    10.1 MB
> 218    3836  0        SUCCESS  PROCESS_LOCAL  3 / host3        2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2,865,143     879.6 MB     6.9 MB
> 113    3731  0        SUCCESS  PROCESS_LOCAL  2 / host4        2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5,187          0.0 B        0.0 B
>
> On Mon, May 4, 2015 at 6:00 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> From the symptoms you mentioned (one task's shuffle write is much larger
>> than all the others'), this looks quite similar to normal data-skew
>> behavior. I was only giving advice based on your description; you need to
>> check whether the data is actually skewed or not.
>>
>> The shuffle puts data into tasks according to the partitioning strategy
>> (hash partitioning by default), so all records with the same key land in
>> the same task; but a task does not hold just one key.
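>>
>> (Concretely, the default placement is roughly this sketch of what a hash
>> partitioner does:
>>
>>   val h = key.hashCode % numPartitions
>>   val partition = if (h < 0) h + numPartitions else h
>>
>> so every record with the same itemId lands in the same partition, along
>> with every other key that hashes there.)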
>>
>> 2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>
>>> Attached image shows the Spark UI for the job.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> Four tasks are now failing with:
>>>>
>>>> Index  ID    Attempt  Status  Locality       Executor / Host  Launch Time          Errors
>>>> 0      3771  0        FAILED  PROCESS_LOCAL  114 / host1      2015/05/04 01:27:44  ExecutorLostFailure (executor 114 lost)
>>>> 1007   4973  1        FAILED  PROCESS_LOCAL  420 / host2      2015/05/04 02:13:14  FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007); full message below:
>>>>
>>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>>>> location for shuffle 1
>>>>    at 
>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>>>    at 
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>    at 
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>    at 
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
>>>>    at 
>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
>>>>    at 
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>>    at 
>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>>>    at 
>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>>>>    at 
>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
>>>>    at 
>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>    at 
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>    at 
>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>    at 
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>    at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>    at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> )
>>>>
>>>> 371    4972  1        FAILED  PROCESS_LOCAL  563 / host3      2015/05/04 02:13:14  FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371); full message below:
>>>>
>>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>>>> location for shuffle 1
>>>>    ... (stack trace identical to the one above)
>>>>
>>>> )
>>>>
>>>> 0      4971  1        FAILED  PROCESS_LOCAL  45 / host4       2015/05/04 02:13:14  (duration 0 ms, GC 2 s, shuffle read 374.3 MB / 19,281,537, spill 0.0 B / 0.0 B)
>>>>
>>>>
>>>> On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>> wrote:
>>>>
>>>>> One dataset (pair RDD):
>>>>>
>>>>> val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
>>>>>
>>>>> Second dataset (pair RDD):
>>>>>
>>>>> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>
>>>>> I want to join on the item id, which is the first element of the tuple
>>>>> in both cases, and I think that is what the shuffle key is.
>>>>>
>>>>> listings ==> dataset containing all the unique item ids ever listed on
>>>>> the ecommerce site.
>>>>>
>>>>> viEvents ===> list of items viewed by users in the last day. This will
>>>>> always be a subset of the total set.
>>>>>
>>>>> So I do not understand the data skew. When my long-running task is
>>>>> working on 1591.2 MB / 98,931,767 records, does that mean all 98
>>>>> million records have the same item id? How can millions of users look
>>>>> at the same item in one day?
>>>>>
>>>>> Or does this dataset contain records across item ids?
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Deepak
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The shuffle key depends on your implementation. If you are familiar
>>>>>> with MapReduce: the mapper output is a key-value pair, and the key is
>>>>>> the shuffle key used for shuffling. Spark works the same way.
>>>>>>
>>>>>> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>
>>>>>>> Hello Shao,
>>>>>>> Can you say more about the shuffle key, or point me to the APIs that
>>>>>>> allow me to change it? I will try different keys and compare the
>>>>>>> performance.
>>>>>>>
>>>>>>> What is the shuffle key by default?
>>>>>>>
>>>>>>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> IMHO, if your data or your algorithm is prone to data skew, you have
>>>>>>>> to fix it at the application level; Spark itself cannot overcome the
>>>>>>>> problem of one key having a very large number of values. You may
>>>>>>>> change your algorithm to choose another shuffle key, something like
>>>>>>>> that, to avoid shuffling on skewed keys.
>>>>>>>>
>>>>>>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hello Dean & Others,
>>>>>>>>> Thanks for the response.
>>>>>>>>>
>>>>>>>>> I tried 100, 200, 400, 600 and 1200 repartitions with 100, 200, 400
>>>>>>>>> and 800 executors. Each time, all the join tasks complete in less
>>>>>>>>> than a minute except one, and that one task runs forever. I have a
>>>>>>>>> huge cluster at my disposal.
>>>>>>>>>
>>>>>>>>> Each of the 1199 tasks gets around 40 MB / 30k records, while the
>>>>>>>>> one never-ending task gets 1.5 G / 98 million records, so there is
>>>>>>>>> data skew among the tasks. I observed this a week earlier and have
>>>>>>>>> no clue how to fix it; someone suggested that repartitioning might
>>>>>>>>> make things more parallel, but the problem persists.
>>>>>>>>>
>>>>>>>>> Please suggest how to get the task to complete.
>>>>>>>>> All I want to do is join two datasets (dataset1 is a sequence file
>>>>>>>>> and dataset2 is in Avro format).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Ex:
>>>>>>>>> Index  ID    Attempt  Status   Locality       Executor / Host  Launch Time          Duration  GC Time  Shuffle Read / Records  Spill (Mem)  Spill (Disk)
>>>>>>>>> 0      3771  0        RUNNING  PROCESS_LOCAL  114 / host1      2015/05/04 01:27:44  7.3 min   19 s     1591.2 MB / 98,931,767  0.0 B        0.0 B
>>>>>>>>> 1      3772  0        SUCCESS  PROCESS_LOCAL  226 / host2      2015/05/04 01:27:44  28 s      2 s      39.2 MB / 29,754        0.0 B        0.0 B
>>>>>>>>> 2      3773  0        SUCCESS  PROCESS_LOCAL  283 / host3      2015/05/04 01:27:44  26 s      2 s      39.0 MB / 29,646        0.0 B        0.0 B
>>>>>>>>> 5      3776  0        SUCCESS  PROCESS_LOCAL  320 / host4      2015/05/04 01:27:44  31 s      3 s      38.8 MB / 29,512        0.0 B        0.0 B
>>>>>>>>> 4      3775  0        SUCCESS  PROCESS_LOCAL  203 / host5      2015/05/04 01:27:44  41 s      3 s      38.4 MB / 29,169        0.0 B        0.0 B
>>>>>>>>> 3      3774  0        SUCCESS  PROCESS_LOCAL  84 / host6       2015/05/04 01:27:44  24 s      2 s      38.5 MB / 29,258        0.0 B        0.0 B
>>>>>>>>> 8      3779  0        SUCCESS  PROCESS_LOCAL  309 / host7      2015/05/04 01:27:44  31 s      4 s      39.5 MB / 30,008        0.0 B        0.0 B
>>>>>>>>>
>>>>>>>>> There are 1200 tasks in total.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <
>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I don't know the full context of what you're doing, but
>>>>>>>>>> serialization errors usually mean you're attempting to serialize 
>>>>>>>>>> something
>>>>>>>>>> that can't be serialized, like the SparkContext. Kryo won't help 
>>>>>>>>>> there.
>>>>>>>>>>
>>>>>>>>>> The arguments to spark-submit you posted previously look good:
>>>>>>>>>>
>>>>>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>>>>
>>>>>>>>>> I suspect you aren't getting the parallelism you need. For
>>>>>>>>>> partitioning, if your data is in HDFS and your block size is 128MB, 
>>>>>>>>>> then
>>>>>>>>>> you'll get ~195 partitions anyway. If it takes 7 hours to do a join 
>>>>>>>>>> over
>>>>>>>>>> 25GB of data, you have some other serious bottleneck. You should 
>>>>>>>>>> examine
>>>>>>>>>> the web console and the logs to determine where all the time is 
>>>>>>>>>> going.
>>>>>>>>>> Questions you might pursue:
>>>>>>>>>>
>>>>>>>>>>    - How long does each task take to complete?
>>>>>>>>>>    - How many of those 195 partitions/tasks are processed at the
>>>>>>>>>>    same time? That is, how many "slots" are available?  Maybe you 
>>>>>>>>>> need more
>>>>>>>>>>    nodes if the number of slots is too low. Based on your command 
>>>>>>>>>> arguments,
>>>>>>>>>>    you should be able to process 1/2 of them at a time, unless the 
>>>>>>>>>> cluster is
>>>>>>>>>>    busy.
>>>>>>>>>>    - Is the cluster swamped with other work?
>>>>>>>>>>    - How much data does each task process? Is the data roughly
>>>>>>>>>>    the same from one task to the next? If not, then you might have
>>>>>>>>>>    serious key skew.
>>>>>>>>>>
>>>>>>>>>> You may also need to research the details of how joins are
>>>>>>>>>> implemented and some of the common tricks for organizing data to 
>>>>>>>>>> minimize
>>>>>>>>>> having to shuffle all N by M records.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>
>>>>>>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Dean,
>>>>>>>>>>> If I don't use the Kryo serializer I get a serialization error,
>>>>>>>>>>> hence I am using it.
>>>>>>>>>>> If I don't use partitionBy/repartition, then even the simple join
>>>>>>>>>>> never completes after 7 hours, and in fact as a next step I need
>>>>>>>>>>> to run it against 250G, which is my full dataset size. Someone
>>>>>>>>>>> here suggested I use repartition.
>>>>>>>>>>>
>>>>>>>>>>> Assuming repartition is mandatory, how do I decide what the right
>>>>>>>>>>> number is? Strangely, when I use 400 partitions I do not get the
>>>>>>>>>>> NullPointerException I talked about. I never saw that exception
>>>>>>>>>>> against a small random dataset, but I see it with 25G; yet with
>>>>>>>>>>> 400 partitions, again, I do not see it.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <
>>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> IMHO, you are trying waaay too hard to optimize work on what is
>>>>>>>>>>>> really a small data set. 25G, even 250G, is not that much data, 
>>>>>>>>>>>> especially
>>>>>>>>>>>> if you've spent a month trying to get something to work that 
>>>>>>>>>>>> should be
>>>>>>>>>>>> simple. All these errors are from optimization attempts.
>>>>>>>>>>>>
>>>>>>>>>>>> Kryo is great, but if it's not working reliably for some
>>>>>>>>>>>> reason, then don't use it. Rather than force 200 partitions, let 
>>>>>>>>>>>> Spark try
>>>>>>>>>>>> to figure out a good-enough number. (If you really need to force a
>>>>>>>>>>>> partition count, use the repartition method instead, unless you're
>>>>>>>>>>>> overriding the partitioner.)
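>>>>>>>>>>>>
>>>>>>>>>>>> For example (a sketch; pairRdd stands for any key-value RDD):
>>>>>>>>>>>>
>>>>>>>>>>>>   val a = pairRdd.repartition(400)  // only changes the number of partitions
>>>>>>>>>>>>   val b = pairRdd.partitionBy(new org.apache.spark.HashPartitioner(400))  // also fixes the key-to-partition mapping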
>>>>>>>>>>>>
>>>>>>>>>>>> So. I recommend that you eliminate all the optimizations: Kryo,
>>>>>>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work 
>>>>>>>>>>>> first.
>>>>>>>>>>>> Then, if it really isn't fast enough, look for actual evidence of
>>>>>>>>>>>> bottlenecks and optimize those.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Dean & Others,
>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>> I have two datasets, and all I want to do is a simple equi-join.
>>>>>>>>>>>>> I have a 10G limit, and since my dataset_1 exceeded that, it was
>>>>>>>>>>>>> throwing an OOM error. Hence I switched back to the .join() API
>>>>>>>>>>>>> instead of a map-side broadcast join.
>>>>>>>>>>>>> I am repartitioning the data with 100 and 200 partitions, and I
>>>>>>>>>>>>> see a NullPointerException now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When I run against 25G of each input with .partitionBy(new
>>>>>>>>>>>>> org.apache.spark.HashPartitioner(200)), I see a
>>>>>>>>>>>>> NullPointerException.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The trace does not include a line from my code, so I do not know
>>>>>>>>>>>>> what is causing the error.
>>>>>>>>>>>>> I do have the Kryo serializer registered:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>>>>>   .setAppName(detail)
>>>>>>>>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>>>>>>>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>>>>>>>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>>>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>>>>>>>>>>> val sc = new SparkContext(conf)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I see the exception when this task runs:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>>>>>>>>>
>>>>>>>>>>>>> It is a simple mapping of input records to (itemId, record).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>>>>>>> and
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> It looks like Kryo (v2.21) changed something and stopped using
>>>>>>>>>>>>> default constructors; the suggested fix there is:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
>>>>>>>>>>>>>     .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
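>>>>>>>>>>>>>
>>>>>>>>>>>>> If that is indeed the cause, one way to apply an Objenesis-based
>>>>>>>>>>>>> strategy inside Spark might be a custom KryoRegistrator (a
>>>>>>>>>>>>> sketch; the class name is made up):
>>>>>>>>>>>>>
>>>>>>>>>>>>> import com.esotericsoftware.kryo.Kryo
>>>>>>>>>>>>> import org.objenesis.strategy.StdInstantiatorStrategy
>>>>>>>>>>>>>
>>>>>>>>>>>>> class AvroKryoRegistrator extends org.apache.spark.serializer.KryoRegistrator {
>>>>>>>>>>>>>   override def registerClasses(kryo: Kryo): Unit = {
>>>>>>>>>>>>>     // Instantiate via Objenesis so classes without no-arg constructors deserialize.
>>>>>>>>>>>>>     kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }
>>>>>>>>>>>>> // wired up with: .set("spark.kryo.registrator", classOf[AvroKryoRegistrator].getName)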
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Trace:
>>>>>>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task
>>>>>>>>>>>>> 110.1 in stage 2.0 (TID 774)
>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException:
>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>> I have not been able to get this to work for over a month now;
>>>>>>>>>>>>> it is getting kind of frustrating.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <
>>>>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How big is the data you're returning to the driver with
>>>>>>>>>>>>>> collectAsMap? You are probably running out of memory trying to 
>>>>>>>>>>>>>> copy too
>>>>>>>>>>>>>> much data back to it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you're trying to force a map-side join, Spark can do that
>>>>>>>>>>>>>> for you in some cases within the regular DataFrame/RDD context. 
>>>>>>>>>>>>>> See
>>>>>>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>>>>>>>>>>> and this talk by Michael Armbrust for example,
>>>>>>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> dean
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Full Exception
>>>>>>>>>>>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1
>>>>>>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
>>>>>>>>>>>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>>>>>>>>>>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
>>>>>>>>>>>>>>> 1093.418249 s*
>>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>>>> while getting
>>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>>>> message
>>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message 
>>>>>>>>>>>>>>> =
>>>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app
>>>>>>>>>>>>>>> status: FAILED, exitCode: 15, (reason: User class threw 
>>>>>>>>>>>>>>> exception: Job
>>>>>>>>>>>>>>> aborted due to stage failure: Exception while getting task 
>>>>>>>>>>>>>>> result:
>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message 
>>>>>>>>>>>>>>> =
>>>>>>>>>>>>>>> GetLocations(taskresult_112)])
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Code at line 37:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The listing dataset is 26G (10 files) and my driver memory is
>>>>>>>>>>>>>>> 12G (I cannot go beyond that). The reason I do collectAsMap is
>>>>>>>>>>>>>>> to broadcast it and do a map-side join instead of a regular join.
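>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (The intended map-side join pattern is roughly this sketch,
>>>>>>>>>>>>>>> with viEvents being the pair RDD from earlier in the thread:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> val lstgBc = sc.broadcast(lstgItemMap)
>>>>>>>>>>>>>>> val joined = viEvents.flatMap { case (itemId, vi) =>
>>>>>>>>>>>>>>>   lstgBc.value.get(itemId).map(lstg => (itemId, (lstg, vi)))
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> )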
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please suggest.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My Spark job is failing, and I see
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ==============================
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>>>>> while getting
>>>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>>>>> message
>>>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message 
>>>>>>>>>>>>>>>> [message =
>>>>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out
>>>>>>>>>>>>>>>> after [30 seconds]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I see multiple of these
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
>>>>>>>>>>>>>>>> timed out after [30 seconds]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> And finally I see this:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Solutions
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2)  --num-executors 96 --driver-memory 12g
>>>>>>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=10G" --executor-memory 
>>>>>>>>>>>>>>>> 12g
>>>>>>>>>>>>>>>> --executor-cores 4
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond
>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ANY suggestions?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Did not work. Same problem.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> You could try increasing your heap space explicitly, like
>>>>>>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g". It is not the correct
>>>>>>>>>>>>>>>>>> approach, but try it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have a Spark app that runs to completion in 45 mins for
>>>>>>>>>>>>>>>>>>> 5 files (5 * 750 MB) using 16 executors.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type
>>>>>>>>>>>>>>>>>>> (10 * 3 files, as there are three inputs that are
>>>>>>>>>>>>>>>>>>> transformed) [Input1 = 10*750 MB, Input2 = 10*2.5 GB,
>>>>>>>>>>>>>>>>>>> Input3 = 10*1.5 G], hence I used 32 executors.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I see multiple occurrences of:
>>>>>>>>>>>>>>>>>>> 15/04/28 09:23:31 WARN executor.Executor: Issue
>>>>>>>>>>>>>>>>>>> communicating with driver in heartbeater
>>>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message
>>>>>>>>>>>>>>>>>>> [message = 
>>>>>>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException:
>>>>>>>>>>>>>>>>>>> Futures timed out after [30 seconds]
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When I dug deeper, I found an OOM error:
>>>>>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(17,
>>>>>>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9,
>>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no
>>>>>>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms
>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(9,
>>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception
>>>>>>>>>>>>>>>>>>> in thread task-result-getter-3
>>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> LogType: stdout
>>>>>>>>>>>>>>>>>>> LogLength: 96
>>>>>>>>>>>>>>>>>>> Log Contents:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Spark Command:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster
>>>>>>>>>>>>>>>>>>> --driver-class-path
>>>>>>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>>>>>>>>>>> --jars
>>>>>>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 
>>>>>>>>>>>>>>>>>>> 4 --queue
>>>>>>>>>>>>>>>>>>> hdmi-express --class 
>>>>>>>>>>>>>>>>>>> com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>>>>>>>>>>>> subcommand=viewItem
>>>>>>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There is a 12G memory limit that I cannot exceed, as this
>>>>>>>>>>>>>>>>>>> Spark job runs on YARN.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>>>>>>>>>> Should I increase the number of executors from 32?
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
