Data set 1 (viEvents): the event activity data for one day. I took 10 files
from it and counted the records per item ID; the top counts are:

*Item ID         Count*
201335783004 3419
191568402102 1793
111654479898 1362
181503913062 1310
261798565828 1028
111654493548 994
231516683056 862
131497785968 746
161666914810 633
221749455474 432
201324502754 410
201334042634 402
191562605592 380
271841178238 362
161663339210 344
251615941886 313
261855748678 309
271821726658 255
111657099518 224
261868369938 218
181725710132 216
171766164072 215
221757076934 213
171763906872 212
111650132368 206
181629904282 204
261867932788 198
161668475280 194
191398227282 194





Data set 2:
ItemID Count
2217305702 1
3842604614 1
4463421160 1
4581260446 1
4632783223 1
4645316947 1
4760829454 1
4786989430 1
5530758430 1
5610056107 1
5661929425 1
5953801612 1
6141607456 1
6197204992 1
6220704442 1
6271022614 1
6282402871 1
6525123621 1
6554834772 1
6566297541 1
This data set will always have exactly one record per item, as it contains
the item metadata.

Given the nature of these two datasets, if there is any skew at all it must
be in dataset 1. In dataset 1, none of the top 20-30 item IDs (the shuffle
key) has more than about 3,400 records (the largest count is 3419), and that
is very small.
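
One way to tell a single hot key apart from many keys landing in the same partition is to reproduce the partition assignment offline. The sketch below is a minimal standalone approximation, not Spark code: it assumes the join uses hash partitioning (key hashCode modulo the partition count, made non-negative), and `partitionFor` and `recordsPerPartition` are illustrative names. The sample counts are the first rows of the dataset 1 table.

```scala
object PartitionSkewSketch {
  // Assumed hash-partitioning rule: hashCode modulo the number of
  // partitions, shifted into the non-negative range.
  def partitionFor(key: Long, numPartitions: Int): Int = {
    val raw = java.lang.Long.valueOf(key).hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Sums the per-key record counts that land in each partition.
  def recordsPerPartition(counts: Seq[(Long, Long)], numPartitions: Int): Map[Int, Long] =
    counts
      .groupBy { case (key, _) => partitionFor(key, numPartitions) }
      .map { case (partition, rows) => partition -> rows.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Top item IDs and counts copied from dataset 1.
    val topCounts = Seq(
      201335783004L -> 3419L,
      191568402102L -> 1793L,
      111654479898L -> 1362L,
      181503913062L -> 1310L,
      261798565828L -> 1028L
    )
    // Print the partitions ordered by how many records they would receive.
    recordsPerPartition(topCounts, 1200).toSeq.sortBy(-_._2).foreach {
      case (partition, records) => println(s"partition $partition: $records records")
    }
  }
}
```

Since the slow task has index 0 in each task listing in this thread, it may be worth checking which keys map to partition 0 on both sides of the join (under this rule a key of 0 would).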

Why am I still *not* able to join these two datasets? I have unlimited
cluster capacity and have tried repartitioning, but there is a 12G memory
limit on each node. Each time, one task runs forever and processes roughly
1.5G of data while the others process a few MB each. Even that 1.5G (shuffle
read size) is very small.
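
If the skew does turn out to come from one hot key on either side, a common workaround is to "salt" the key: spread the records of the large side over several sub-keys and replicate the other side once per sub-key. The sketch below shows the idea on plain Scala collections rather than RDDs; `saltLarge`, `replicateSmall`, `saltedJoin` and the number of salts are hypothetical names and values chosen for illustration, and the replication step only makes sense when the replicated side is affordable to copy.

```scala
object SaltedJoinSketch {
  // Spread each record of the large side over `salts` sub-keys.
  // The salt comes from the record's position so the sketch is deterministic.
  def saltLarge[V](large: Seq[(Long, V)], salts: Int): Seq[((Long, Int), V)] =
    large.zipWithIndex.map { case ((key, value), i) => ((key, i % salts), value) }

  // Replicate each record of the small side once per salt,
  // so that every sub-key of the large side can find its match.
  def replicateSmall[W](small: Seq[(Long, W)], salts: Int): Seq[((Long, Int), W)] =
    for { (key, value) <- small; salt <- 0 until salts } yield ((key, salt), value)

  // Inner join on the salted key, then drop the salt again.
  def saltedJoin[V, W](large: Seq[(Long, V)], small: Seq[(Long, W)], salts: Int): Seq[(Long, (V, W))] = {
    val smallByKey = replicateSmall(small, salts).groupBy(_._1)
    for {
      (saltedKey, v) <- saltLarge(large, salts)
      (_, w) <- smallByKey.getOrElse(saltedKey, Seq.empty)
    } yield (saltedKey._1, (v, w))
  }
}
```

With RDDs the same shape would apply: map both sides to the salted key, join, then map the salt away. The cost is that the replicated side grows by a factor equal to the number of salts.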

Please suggest.


On Mon, May 4, 2015 at 9:08 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I tried this
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = lstgItem.join(viEvents, new
> org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map
> {
>
>
> It fired two jobs and still i have 1 task that never completes.
> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
> 0 | 4818 | 0 | RUNNING | PROCESS_LOCAL | 5 / host1 | 2015/05/04 07:24:25 | 1.1 h | 13 min | 778.0 MB / 50314161 | 4.5 GB | 47.4 MB |
> 955 | 5773 | 0 | SUCCESS | PROCESS_LOCAL | 5 / host2 | 2015/05/04 07:47:16 | 2.2 min | 1.5 min | 106.3 MB / 4197539 | 0.0 B | 0.0 B |
> 1199 | 6017 | 0 | SUCCESS | PROCESS_LOCAL | 3 / host3 | 2015/05/04 07:51:51 | 48 s | 2 s | 94.2 MB / 3618335 | 2.8 GB | 8.6 MB |
> 216
>
>
>
> 2)
> I tried reversing the datasets in join
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] =viEvents.join(lstgItem)
>
> This led to same problem of a long running task.
> 3)
> Next, i am trying this
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = lstgItem.join(viEvents, 1200).map {
>
>
> I have exhausted all my options.
>
>
> Regards,
>
> Deepak
>
>
> On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> I ran it against one file instead of 10 files, and one task is still
>> running after 33 minutes; its shuffle read size is 780MB / 50 million
>> records.
>>
>> I counted the records for each itemId in dataset-2 [one file], i.e. the
>> second dataset (pair RDD): val viEvents = viEventsRaw.map { vi =>
>> (vi.get(14).asInstanceOf[Long], vi) }. This is the dataset that contains
>> the list of items viewed by users in one day.
>>
>> *Item Id                    Count*
>> 201335783004 537
>> 111654496030 353
>> 141640703798 287
>> 191568402102 258
>> 111654479898 217
>> 231521843148 211
>> 251931716094 201
>> 111654493548 181
>> 181503913062 181
>> 121635453050 152
>> 261798565828 140
>> 151494617682 139
>> 251927181728 127
>> 231516683056 119
>> 141640492864 117
>> 161677270656 117
>> 171771073616 113
>> 111649942124 109
>> 191516989450 97
>> 231539161292 94
>> 221555628408 88
>> 131497785968 87
>> 121632233872 84
>> 131335379184 83
>> 281531363490 83
>> 131492727742 79
>> 231174157820 79
>> 161666914810 77
>> 251699753072 77
>> 161683664300 76
>>
>>
>> I was assuming that data skew would mean the top item (201335783004) had
>> a count of 1 million. However, it is only a few hundred, so why is Spark
>> skewing it in the join? What should I do so that Spark distributes the
>> records evenly?
>>
>> In M/R we can change the Partitioner between mapper and reducer. How can
>> I do that in Spark for a join?
>>
>>
>> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
>> 0 | 3618 | 0 | RUNNING | PROCESS_LOCAL | 4 / host1 | 2015/05/04 05:09:53 | 33 min | 8.5 min | 783.9 MB / 50,761,322 | 4.6 GB | 47.5 MB |
>> 433 | 4051 | 0 | SUCCESS | PROCESS_LOCAL | 1 / host2 | 2015/05/04 05:16:27 | 1.1 min | 20 s | 116.0 MB / 4505143 | 1282.3 MB | 10.1 MB |
>> 218 | 3836 | 0 | SUCCESS | PROCESS_LOCAL | 3 / host3 | 2015/05/04 05:13:01 | 53 s | 11 s | 76.4 MB / 2865143 | 879.6 MB | 6.9 MB |
>> 113 | 3731 | 0 | SUCCESS | PROCESS_LOCAL | 2 / host4 | 2015/05/04 05:11:30 | 31 s | 8 s | 6.9 MB / 5187 | 0.0 B | 0.0 B |
>>
>> On Mon, May 4, 2015 at 6:00 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> From the symptom you mentioned, that one task's shuffle write is much
>>> larger than that of all the other tasks, this looks quite similar to
>>> normal data skew behavior. I am only giving advice based on your
>>> description; I think you need to check whether the data is actually
>>> skewed or not.
>>>
>>> The shuffle puts data into tasks according to the partitioner strategy
>>> (the default is the hash partitioner), so all records with the same key
>>> go to the same task, but one task does not hold just one key.
>>>
>>> 2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>
>>>> Attached image shows the Spark UI for the job.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>> wrote:
>>>>
>>>>> Four tasks are now failing with
>>>>>
>>>>> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
>>>>> 0 | 3771 | 0 | FAILED | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | | | | | | ExecutorLostFailure (executor 114 lost)
>>>>> 1007 | 4973 | 1 | FAILED | PROCESS_LOCAL | 420 / host2 | 2015/05/04 02:13:14 | | | | | | FetchFailed (full message follows)
>>>>>
>>>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
>>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>>>>> location for shuffle 1
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>>>>   at 
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>   at 
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>   at 
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
>>>>>   at 
>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>>>   at 
>>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>>>>   at 
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>>>>>   at 
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
>>>>>   at 
>>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>   at 
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>   at 
>>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>   at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>   at 
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> )
>>>>>
>>>>> 371 | 4972 | 1 | FAILED | PROCESS_LOCAL | 563 / host3 | 2015/05/04 02:13:14 | | | | | | FetchFailed (full message follows)
>>>>>
>>>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
>>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
>>>>> location for shuffle 1
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>>>>   at 
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>   at 
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>   at 
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
>>>>>   at 
>>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
>>>>>   at 
>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>>>>>   at 
>>>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>>>>>   at 
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>>>>>   at 
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
>>>>>   at 
>>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>   at 
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>   at 
>>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>   at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>   at 
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>   at 
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> )
>>>>>
>>>>> 0 | 4971 | 1 | FAILED | PROCESS_LOCAL | 45 / host4 | 2015/05/04 02:13:14 | 0 ms | 2 s | 374.3 MB / 19281537 | 0.0 B | 0.0 B |
>>>>>
>>>>>
>>>>> On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One dataset (RDD Pair)
>>>>>>
>>>>>> val lstgItem = listings.map { lstg => (lstg.getItemId().toLong,
>>>>>> lstg) }
>>>>>>
>>>>>> Second Dataset (RDDPair)
>>>>>>
>>>>>> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long],
>>>>>> vi) }
>>>>>>
>>>>>> I want to join on item ID, which is the first element of the tuple
>>>>>> in both cases, and I think that is the shuffle key.
>>>>>>
>>>>>> listings ==> Data set contains all the unique item ids that are ever
>>>>>> listed on the ecommerce site.
>>>>>>
>>>>>> viEvents ===> List of items viewed by user in last day. This will
>>>>>> always be a subset of the total set.
>>>>>>
>>>>>> So I do not understand what the data skew is. When my long-running
>>>>>> task is working on 1591.2 MB / 98,931,767 records, does that mean all
>>>>>> 98 million records have the same item ID? How can millions of users
>>>>>> look at the same item in the last day?
>>>>>>
>>>>>> Or does this data contain records across item IDs?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Deepak
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The shuffle key depends on your implementation. I'm not sure if you
>>>>>>> are familiar with MapReduce: the mapper output is a key-value pair,
>>>>>>> where the key is the shuffle key used for shuffling. Spark works the
>>>>>>> same way.
>>>>>>>
>>>>>>> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>>
>>>>>>>> Hello Shao,
>>>>>>>> Can you talk more about shuffle key or point me to APIs that allow
>>>>>>>> me to change shuffle key. I will try with different keys and see the
>>>>>>>> performance.
>>>>>>>>
>>>>>>>> What is the shuffle key by default ?
>>>>>>>>
>>>>>>>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> IMHO, if your data or your algorithm is prone to data skew, I think
>>>>>>>>> you have to fix this at the application level. Spark itself cannot
>>>>>>>>> overcome this problem (if one key has a large number of values). You
>>>>>>>>> may change your algorithm to choose another shuffle key, or something
>>>>>>>>> like that, to avoid shuffling on skewed keys.
>>>>>>>>>
>>>>>>>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Hello Dean & Others,
>>>>>>>>>> Thanks for the response.
>>>>>>>>>>
>>>>>>>>>> I tried repartitioning into 100, 200, 400, 600 and 1200 partitions
>>>>>>>>>> with 100, 200, 400 and 800 executors. Each time, all the tasks of the
>>>>>>>>>> join complete in less than a minute except one, and that one task
>>>>>>>>>> runs forever. I have a huge cluster at my disposal.
>>>>>>>>>>
>>>>>>>>>> The data for each of the 1199 tasks is around 40MB / 30k records, and
>>>>>>>>>> for the 1 never-ending task it is 1.5G / 98 million records. I see
>>>>>>>>>> that there is data skew among tasks. I had observed this a week
>>>>>>>>>> earlier and had no clue how to fix it. Someone suggested that
>>>>>>>>>> repartitioning might make things more parallel, but the problem
>>>>>>>>>> persists.
>>>>>>>>>>
>>>>>>>>>> Please suggest how to get the task to complete.
>>>>>>>>>> All I want to do is join two datasets (dataset1 is in sequence
>>>>>>>>>> file format and dataset2 is in Avro format).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Ex:
>>>>>>>>>> Tasks
>>>>>>>>>> Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Shuffle Read Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
>>>>>>>>>> 0 | 3771 | 0 | RUNNING | PROCESS_LOCAL | 114 / host1 | 2015/05/04 01:27:44 | 7.3 min | 19 s | 1591.2 MB / 98931767 | 0.0 B | 0.0 B |
>>>>>>>>>> 1 | 3772 | 0 | SUCCESS | PROCESS_LOCAL | 226 / host2 | 2015/05/04 01:27:44 | 28 s | 2 s | 39.2 MB / 29754 | 0.0 B | 0.0 B |
>>>>>>>>>> 2 | 3773 | 0 | SUCCESS | PROCESS_LOCAL | 283 / host3 | 2015/05/04 01:27:44 | 26 s | 2 s | 39.0 MB / 29646 | 0.0 B | 0.0 B |
>>>>>>>>>> 5 | 3776 | 0 | SUCCESS | PROCESS_LOCAL | 320 / host4 | 2015/05/04 01:27:44 | 31 s | 3 s | 38.8 MB / 29512 | 0.0 B | 0.0 B |
>>>>>>>>>> 4 | 3775 | 0 | SUCCESS | PROCESS_LOCAL | 203 / host5 | 2015/05/04 01:27:44 | 41 s | 3 s | 38.4 MB / 29169 | 0.0 B | 0.0 B |
>>>>>>>>>> 3 | 3774 | 0 | SUCCESS | PROCESS_LOCAL | 84 / host6 | 2015/05/04 01:27:44 | 24 s | 2 s | 38.5 MB / 29258 | 0.0 B | 0.0 B |
>>>>>>>>>> 8 | 3779 | 0 | SUCCESS | PROCESS_LOCAL | 309 / host7 | 2015/05/04 01:27:44 | 31 s | 4 s | 39.5 MB / 30008 | 0.0 B | 0.0 B |
>>>>>>>>>>
>>>>>>>>>> There are 1200 tasks in total.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <
>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I don't know the full context of what you're doing, but
>>>>>>>>>>> serialization errors usually mean you're attempting to serialize
>>>>>>>>>>> something that can't be serialized, like the SparkContext. Kryo
>>>>>>>>>>> won't help there.
>>>>>>>>>>>
>>>>>>>>>>> The arguments to spark-submit you posted previously look good:
>>>>>>>>>>>
>>>>>>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>>>>>
>>>>>>>>>>> I suspect you aren't getting the parallelism you need. For
>>>>>>>>>>> partitioning, if your data is in HDFS and your block size is 128MB,
>>>>>>>>>>> then you'll get ~195 partitions anyway. If it takes 7 hours to do a
>>>>>>>>>>> join over 25GB of data, you have some other serious bottleneck. You
>>>>>>>>>>> should examine the web console and the logs to determine where all
>>>>>>>>>>> the time is going. Questions you might pursue:
>>>>>>>>>>>
>>>>>>>>>>>    - How long does each task take to complete?
>>>>>>>>>>>    - How many of those 195 partitions/tasks are processed at the
>>>>>>>>>>>    same time? That is, how many "slots" are available? Maybe you
>>>>>>>>>>>    need more nodes if the number of slots is too low. Based on your
>>>>>>>>>>>    command arguments, you should be able to process 1/2 of them at
>>>>>>>>>>>    a time, unless the cluster is busy.
>>>>>>>>>>>    - Is the cluster swamped with other work?
>>>>>>>>>>>    - How much data does each task process? Is the data roughly the
>>>>>>>>>>>    same from one task to the next? If not, then you might have
>>>>>>>>>>>    serious key skew.
>>>>>>>>>>>
>>>>>>>>>>> You may also need to research the details of how joins are
>>>>>>>>>>> implemented and some of the common tricks for organizing data to
>>>>>>>>>>> minimize having to shuffle all N by M records.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Dean,
>>>>>>>>>>>> If I don't use the Kryo serializer I get a serialization error,
>>>>>>>>>>>> hence I am using it.
>>>>>>>>>>>> If I don't use partitionBy/repartition, then the simple join never
>>>>>>>>>>>> completes even after 7 hours, and in fact as the next step I need
>>>>>>>>>>>> to run it against 250G, as that is my full dataset size. Someone
>>>>>>>>>>>> here suggested that I use repartition.
>>>>>>>>>>>>
>>>>>>>>>>>> Assuming repartition is mandatory, how do I decide what the right
>>>>>>>>>>>> number is? When I use 400 I do not get the NullPointerException
>>>>>>>>>>>> that I talked about, which is strange. I never saw that exception
>>>>>>>>>>>> against a small random dataset but see it with 25G, and again with
>>>>>>>>>>>> 400 partitions I do not see it.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <
>>>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> IMHO, you are trying waaay too hard to optimize work on what is
>>>>>>>>>>>>> really a small data set. 25G, even 250G, is not that much data,
>>>>>>>>>>>>> especially if you've spent a month trying to get something to work
>>>>>>>>>>>>> that should be simple. All these errors are from optimization
>>>>>>>>>>>>> attempts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kryo is great, but if it's not working reliably for some reason,
>>>>>>>>>>>>> then don't use it. Rather than force 200 partitions, let Spark try
>>>>>>>>>>>>> to figure out a good-enough number. (If you really need to force a
>>>>>>>>>>>>> partition count, use the repartition method instead, unless you're
>>>>>>>>>>>>> overriding the partitioner.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, I recommend that you eliminate all the optimizations: Kryo,
>>>>>>>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work
>>>>>>>>>>>>> first. Then, if it really isn't fast enough, look for actual
>>>>>>>>>>>>> evidence of bottlenecks and optimize those.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Dean & Others,
>>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>>> I have two data sets and all I want to do is a simple equi-join.
>>>>>>>>>>>>>> I have a 10G limit and, as my dataset_1 exceeded that, it was
>>>>>>>>>>>>>> throwing an OOM error. Hence I switched back to the .join() API
>>>>>>>>>>>>>> instead of a map-side broadcast join.
>>>>>>>>>>>>>> I am repartitioning the data into 100 and 200 partitions, and I
>>>>>>>>>>>>>> now see a NullPointerException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I run against 25G of each input with
>>>>>>>>>>>>>> .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a
>>>>>>>>>>>>>> NullPointerException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The trace does not include a line from my code, hence I do not
>>>>>>>>>>>>>> know what is causing the error.
>>>>>>>>>>>>>> I have registered the Kryo serializer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>>>>>>   .setAppName(detail)
>>>>>>>>>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>>>>>>>>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>>>>>>>>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>>>>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>>>>>>>>>>>> val sc = new SparkContext(conf)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see the exception when this task runs
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's a simple mapping of input records to (itemId, record).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It looks like Kryo (v2.21) changed something to stop using
>>>>>>>>>>>>>> default constructors:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please suggest
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trace:
>>>>>>>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task
>>>>>>>>>>>>>> 110.1 in stage 2.0 (TID 774)
>>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException:
>>>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>>>>>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>> I have not been able to get this to work for over a month now;
>>>>>>>>>>>>>> it's getting kind of frustrating.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <
>>>>>>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How big is the data you're returning to the driver with
>>>>>>>>>>>>>>> collectAsMap? You are probably running out of memory trying to
>>>>>>>>>>>>>>> copy too much data back to it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're trying to force a map-side join, Spark can do that
>>>>>>>>>>>>>>> for you in some cases within the regular DataFrame/RDD context.
>>>>>>>>>>>>>>> See
>>>>>>>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>>>>>>>>>>>> and this talk by Michael Armbrust, for example:
>>>>>>>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> dean
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do>
>>>>>>>>>>>>>>>  (O'Reilly)
>>>>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Full Exception
>>>>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1
>>>>>>>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in 
>>>>>>>>>>>>>>>> 884.087 s*
>>>>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0
>>>>>>>>>>>>>>>> failed: collectAsMap at VISummaryDataProvider.scala:37, took 
>>>>>>>>>>>>>>>> 1093.418249 s*
>>>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>>>>> while getting
>>>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>>>>> message
>>>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message 
>>>>>>>>>>>>>>>> [message =
>>>>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app
>>>>>>>>>>>>>>>> status: FAILED, exitCode: 15, (reason: User class threw 
>>>>>>>>>>>>>>>> exception: Job
>>>>>>>>>>>>>>>> aborted due to stage failure: Exception while getting task 
>>>>>>>>>>>>>>>> result:
>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message 
>>>>>>>>>>>>>>>> [message =
>>>>>>>>>>>>>>>> GetLocations(taskresult_112)])
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Code at line 37*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> val lstgItemMap = listings.map { lstg =>
>>>>>>>>>>>>>>>> (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The listing data set is 26G (10 files) and my driver
>>>>>>>>>>>>>>>> memory is 12G (I can't go beyond it). The reason I call
>>>>>>>>>>>>>>>> collectAsMap is to broadcast the map and do a map-side join
>>>>>>>>>>>>>>>> instead of a regular join.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please suggest ?
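The map-side join described in the quoted message can be sketched in plain Scala, without Spark, to show what the collectAsMap step has to hold in memory. This is only an illustration under stated assumptions: `Listing`, `ViEvent`, `MapSideJoinSketch` and the helper names are hypothetical stand-ins, not classes from the job. The point is that the whole lookup map must fit on the driver, and a 26G listing set is unlikely to fit in a 12G heap.

```scala
// Plain-Scala sketch of a map-side (broadcast-style) join; no Spark required.
// Listing, ViEvent and every helper below are hypothetical stand-ins.
object MapSideJoinSketch {
  final case class Listing(itemId: Long, title: String)
  final case class ViEvent(itemId: Long, sessionId: String)

  // Small side: one record per item ID, built into a lookup table.
  // In Spark this corresponds to the map that collectAsMap builds on the driver.
  def buildLookup(listings: Seq[Listing]): Map[Long, Listing] =
    listings.map(l => l.itemId -> l).toMap

  // Large side: each event looks up its listing locally. Events without a
  // matching listing are dropped, as in an inner join.
  def mapSideJoin(
      events: Seq[ViEvent],
      lookup: Map[Long, Listing]): Seq[(Long, (ViEvent, Listing))] =
    events.flatMap(e => lookup.get(e.itemId).map(l => e.itemId -> ((e, l))))

  // Rough feasibility check: the lookup table has to fit in the driver heap.
  // On-disk size is used as a lower bound for the in-memory size.
  def fitsOnDriver(lookupSizeGb: Double, driverHeapGb: Double): Boolean =
    lookupSizeGb < driverHeapGb
}
```

With the numbers from the message, `MapSideJoinSketch.fitsOnDriver(26.0, 12.0)` is false, which is consistent with the driver-side heap errors in the traces. A map-side join only pays off when the smaller side is well below the driver and executor memory limits.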
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My Spark job is failing and I see:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ==============================
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception 
>>>>>>>>>>>>>>>>> while getting
>>>>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending 
>>>>>>>>>>>>>>>>> message
>>>>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message 
>>>>>>>>>>>>>>>>> [message =
>>>>>>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out
>>>>>>>>>>>>>>>>> after [30 seconds]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I see multiple of these
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
>>>>>>>>>>>>>>>>> timed out after [30 seconds]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> And finally I see this:
>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Solutions
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) --num-executors 96 --driver-memory 12g
>>>>>>>>>>>>>>>>>    --driver-java-options "-XX:MaxPermSize=10G"
>>>>>>>>>>>>>>>>>    --executor-memory 12g --executor-cores 4
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 12G is the limit imposed by the YARN cluster; I can't go
>>>>>>>>>>>>>>>>> beyond this.
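For reference, the three timeout settings quoted above could be applied in the driver program roughly as follows. This is a configuration sketch that assumes Spark 1.3.x is on the classpath; the object name and app name are hypothetical placeholders. As far as I know, Spark 1.3 reads these values as seconds, so 6000 would mean 100 minutes.

```scala
import org.apache.spark.SparkConf

// Sketch only: the timeout settings from the message, applied via SparkConf.
// TimeoutConfSketch and the app name are hypothetical placeholders.
object TimeoutConfSketch {
  val conf: SparkConf = new SparkConf()
    .setAppName("viewItem-report")
    .set("spark.akka.askTimeout", "6000") // ask timeout; the traces show the 30-second default expiring
    .set("spark.akka.timeout", "6000")    // general communication timeout between Spark nodes
    .set("spark.worker.timeout", "6000")  // worker timeout; to my understanding only used in standalone mode
}
```

If the timeouts are a symptom of the heap exhaustion shown in the trace, longer timeouts would probably only delay the failure rather than prevent it.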
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Did not work. Same problem.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> You could try increasing your heap space explicitly,
>>>>>>>>>>>>>>>>>>> e.g. export _JAVA_OPTIONS="-Xmx10g". It's not the correct
>>>>>>>>>>>>>>>>>>> approach, but try it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have a SparkApp that completes in 45 mins for 5
>>>>>>>>>>>>>>>>>>>> files (5*750MB) and takes 16 executors to do so.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type
>>>>>>>>>>>>>>>>>>>> (10*3 files, as there are three inputs that are
>>>>>>>>>>>>>>>>>>>> transformed): Input1 = 10*750MB, Input2 = 10*2.5GB,
>>>>>>>>>>>>>>>>>>>> Input3 = 10*1.5GB. Hence I used 32 executors.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I see multiple of these:
>>>>>>>>>>>>>>>>>>>> 15/04/28 09:23:31 WARN executor.Executor: Issue
>>>>>>>>>>>>>>>>>>>> communicating with driver in heartbeater
>>>>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message
>>>>>>>>>>>>>>>>>>>> [message = 
>>>>>>>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException:
>>>>>>>>>>>>>>>>>>>> Futures timed out after [30 seconds]
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> When I searched deeper, I found an OOM error:
>>>>>>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(17,
>>>>>>>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9,
>>>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no
>>>>>>>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms
>>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(9,
>>>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception
>>>>>>>>>>>>>>>>>>>> in thread task-result-getter-3
>>>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> LogType: stdout
>>>>>>>>>>>>>>>>>>>> LogLength: 96
>>>>>>>>>>>>>>>>>>>> Log Contents:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Spark Command:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster
>>>>>>>>>>>>>>>>>>>> --driver-class-path
>>>>>>>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>>>>>>>>>>>> --jars
>>>>>>>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g 
>>>>>>>>>>>>>>>>>>>> --driver-java-options
>>>>>>>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g 
>>>>>>>>>>>>>>>>>>>> --executor-cores 4 --queue
>>>>>>>>>>>>>>>>>>>> hdmi-express --class 
>>>>>>>>>>>>>>>>>>>> com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>>>>>>>>>>>>> subcommand=viewItem
>>>>>>>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> There is a 12G limit on the memory I can use, as this
>>>>>>>>>>>>>>>>>>>> Spark job is running on YARN.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>>>>>>>>>>> Should I increase the number of executors from 32?
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Deepak


-- 
Deepak