Four tasks are now failing with IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch TimeDurationGC TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk) Errors 0 3771 0 FAILED PROCESS_LOCAL 114 / host1 2015/05/04 01:27:44 / ExecutorLostFailure (executor 114 lost) 1007 4973 1 FAILED PROCESS_LOCAL 420 / host2 2015/05/04 02:13:14 / FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message= +details
FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ) 371 4972 1 FAILED PROCESS_LOCAL 563 / host3 2015/05/04 02:13:14 / FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message= +details FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at 
org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ) 0 4971 1 FAILED PROCESS_LOCAL 45 / host4 2015/05/04 02:13:14 0 ms 2 s 374.3 MB / 19281537 0.0 B 0.0 B On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote: > One dataset (RDD Pair) > > val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) } > > Second Dataset (RDDPair) > > val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], > vi) } > > As i want to join based on item Id that is used as first element in the > tuple in both cases and i think that's what is shuffle key. > > listings ==> Data set contains all the unique item ids that are ever > listed on the ecommerce site. > > viEvents ===> List of items viewed by user in last day. This will always > be a subset of the total set. > > So i do not understand what is data skewness. When my long running task is > working on 1591.2 MB / 98,931,767 does that mean 98 million records > contain all the same item ID ? How can millions of users look at the same > item in last day ? > > Or does this dataset contain records across item ids ? > > > Regards, > > Deepak > > > > > On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com> > wrote: > >> Shuffle key is depending on your implementation, I'm not sure if you are >> familiar with MapReduce, the mapper output is a key-value pair, where the >> key is the shuffle key for shuffling, Spark is also the same. >> >> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>: >> >>> Hello Shao, >>> Can you talk more about shuffle key or point me to APIs that allow me to >>> change shuffle key. I will try with different keys and see the performance. 
>>> >>> What is the shuffle key by default ? >>> >>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com> >>> wrote: >>> >>>> IMHO If your data or your algorithm is prone to data skew, I think you >>>> have to fix this from application level, Spark itself cannot overcome this >>>> problem (if one key has large amount of values), you may change your >>>> algorithm to choose another shuffle key, somethings like this to avoid >>>> shuffle on skewed keys. >>>> >>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>: >>>> >>>>> Hello Dean & Others, >>>>> Thanks for the response. >>>>> >>>>> I tried with 100,200, 400, 600 and 1200 repartitions with 100,200,400 >>>>> and 800 executors. Each time all the tasks of join complete in less than a >>>>> minute except one and that one tasks runs forever. I have a huge cluster >>>>> at >>>>> my disposal. >>>>> >>>>> The data for each of 1199 tasks is around 40MB/30k records and for 1 >>>>> never ending task is 1.5G/98million records. I see that there is data skew >>>>> among tasks. I had observed this a week earlier and i have no clue on how >>>>> to fix it and when someone suggested that repartition might make things >>>>> more parallel, but the problem is still persistent. >>>>> >>>>> Please suggest on how to get the task to complete. >>>>> All i want to do is join two datasets. (dataset1 is in sequence file >>>>> and dataset2 is in avro format). 
>>>>> >>>>> >>>>> >>>>> Ex: >>>>> Tasks IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch Time >>>>> DurationGC TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle >>>>> Spill (Disk)Errors 0 3771 0 RUNNING PROCESS_LOCAL 114 / host1 2015/05/04 >>>>> 01:27:44 7.3 min 19 s 1591.2 MB / 98931767 0.0 B 0.0 B 1 3772 0 >>>>> SUCCESS PROCESS_LOCAL 226 / host2 2015/05/04 01:27:44 28 s 2 s 39.2 >>>>> MB / 29754 0.0 B 0.0 B 2 3773 0 SUCCESS PROCESS_LOCAL 283 / host3 >>>>> 2015/05/04 >>>>> 01:27:44 26 s 2 s 39.0 MB / 29646 0.0 B 0.0 B 5 3776 0 SUCCESS >>>>> PROCESS_LOCAL 320 / host4 2015/05/04 01:27:44 31 s 3 s 38.8 MB / >>>>> 29512 0.0 B 0.0 B 4 3775 0 SUCCESS PROCESS_LOCAL 203 / host5 2015/05/04 >>>>> 01:27:44 41 s 3 s 38.4 MB / 29169 0.0 B 0.0 B 3 3774 0 SUCCESS >>>>> PROCESS_LOCAL 84 / host6 2015/05/04 01:27:44 24 s 2 s 38.5 MB / >>>>> 29258 0.0 B 0.0 B 8 3779 0 SUCCESS PROCESS_LOCAL 309 / host7 2015/05/04 >>>>> 01:27:44 31 s 4 s 39.5 MB / 30008 0.0 B 0.0 B >>>>> >>>>> There are 1200 tasks in total. >>>>> >>>>> >>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com> >>>>> wrote: >>>>> >>>>>> I don't know the full context of what you're doing, but serialization >>>>>> errors usually mean you're attempting to serialize something that can't >>>>>> be >>>>>> serialized, like the SparkContext. Kryo won't help there. >>>>>> >>>>>> The arguments to spark-submit you posted previously look good: >>>>>> >>>>>> 2) --num-executors 96 --driver-memory 12g --driver-java-options >>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4 >>>>>> >>>>>> I suspect you aren't getting the parallelism you need. For >>>>>> partitioning, if your data is in HDFS and your block size is 128MB, then >>>>>> you'll get ~195 partitions anyway. If it takes 7 hours to do a join over >>>>>> 25GB of data, you have some other serious bottleneck. You should examine >>>>>> the web console and the logs to determine where all the time is going. 
>>>>>> Questions you might pursue: >>>>>> >>>>>> - How long does each task take to complete? >>>>>> - How many of those 195 partitions/tasks are processed at the >>>>>> same time? That is, how many "slots" are available? Maybe you need >>>>>> more >>>>>> nodes if the number of slots is too low. Based on your command >>>>>> arguments, >>>>>> you should be able to process 1/2 of them at a time, unless the >>>>>> cluster is >>>>>> busy. >>>>>> - Is the cluster swamped with other work? >>>>>> - How much data does each task process? Is the data roughly the >>>>>> same from one task to the next? If not, then you might have serious >>>>>> key >>>>>> skew? >>>>>> >>>>>> You may also need to research the details of how joins are >>>>>> implemented and some of the common tricks for organizing data to minimize >>>>>> having to shuffle all N by M records. >>>>>> >>>>>> >>>>>> >>>>>> Dean Wampler, Ph.D. >>>>>> Author: Programming Scala, 2nd Edition >>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >>>>>> Typesafe <http://typesafe.com> >>>>>> @deanwampler <http://twitter.com/deanwampler> >>>>>> http://polyglotprogramming.com >>>>>> >>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hello Deam, >>>>>>> If I don;t use Kryo serializer i got Serialization error and hence >>>>>>> am using it. >>>>>>> If I don';t use partitionBy/reparition then the simply join never >>>>>>> completed even after 7 hours and infact as next step i need to run it >>>>>>> against 250G as that is my full dataset size. Someone here suggested to >>>>>>> me >>>>>>> to use repartition. >>>>>>> >>>>>>> Assuming reparition is mandatory , how do i decide whats the right >>>>>>> number ? When i am using 400 i do not get NullPointerException that i >>>>>>> talked about, which is strange. I never saw that exception against small >>>>>>> random dataset but see it with 25G and again with 400 partitions , i do >>>>>>> not >>>>>>> see it. 
>>>>>>> >>>>>>> >>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> IMHO, you are trying waaay to hard to optimize work on what is >>>>>>>> really a small data set. 25G, even 250G, is not that much data, >>>>>>>> especially >>>>>>>> if you've spent a month trying to get something to work that should be >>>>>>>> simple. All these errors are from optimization attempts. >>>>>>>> >>>>>>>> Kryo is great, but if it's not working reliably for some reason, >>>>>>>> then don't use it. Rather than force 200 partitions, let Spark try to >>>>>>>> figure out a good-enough number. (If you really need to force a >>>>>>>> partition >>>>>>>> count, use the repartition method instead, unless you're overriding the >>>>>>>> partitioner.) >>>>>>>> >>>>>>>> So. I recommend that you eliminate all the optimizations: Kryo, >>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work >>>>>>>> first. >>>>>>>> Then, if it really isn't fast enough, look for actual evidence of >>>>>>>> bottlenecks and optimize those. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Dean Wampler, Ph.D. >>>>>>>> Author: Programming Scala, 2nd Edition >>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >>>>>>>> Typesafe <http://typesafe.com> >>>>>>>> @deanwampler <http://twitter.com/deanwampler> >>>>>>>> http://polyglotprogramming.com >>>>>>>> >>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com >>>>>>>> > wrote: >>>>>>>> >>>>>>>>> Hello Dean & Others, >>>>>>>>> Thanks for your suggestions. >>>>>>>>> I have two data sets and all i want to do is a simple equi join. I >>>>>>>>> have 10G limit and as my dataset_1 exceeded that it was throwing OOM >>>>>>>>> error. >>>>>>>>> Hence i switched back to use .join() API instead of map-side broadcast >>>>>>>>> join. >>>>>>>>> I am repartitioning the data with 100,200 and i see a >>>>>>>>> NullPointerException now. 
>>>>>>>>> >>>>>>>>> When i run against 25G of each input and with .partitionBy(new >>>>>>>>> org.apache.spark.HashPartitioner(200)) , I see >>>>>>>>> NullPointerExveption >>>>>>>>> >>>>>>>>> >>>>>>>>> this trace does not include a line from my code and hence i do not >>>>>>>>> what is causing error ? >>>>>>>>> I do have registered kryo serializer. >>>>>>>>> >>>>>>>>> val conf = new SparkConf() >>>>>>>>> .setAppName(detail) >>>>>>>>> * .set("spark.serializer", >>>>>>>>> "org.apache.spark.serializer.KryoSerializer")* >>>>>>>>> .set("spark.kryoserializer.buffer.mb", >>>>>>>>> arguments.get("buffersize").get) >>>>>>>>> .set("spark.kryoserializer.buffer.max.mb", >>>>>>>>> arguments.get("maxbuffersize").get) >>>>>>>>> .set("spark.driver.maxResultSize", >>>>>>>>> arguments.get("maxResultSize").get) >>>>>>>>> .set("spark.yarn.maxAppAttempts", "0") >>>>>>>>> * >>>>>>>>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLeve* >>>>>>>>> lMetricSum])) >>>>>>>>> val sc = new SparkContext(conf) >>>>>>>>> >>>>>>>>> I see the exception when this task runs >>>>>>>>> >>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], >>>>>>>>> vi) } >>>>>>>>> >>>>>>>>> Its a simple mapping of input records to (itemId, record) >>>>>>>>> >>>>>>>>> I found this >>>>>>>>> >>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist >>>>>>>>> and >>>>>>>>> >>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html >>>>>>>>> >>>>>>>>> Looks like Kryo (2.21v) changed something to stop using default >>>>>>>>> constructors. 
>>>>>>>>> >>>>>>>>> (Kryo.DefaultInstantiatorStrategy) >>>>>>>>> kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new >>>>>>>>> StdInstantiatorStrategy()); >>>>>>>>> >>>>>>>>> >>>>>>>>> Please suggest >>>>>>>>> >>>>>>>>> >>>>>>>>> Trace: >>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 >>>>>>>>> in stage 2.0 (TID 774) >>>>>>>>> com.esotericsoftware.kryo.KryoException: >>>>>>>>> java.lang.NullPointerException >>>>>>>>> Serialization trace: >>>>>>>>> values (org.apache.avro.generic.GenericData$Record) >>>>>>>>> datum (org.apache.avro.mapred.AvroKey) >>>>>>>>> at >>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) >>>>>>>>> at >>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) >>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) >>>>>>>>> at >>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) >>>>>>>>> at >>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) >>>>>>>>> at >>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >>>>>>>>> at >>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41) >>>>>>>>> at >>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) >>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo. >>>>>>>>> java:729)Regards, >>>>>>>>> >>>>>>>>> >>>>>>>>> Any suggestions. >>>>>>>>> I am not able to get this thing to work over a month now, its kind >>>>>>>>> of getting frustrating. >>>>>>>>> >>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler < >>>>>>>>> deanwamp...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> How big is the data you're returning to the driver with >>>>>>>>>> collectAsMap? You are probably running out of memory trying to copy >>>>>>>>>> too >>>>>>>>>> much data back to it. 
>>>>>>>>>> >>>>>>>>>> If you're trying to force a map-side join, Spark can do that for >>>>>>>>>> you in some cases within the regular DataFrame/RDD context. See >>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning >>>>>>>>>> and this talk by Michael Armbrust for example, >>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> dean >>>>>>>>>> >>>>>>>>>> Dean Wampler, Ph.D. >>>>>>>>>> Author: Programming Scala, 2nd Edition >>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly) >>>>>>>>>> Typesafe <http://typesafe.com> >>>>>>>>>> @deanwampler <http://twitter.com/deanwampler> >>>>>>>>>> http://polyglotprogramming.com >>>>>>>>>> >>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Full Exception >>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 >>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 >>>>>>>>>>> s* >>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: >>>>>>>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s* >>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw >>>>>>>>>>> exception: Job aborted due to stage failure: Exception while >>>>>>>>>>> getting task >>>>>>>>>>> result: org.apache.spark.SparkException: Error sending message >>>>>>>>>>> [message = >>>>>>>>>>> GetLocations(taskresult_112)] >>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage >>>>>>>>>>> failure: Exception while getting task result: >>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message = >>>>>>>>>>> GetLocations(taskresult_112)] >>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org >>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) 
>>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) >>>>>>>>>>> at >>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>>>>>> at >>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>> at scala.Option.foreach(Option.scala:236) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) >>>>>>>>>>> at >>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: >>>>>>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Job >>>>>>>>>>> aborted due >>>>>>>>>>> to stage failure: Exception while getting task result: >>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message = >>>>>>>>>>> GetLocations(taskresult_112)]) >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> *Code at line 37* >>>>>>>>>>> >>>>>>>>>>> val lstgItemMap = listings.map { lstg => >>>>>>>>>>> (lstg.getItemId().toLong, lstg) }.collectAsMap >>>>>>>>>>> >>>>>>>>>>> Listing data set size is 26G (10 files) and my driver memory is >>>>>>>>>>> 12G (I cant go beyond it). 
The reason i do collectAsMap is to >>>>>>>>>>> brodcast it >>>>>>>>>>> and do a map-side join instead of regular join. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Please suggest ? >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> My Spark Job is failing and i see >>>>>>>>>>>> >>>>>>>>>>>> ============================== >>>>>>>>>>>> >>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class >>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception while >>>>>>>>>>>> getting >>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending message >>>>>>>>>>>> [message = GetLocations(taskresult_112)] >>>>>>>>>>>> >>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage >>>>>>>>>>>> failure: Exception while getting task result: >>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message = >>>>>>>>>>>> GetLocations(taskresult_112)] >>>>>>>>>>>> >>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org >>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) >>>>>>>>>>>> >>>>>>>>>>>> at scala.Option.foreach(Option.scala:236) >>>>>>>>>>>> >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after >>>>>>>>>>>> [30 seconds] >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I see multiple of these >>>>>>>>>>>> >>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed >>>>>>>>>>>> out after [30 seconds] >>>>>>>>>>>> >>>>>>>>>>>> And finally i see this >>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) >>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) >>>>>>>>>>>> at >>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>> at >>>>>>>>>>>> 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >>>>>>>>>>>> at >>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Solutions >>>>>>>>>>>> >>>>>>>>>>>> 1) >>>>>>>>>>>> >>>>>>>>>>>> .set("spark.akka.askTimeout", "6000") >>>>>>>>>>>> >>>>>>>>>>>> .set("spark.akka.timeout", "6000") >>>>>>>>>>>> >>>>>>>>>>>> .set("spark.worker.timeout", "6000") >>>>>>>>>>>> >>>>>>>>>>>> 2) --num-executors 96 --driver-memory 12g >>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=10G" 
--executor-memory 12g >>>>>>>>>>>> --executor-cores 4 >>>>>>>>>>>> >>>>>>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond this. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> ANY suggestions ? >>>>>>>>>>>> >>>>>>>>>>>> Regards, >>>>>>>>>>>> >>>>>>>>>>>> Deepak >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Did not work. Same problem. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das < >>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> You could try increasing your heap space explicitly. like >>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g", its not the correct approach but >>>>>>>>>>>>>> try. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>> Best Regards >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) < >>>>>>>>>>>>>> deepuj...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I have a SparkApp that runs completes in 45 mins for 5 files >>>>>>>>>>>>>>> (5*750MB size) and it takes 16 executors to do so. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type (10*3 >>>>>>>>>>>>>>> files as there are three inputs that are transformed). [Input1 >>>>>>>>>>>>>>> = 10*750 MB, >>>>>>>>>>>>>>> Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I see multiple >>>>>>>>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating >>>>>>>>>>>>>>> with driver in heartbeater >>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message >>>>>>>>>>>>>>> [message = >>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, >>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))] >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) >>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures >>>>>>>>>>>>>>> timed out after [30 seconds] >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) >>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) >>>>>>>>>>>>>>> ... 1 more >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> When i searched deeper, i found OOM error. 
>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: >>>>>>>>>>>>>>> Removing block manager BlockManagerId(17, >>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819) >>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: >>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9, >>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no >>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms >>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: >>>>>>>>>>>>>>> Removing block manager BlockManagerId(9, >>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) >>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in >>>>>>>>>>>>>>> thread task-result-getter-3 >>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245) >>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219) >>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) >>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> 
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>> Exception in thread "task-result-getter-3" >>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space >>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245) >>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219) >>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) >>>>>>>>>>>>>>> at 
java.util.ArrayList.add(ArrayList.java:440) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> LogType: stdout >>>>>>>>>>>>>>> LogLength: 96 >>>>>>>>>>>>>>> Log Contents: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000* >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Spark Command: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster >>>>>>>>>>>>>>> --driver-class-path >>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar >>>>>>>>>>>>>>> --jars >>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar >>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options >>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 >>>>>>>>>>>>>>> --queue >>>>>>>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp >>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar >>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7 >>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession >>>>>>>>>>>>>>> subcommand=viewItem >>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128 >>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> There is 12G limit on memory that i can use as this Spark is >>>>>>>>>>>>>>> running over YARN. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Spark Version: 1.3.1 >>>>>>>>>>>>>>> Should i increase the number of executors form 32? >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> Deepak >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Deepak >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Deepak >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Deepak >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Deepak >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Deepak >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> -- >>>>> Deepak >>>>> >>>>> >>>> >>> >>> >>> -- >>> Deepak >>> >>> >> > > > -- > Deepak > > -- Deepak