1) I tried this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents, new org.apache.spark.RangePartitioner(partitions = 1200, rdd = viEvents)).map {

It fired two jobs and I still have one task that never completes.

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
0      4818  0        RUNNING  PROCESS_LOCAL   5 / host1           2015/05/04 07:24:25  1.1 h     13 min   778.0 MB / 50314161          4.5 GB                  47.4 MB
955    5773  0        SUCCESS  PROCESS_LOCAL   5 / host2           2015/05/04 07:47:16  2.2 min   1.5 min  106.3 MB / 4197539           0.0 B                   0.0 B
1199   6017  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 07:51:51  48 s      2 s      94.2 MB / 3618335            2.8 GB                  8.6 MB
216

2) I tried reversing the datasets in the join:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  viEvents.join(lstgItem)

This led to the same problem of a long-running task.

3) Next, I am trying this:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents, 1200).map {

I have exhausted all my options.

Regards,
Deepak

On Mon, May 4, 2015 at 6:24 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I ran it against one file instead of 10 files, and one task is still
> running after 33 minutes; its shuffle read size is 780 MB / 50 million records.
>
> I did a count of records for each itemId from dataset 2 (one file), that is,
> the second dataset (RDD pair):
>
> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>
> This is the dataset that contains the list of items viewed by users in one day.
>
> Item Id       Count
> 201335783004  537
> 111654496030  353
> 141640703798  287
> 191568402102  258
> 111654479898  217
> 231521843148  211
> 251931716094  201
> 111654493548  181
> 181503913062  181
> 121635453050  152
> 261798565828  140
> 151494617682  139
> 251927181728  127
> 231516683056  119
> 141640492864  117
> 161677270656  117
> 171771073616  113
> 111649942124  109
> 191516989450  97
> 231539161292  94
> 221555628408  88
> 131497785968  87
> 121632233872  84
> 131335379184  83
> 281531363490  83
> 131492727742  79
> 231174157820  79
> 161666914810  77
> 251699753072  77
> 161683664300  76
>
> I was assuming that data skew would mean the top item (201335783004) had a
> count of 1 million; however, it is only a few hundred, so why is Spark
> skewing it in the join? What should I do so that Spark distributes the records
> evenly?
>
> In M/R we can change the Partitioner between mapper and reducer; how can I
> do that in Spark for a join?
>
> Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
> 0      3618  0        RUNNING  PROCESS_LOCAL   4 / host1           2015/05/04 05:09:53  33 min    8.5 min  783.9 MB / 50,761,322        4.6 GB                  47.5 MB
> 433    4051  0        SUCCESS  PROCESS_LOCAL   1 / host2           2015/05/04 05:16:27  1.1 min   20 s     116.0 MB / 4505143           1282.3 MB               10.1 MB
> 218    3836  0        SUCCESS  PROCESS_LOCAL   3 / host3           2015/05/04 05:13:01  53 s      11 s     76.4 MB / 2865143            879.6 MB                6.9 MB
> 113    3731  0        SUCCESS  PROCESS_LOCAL   2 / host4           2015/05/04 05:11:30  31 s      8 s      6.9 MB / 5187                0.0 B                   0.0 B
>
> On Mon, May 4, 2015 at 6:00 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> From the symptoms you mentioned, where one task's shuffle write is much
>> larger than that of all the other tasks, this looks quite similar to normal
>> data skew behavior. I am just giving some advice based on your description;
>> I think you need to detect whether the data is actually skewed or not.
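
One way to act on the advice above is to count records per key on both sides of the join rather than on one side only, since a join task has to read the records of both inputs for every key it owns. The following is a minimal sketch in plain Scala collections with invented sample data; `recordsPerKey`, `combined` and `topKeys` are hypothetical helper names, not Spark APIs. On real pair RDDs the same per-key count could be expressed with `map` followed by `reduceByKey`.

```scala
object SkewCheck {
  // Count how many records each key has on one side of the join.
  def recordsPerKey[V](pairs: Seq[(Long, V)]): Map[Long, Long] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.size.toLong }

  // Add the per-key counts of both sides: roughly what a join task
  // has to read for that key.
  def combined(left: Map[Long, Long], right: Map[Long, Long]): Map[Long, Long] =
    (left.keySet ++ right.keySet).map { k =>
      k -> (left.getOrElse(k, 0L) + right.getOrElse(k, 0L))
    }.toMap

  // The n heaviest keys, largest count first.
  def topKeys(counts: Map[Long, Long], n: Int): Seq[(Long, Long)] =
    counts.toSeq.sortBy { case (_, c) => -c }.take(n)

  def main(args: Array[String]): Unit = {
    // Invented data: key 0 is repeated on both sides, the other keys are rare.
    val listings = Seq.fill(3)(0L -> "lstg") ++ Seq(1L -> "lstg", 2L -> "lstg")
    val views    = Seq.fill(4)(0L -> "vi") ++ Seq(1L -> "vi", 1L -> "vi")
    val perKey   = combined(recordsPerKey(listings), recordsPerKey(views))
    topKeys(perKey, 2).foreach { case (k, c) => println(s"key=$k records=$c") }
  }
}
```

If one key dominates the combined count, the skew is in the data itself; if no key does, the heavy task is more likely holding many keys, or a placeholder key, that happen to share a partition.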
>>
>> The shuffle places data into tasks according to the partitioner strategy (the
>> default is the hash partitioner), so all data with the same key goes to the
>> same task, but one task does not hold only one key.
>>
>> 2015-05-04 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>
>>> Attached image shows the Spark UI for the job.
>>>
>>> On Mon, May 4, 2015 at 3:28 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> Four tasks are now failing with
>>>>
>>>> Index  ID  Attempt  Status  Locality Level  Executor ID / Host  Launch Time  Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
>>>> 0  3771  0  FAILED  PROCESS_LOCAL  114 / host1  2015/05/04 01:27:44  /  ExecutorLostFailure (executor 114 lost)
>>>> 1007  4973  1  FAILED  PROCESS_LOCAL  420 / host2  2015/05/04 02:13:14  /  FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message= +details
>>>>
>>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
>>>> at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
>>>> at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>> at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381) >>>> at >>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177) >>>> at >>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) >>>> at >>>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) >>>> at >>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) >>>> at >>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) >>>> at >>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >>>> at >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >>>> at >>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >>>> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >>>> at >>>> 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>>> at org.apache.spark.scheduler.Task.run(Task.scala:64) >>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> ) >>>> >>>> 371 4972 1 FAILED PROCESS_LOCAL 563 / host3 2015/05/04 02:13:14 / >>>> FetchFailed(null, >>>> shuffleId=1, mapId=-1, reduceId=371, message= +details >>>> >>>> FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message= >>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output >>>> location for shuffle 1 >>>> at >>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385) >>>> at >>>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382) >>>> at >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>>> at >>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >>>> at >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >>>> at >>>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381) >>>> at >>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177) >>>> at >>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) >>>> at >>>> 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) >>>> at >>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) >>>> at >>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) >>>> at >>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >>>> at >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >>>> at >>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >>>> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >>>> at >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) >>>> at >>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>>> at org.apache.spark.scheduler.Task.run(Task.scala:64) >>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>> at 
java.lang.Thread.run(Thread.java:745)
>>>>
>>>> )
>>>>
>>>> 0  4971  1  FAILED  PROCESS_LOCAL  45 / host4  2015/05/04 02:13:14  0 ms  2 s  374.3 MB / 19281537  0.0 B  0.0 B
>>>>
>>>> On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> One dataset (RDD pair):
>>>>>
>>>>> val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
>>>>>
>>>>> Second dataset (RDD pair):
>>>>>
>>>>> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>
>>>>> I want to join on item ID, which is the first element of the tuple in
>>>>> both cases, and I think that is what the shuffle key is.
>>>>>
>>>>> listings ==> Data set containing all the unique item IDs that were ever
>>>>> listed on the ecommerce site.
>>>>>
>>>>> viEvents ==> List of items viewed by users in the last day. This will
>>>>> always be a subset of the total set.
>>>>>
>>>>> So I do not understand where the data skew comes from. When my long-running
>>>>> task is working on 1591.2 MB / 98,931,767 records, does that mean all 98
>>>>> million records contain the same item ID? How can millions of users look
>>>>> at the same item in the last day?
>>>>>
>>>>> Or does this dataset contain records across item IDs?
>>>>>
>>>>> Regards,
>>>>> Deepak
>>>>>
>>>>> On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>
>>>>>> The shuffle key depends on your implementation. I'm not sure if you
>>>>>> are familiar with MapReduce: the mapper output is a key-value pair, where
>>>>>> the key is the shuffle key used for shuffling. Spark works the same way.
>>>>>>
>>>>>> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>
>>>>>>> Hello Shao,
>>>>>>> Can you talk more about the shuffle key, or point me to APIs that allow
>>>>>>> me to change the shuffle key? I will try with different keys and see the
>>>>>>> performance.
>>>>>>>
>>>>>>> What is the shuffle key by default?
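
For a pair RDD the shuffle key is the first element of each tuple, and with a hash partitioner the target partition is derived from that key's hash code. The sketch below models this rule in plain Scala so the effect can be seen without a cluster. It assumes the partitioner takes a non-negative modulo of `hashCode`; `partitionFor` and `recordsPerPartition` are hypothetical helpers written for illustration, and the sample keys are invented.

```scala
object HashPartitionModel {
  // Non-negative modulo of the key's hash code: the rule assumed here
  // for a hash partitioner.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Number of records that land in each partition for a list of keys.
  def recordsPerPartition(keys: Seq[Long], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    // Invented keys: 5 appears three times, and 5 and 13 share a partition
    // when there are 4 partitions.
    val keys = Seq(5L, 5L, 5L, 13L, 2L, 8L)
    recordsPerPartition(keys, 4).toSeq.sortBy(_._1).foreach {
      case (p, n) => println(s"partition=$p records=$n")
    }
  }
}
```

Two things follow from this model: every record with the same key goes to the same partition, so raising the partition count cannot split one heavy key, and distinct keys can share a partition, so a heavy task does not necessarily hold a single key.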
>>>>>>>
>>>>>>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>
>>>>>>>> IMHO, if your data or your algorithm is prone to data skew, you have
>>>>>>>> to fix this at the application level. Spark itself cannot overcome
>>>>>>>> this problem (if one key has a large number of values). You may change
>>>>>>>> your algorithm to choose another shuffle key, or something like this,
>>>>>>>> to avoid shuffling on skewed keys.
>>>>>>>>
>>>>>>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hello Dean & Others,
>>>>>>>>> Thanks for the response.
>>>>>>>>>
>>>>>>>>> I tried repartitioning to 100, 200, 400, 600 and 1200 partitions with
>>>>>>>>> 100, 200, 400 and 800 executors. Each time, all the tasks of the join
>>>>>>>>> complete in less than a minute except one, and that one task runs
>>>>>>>>> forever. I have a huge cluster at my disposal.
>>>>>>>>>
>>>>>>>>> The data for each of the 1199 tasks is around 40 MB / 30k records, and
>>>>>>>>> for the one never-ending task it is 1.5 GB / 98 million records. I see
>>>>>>>>> that there is data skew among tasks. I had observed this a week
>>>>>>>>> earlier and I have no clue how to fix it. Someone suggested that
>>>>>>>>> repartitioning might make things more parallel, but the problem
>>>>>>>>> persists.
>>>>>>>>>
>>>>>>>>> Please suggest how to get the task to complete.
>>>>>>>>> All I want to do is join two datasets (dataset1 is in sequence file
>>>>>>>>> format and dataset2 is in Avro format).
>>>>>>>>>
>>>>>>>>> Ex:
>>>>>>>>> Tasks
>>>>>>>>> Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read Size / Records  Shuffle Spill (Memory)  Shuffle Spill (Disk)  Errors
>>>>>>>>> 0      3771  0        RUNNING  PROCESS_LOCAL   114 / host1         2015/05/04 01:27:44  7.3 min   19 s     1591.2 MB / 98931767         0.0 B                   0.0 B
>>>>>>>>> 1      3772  0        SUCCESS  PROCESS_LOCAL   226 / host2         2015/05/04 01:27:44  28 s      2 s      39.2 MB / 29754              0.0 B                   0.0 B
>>>>>>>>> 2      3773  0        SUCCESS  PROCESS_LOCAL   283 / host3         2015/05/04 01:27:44  26 s      2 s      39.0 MB / 29646              0.0 B                   0.0 B
>>>>>>>>> 5      3776  0        SUCCESS  PROCESS_LOCAL   320 / host4         2015/05/04 01:27:44  31 s      3 s      38.8 MB / 29512              0.0 B                   0.0 B
>>>>>>>>> 4      3775  0        SUCCESS  PROCESS_LOCAL   203 / host5         2015/05/04 01:27:44  41 s      3 s      38.4 MB / 29169              0.0 B                   0.0 B
>>>>>>>>> 3      3774  0        SUCCESS  PROCESS_LOCAL   84 / host6          2015/05/04 01:27:44  24 s      2 s      38.5 MB / 29258              0.0 B                   0.0 B
>>>>>>>>> 8      3779  0        SUCCESS  PROCESS_LOCAL   309 / host7         2015/05/04 01:27:44  31 s      4 s      39.5 MB / 30008              0.0 B                   0.0 B
>>>>>>>>>
>>>>>>>>> There are 1200 tasks in total.
>>>>>>>>>
>>>>>>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I don't know the full context of what you're doing, but
>>>>>>>>>> serialization errors usually mean you're attempting to serialize
>>>>>>>>>> something that can't be serialized, like the SparkContext. Kryo
>>>>>>>>>> won't help there.
>>>>>>>>>>
>>>>>>>>>> The arguments to spark-submit you posted previously look good:
>>>>>>>>>>
>>>>>>>>>> 2) --num-executors 96 --driver-memory 12g --driver-java-options "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>>>>
>>>>>>>>>> I suspect you aren't getting the parallelism you need. For
>>>>>>>>>> partitioning, if your data is in HDFS and your block size is 128MB,
>>>>>>>>>> then you'll get ~195 partitions anyway.
>>>>>>>>>> If it takes 7 hours to do a join over 25GB of data, you have some
>>>>>>>>>> other serious bottleneck. You should examine the web console and the
>>>>>>>>>> logs to determine where all the time is going. Questions you might
>>>>>>>>>> pursue:
>>>>>>>>>>
>>>>>>>>>> - How long does each task take to complete?
>>>>>>>>>> - How many of those 195 partitions/tasks are processed at the
>>>>>>>>>>   same time? That is, how many "slots" are available? Maybe you
>>>>>>>>>>   need more nodes if the number of slots is too low. Based on your
>>>>>>>>>>   command arguments, you should be able to process 1/2 of them at a
>>>>>>>>>>   time, unless the cluster is busy.
>>>>>>>>>> - Is the cluster swamped with other work?
>>>>>>>>>> - How much data does each task process? Is the data roughly
>>>>>>>>>>   the same from one task to the next? If not, then you might have
>>>>>>>>>>   serious key skew.
>>>>>>>>>>
>>>>>>>>>> You may also need to research the details of how joins are
>>>>>>>>>> implemented and some of the common tricks for organizing data to
>>>>>>>>>> minimize having to shuffle all N by M records.
>>>>>>>>>>
>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>> Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>
>>>>>>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Dean,
>>>>>>>>>>> If I don't use the Kryo serializer I get a serialization error,
>>>>>>>>>>> hence I am using it.
>>>>>>>>>>> If I don't use partitionBy/repartition, the simple join never
>>>>>>>>>>> completes even after 7 hours, and in fact as the next step I need
>>>>>>>>>>> to run it against 250G, as that is my full dataset size.
>>>>>>>>>>> Someone here suggested that I use repartition.
>>>>>>>>>>>
>>>>>>>>>>> Assuming repartition is mandatory, how do I decide what the right
>>>>>>>>>>> number is? When I use 400 I do not get the NullPointerException
>>>>>>>>>>> that I talked about, which is strange. I never saw that exception
>>>>>>>>>>> against a small random dataset, but I see it with 25G; and again,
>>>>>>>>>>> with 400 partitions I do not see it.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> IMHO, you are trying waaay too hard to optimize work on what is
>>>>>>>>>>>> really a small data set. 25G, even 250G, is not that much data,
>>>>>>>>>>>> especially if you've spent a month trying to get something to work
>>>>>>>>>>>> that should be simple. All these errors are from optimization
>>>>>>>>>>>> attempts.
>>>>>>>>>>>>
>>>>>>>>>>>> Kryo is great, but if it's not working reliably for some
>>>>>>>>>>>> reason, then don't use it. Rather than force 200 partitions, let
>>>>>>>>>>>> Spark try to figure out a good-enough number. (If you really need
>>>>>>>>>>>> to force a partition count, use the repartition method instead,
>>>>>>>>>>>> unless you're overriding the partitioner.)
>>>>>>>>>>>>
>>>>>>>>>>>> So. I recommend that you eliminate all the optimizations: Kryo,
>>>>>>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work
>>>>>>>>>>>> first. Then, if it really isn't fast enough, look for actual
>>>>>>>>>>>> evidence of bottlenecks and optimize those.
>>>>>>>>>>>>
>>>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>>>> Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Dean & Others,
>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>> I have two data sets and all I want to do is a simple equi-join.
>>>>>>>>>>>>> I have a 10G limit, and as my dataset_1 exceeded that, it was
>>>>>>>>>>>>> throwing an OOM error. Hence I switched back to the .join() API
>>>>>>>>>>>>> instead of a map-side broadcast join.
>>>>>>>>>>>>> I am repartitioning the data with 100 and 200 partitions, and I
>>>>>>>>>>>>> see a NullPointerException now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When I run against 25G of each input with .partitionBy(new
>>>>>>>>>>>>> org.apache.spark.HashPartitioner(200)), I see a
>>>>>>>>>>>>> NullPointerException.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The trace does not include a line from my code, and hence I do
>>>>>>>>>>>>> not know what is causing the error.
>>>>>>>>>>>>> I have registered the Kryo serializer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>>>>>   .setAppName(detail)
>>>>>>>>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>>>>>>>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>>>>>>>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>>>>>>>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>>>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>>>>>>>>>>> val sc = new SparkContext(conf)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I see the exception when this task runs:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>>>>>>>>>
>>>>>>>>>>>>> It's a simple mapping of input records to (itemId, record).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I found this
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>>>>>>> and
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looks like Kryo (2.21v) changed something to stop using
>>>>>>>>>>>>> default constructors.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Trace:
>>>>>>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>> I have not been able to get this to work for over a month now;
>>>>>>>>>>>>> it's getting kind of frustrating.
>>>>>>>>>>>>> >>>>>>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler < >>>>>>>>>>>>> deanwamp...@gmail.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> How big is the data you're returning to the driver with >>>>>>>>>>>>>> collectAsMap? You are probably running out of memory trying to >>>>>>>>>>>>>> copy too >>>>>>>>>>>>>> much data back to it. >>>>>>>>>>>>>> >>>>>>>>>>>>>> If you're trying to force a map-side join, Spark can do that >>>>>>>>>>>>>> for you in some cases within the regular DataFrame/RDD context. >>>>>>>>>>>>>> See >>>>>>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning >>>>>>>>>>>>>> and this talk by Michael Armbrust for example, >>>>>>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> dean >>>>>>>>>>>>>> >>>>>>>>>>>>>> Dean Wampler, Ph.D. >>>>>>>>>>>>>> Author: Programming Scala, 2nd Edition >>>>>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >>>>>>>>>>>>>> Typesafe <http://typesafe.com> >>>>>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler> >>>>>>>>>>>>>> http://polyglotprogramming.com >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Full Exception >>>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 >>>>>>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in >>>>>>>>>>>>>>> 884.087 s* >>>>>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 >>>>>>>>>>>>>>> failed: collectAsMap at VISummaryDataProvider.scala:37, took >>>>>>>>>>>>>>> 1093.418249 s* >>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class >>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception >>>>>>>>>>>>>>> while getting >>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error 
sending >>>>>>>>>>>>>>> message >>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)] >>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage >>>>>>>>>>>>>>> failure: Exception while getting task result: >>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message >>>>>>>>>>>>>>> = >>>>>>>>>>>>>>> GetLocations(taskresult_112)] >>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org >>>>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >>>>>>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app 
>>>>>>>>>>>>>>> status: FAILED, exitCode: 15, (reason: User class threw >>>>>>>>>>>>>>> exception: Job >>>>>>>>>>>>>>> aborted due to stage failure: Exception while getting task >>>>>>>>>>>>>>> result: >>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message >>>>>>>>>>>>>>> = >>>>>>>>>>>>>>> GetLocations(taskresult_112)]) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> *Code at line 37* >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> val lstgItemMap = listings.map { lstg => >>>>>>>>>>>>>>> (lstg.getItemId().toLong, lstg) }.collectAsMap >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Listing data set size is 26G (10 files) and my driver memory >>>>>>>>>>>>>>> is 12G (I cant go beyond it). The reason i do collectAsMap is >>>>>>>>>>>>>>> to brodcast >>>>>>>>>>>>>>> it and do a map-side join instead of regular join. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Please suggest ? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> My Spark Job is failing and i see >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ============================== >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class >>>>>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception >>>>>>>>>>>>>>>> while getting >>>>>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending >>>>>>>>>>>>>>>> message >>>>>>>>>>>>>>>> [message = GetLocations(taskresult_112)] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage >>>>>>>>>>>>>>>> failure: Exception while getting task result: >>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message >>>>>>>>>>>>>>>> [message = >>>>>>>>>>>>>>>> GetLocations(taskresult_112)] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org >>>>>>>>>>>>>>>> 
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out >>>>>>>>>>>>>>>> after [30 seconds] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I see multiple of these >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures >>>>>>>>>>>>>>>> timed out after [30 seconds] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> And finally i see this >>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) >>>>>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> 
org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >>>>>>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Solutions >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .set("spark.akka.askTimeout", "6000") >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .set("spark.akka.timeout", "6000") >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .set("spark.worker.timeout", "6000") >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2) --num-executors 96 --driver-memory 12g >>>>>>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=10G" --executor-memory >>>>>>>>>>>>>>>> 12g >>>>>>>>>>>>>>>> --executor-cores 4 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond >>>>>>>>>>>>>>>> this. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ANY suggestions ? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Deepak >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Did not work. Same problem. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das < >>>>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> You could try increasing your heap space explicitly, like >>>>>>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g". It's not the correct approach, >>>>>>>>>>>>>>>>>> but try it. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>>> Best Regards >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I have a SparkApp that completes in 45 mins for 5 >>>>>>>>>>>>>>>>>>> files (5*750MB size), and it takes 16 executors to do so. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type >>>>>>>>>>>>>>>>>>> (10*3 files, as there are three inputs that are >>>>>>>>>>>>>>>>>>> transformed). [Input1 = >>>>>>>>>>>>>>>>>>> 10*750 MB, Input2 = 10*2.5 GB, Input3 = 10*1.5 GB]. Hence I used >>>>>>>>>>>>>>>>>>> 32 executors. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I see multiple of these: >>>>>>>>>>>>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue >>>>>>>>>>>>>>>>>>> communicating with driver in heartbeater >>>>>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message >>>>>>>>>>>>>>>>>>> [message = >>>>>>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, >>>>>>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))] >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) >>>>>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: >>>>>>>>>>>>>>>>>>> Futures timed out after [30 seconds] >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) >>>>>>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) >>>>>>>>>>>>>>>>>>> ... 1 more >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> When I searched deeper, I found an OOM error. 
>>>>>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: >>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(17, >>>>>>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819) >>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: >>>>>>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9, >>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no >>>>>>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms >>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: >>>>>>>>>>>>>>>>>>> Removing block manager BlockManagerId(9, >>>>>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) >>>>>>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception >>>>>>>>>>>>>>>>>>> in thread task-result-getter-3 >>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245) >>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219) >>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) >>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>>>>> Exception in thread "task-result-getter-3" >>>>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245) >>>>>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219) 
>>>>>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) >>>>>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> LogType: stdout >>>>>>>>>>>>>>>>>>> LogLength: 96 >>>>>>>>>>>>>>>>>>> Log Contents: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000* >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Spark Command: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster >>>>>>>>>>>>>>>>>>> --driver-class-path >>>>>>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar >>>>>>>>>>>>>>>>>>> --jars >>>>>>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar >>>>>>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options >>>>>>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores >>>>>>>>>>>>>>>>>>> 4 --queue >>>>>>>>>>>>>>>>>>> hdmi-express --class >>>>>>>>>>>>>>>>>>> com.ebay.ep.poc.spark.reporting.SparkApp >>>>>>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar 
>>>>>>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7 >>>>>>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession >>>>>>>>>>>>>>>>>>> subcommand=viewItem >>>>>>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128 >>>>>>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> There is a 12G limit on the memory that I can use, as this >>>>>>>>>>>>>>>>>>> Spark is running over YARN. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Spark Version: 1.3.1 >>>>>>>>>>>>>>>>>>> Should I increase the number of executors from 32? >>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>> Deepak
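A note on the collectAsMap approach quoted above: collectAsMap pulls the whole listing dataset (26G) into the driver, which has a 12G heap, and that seems consistent with the OutOfMemoryError in the task-result-getter thread. Below is a minimal sketch of two alternatives that keep the data on the executors: counting records per key to see whether a few keys dominate, and a regular shuffle join with an explicit partition count. It assumes Spark 1.3 or later, where the pair-RDD functions are picked up implicitly. SkewCheck, heaviestKeys and joinOnExecutors are hypothetical names for illustration, not part of the job in this thread.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper object; the names are illustrative only.
object SkewCheck {

  // Count records per key on the executors and return only the n heaviest
  // keys (largest count first), so the driver never holds the full dataset.
  def heaviestKeys[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], n: Int): Array[(K, Long)] =
    rdd.map { case (k, _) => (k, 1L) }
      .reduceByKey(_ + _)
      .top(n)(Ordering.by[(K, Long), Long](_._2))

  // Regular shuffle join with an explicit partition count, as an alternative
  // to collectAsMap + broadcast when one side is larger than the driver heap.
  def joinOnExecutors[K: ClassTag, A: ClassTag, B: ClassTag](
      left: RDD[(K, A)],
      right: RDD[(K, B)],
      numPartitions: Int): RDD[(K, (A, B))] =
    left.join(right, numPartitions)
}
```

For example, SkewCheck.heaviestKeys(listings.map { lstg => (lstg.getItemId().toLong, lstg) }, 30) would list the 30 most frequent item ids on the listing side. Running the same check on the other input matters too, because the number of joined records for a key is the product of its counts on both sides.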