Four tasks are now failing with

IndexIDAttemptStatus ▾Locality LevelExecutor ID / HostLaunch TimeDurationGC
TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle Spill (Disk)
Errors  0 3771 0 FAILED PROCESS_LOCAL 114 / host1 2015/05/04 01:27:44
 /   ExecutorLostFailure
(executor 114 lost)  1007 4973 1 FAILED PROCESS_LOCAL 420 / host2 2015/05/04
02:13:14   /   FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007,
message= +details

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=1007, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
        at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

)

 371 4972 1 FAILED PROCESS_LOCAL 563 / host3 2015/05/04 02:13:14   /
FetchFailed(null,
shuffleId=1, mapId=-1, reduceId=371, message= +details

FetchFailed(null, shuffleId=1, mapId=-1, reduceId=371, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 1
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:385)
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:382)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:381)
        at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:177)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

)

 0 4971 1 FAILED PROCESS_LOCAL 45 / host4 2015/05/04 02:13:14 0 ms  2 s
374.3 MB / 19281537  0.0 B 0.0 B


On Mon, May 4, 2015 at 3:15 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> One dataset (RDD Pair)
>
> val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
>
> Second Dataset (RDDPair)
>
> val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long],
> vi) }
>
> As i want to join based on item Id that is used as first element in the
> tuple in both cases and i think thats what is shuffle key.
>
> listings ==> Data set contains all the unique item ids that are ever
> listed on the ecommerce site.
>
> viEvents ===> List of items viewed by user in last day. This will always
> be a subset of the total set.
>
> So i do not understand what is data skewness. When my long running task is
> working on 1591.2 MB / 98,931,767 does that mean 98 million reocrds
> contain all the same item ID ? How can millions of user look at the same
> item in last day ?
>
> Or does this dataset contain records across item ids ?
>
>
> Regards,
>
> Deepak
>
>
>
>
> On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Shuffle key is depending on your implementation, I'm not sure if you are
>> familiar with MapReduce, the mapper output is a key-value pair, where the
>> key is the shuffle key for shuffling, Spark is also the same.
>>
>> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>
>>> Hello Shao,
>>> Can you talk more about shuffle key or point me to APIs that allow me to
>>> change shuffle key. I will try with different keys and see the performance.
>>>
>>> What is the shuffle key by default ?
>>>
>>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> IMHO If your data or your algorithm is prone to data skew, I think you
>>>> have to fix this from application level, Spark itself cannot overcome this
>>>> problem (if one key has large amount of values), you may change your
>>>> algorithm to choose another shuffle key, somethings like this to avoid
>>>> shuffle on skewed keys.
>>>>
>>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>>
>>>>> Hello Dean & Others,
>>>>> Thanks for the response.
>>>>>
>>>>> I tried with 100,200, 400, 600 and 1200 repartitions with 100,200,400
>>>>> and 800 executors. Each time all the tasks of join complete in less than a
>>>>> minute except one and that one tasks runs forever. I have a huge cluster 
>>>>> at
>>>>> my disposal.
>>>>>
>>>>> The data for each of 1199 tasks is around 40MB/30k records and for 1
>>>>> never ending task is 1.5G/98million records. I see that there is data skew
>>>>> among tasks. I had observed this a week earlier and i have no clue on how
>>>>> to fix it and when someone suggested that repartition might make things
>>>>> more parallel, but the problem is still persistent.
>>>>>
>>>>> Please suggest on how to get the task to complete.
>>>>> All i want to do is join two datasets. (dataset1 is in sequence file
>>>>> and dataset2 is in avro format).
>>>>>
>>>>>
>>>>>
>>>>> Ex:
>>>>> Tasks IndexIDAttemptStatusLocality LevelExecutor ID / HostLaunch Time
>>>>> DurationGC TimeShuffle Read Size / RecordsShuffle Spill (Memory)Shuffle
>>>>> Spill (Disk)Errors  0 3771 0 RUNNING PROCESS_LOCAL 114 / host1 2015/05/04
>>>>> 01:27:44 7.3 min  19 s  1591.2 MB / 98931767  0.0 B 0.0 B   1 3772 0
>>>>> SUCCESS PROCESS_LOCAL 226 / host2 2015/05/04 01:27:44 28 s  2 s  39.2
>>>>> MB / 29754  0.0 B 0.0 B   2 3773 0 SUCCESS PROCESS_LOCAL 283 / host3 
>>>>> 2015/05/04
>>>>> 01:27:44 26 s  2 s  39.0 MB / 29646  0.0 B 0.0 B   5 3776 0 SUCCESS
>>>>> PROCESS_LOCAL 320 / host4 2015/05/04 01:27:44 31 s  3 s  38.8 MB /
>>>>> 29512  0.0 B 0.0 B   4 3775 0 SUCCESS PROCESS_LOCAL 203 / host5 2015/05/04
>>>>> 01:27:44 41 s  3 s  38.4 MB / 29169  0.0 B 0.0 B   3 3774 0 SUCCESS
>>>>> PROCESS_LOCAL 84 / host6 2015/05/04 01:27:44 24 s  2 s  38.5 MB /
>>>>> 29258  0.0 B 0.0 B   8 3779 0 SUCCESS PROCESS_LOCAL 309 / host7 2015/05/04
>>>>> 01:27:44 31 s  4 s  39.5 MB / 30008  0.0 B 0.0 B
>>>>>
>>>>> There are 1200 tasks in total.
>>>>>
>>>>>
>>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I don't know the full context of what you're doing, but serialization
>>>>>> errors usually mean you're attempting to serialize something that can't 
>>>>>> be
>>>>>> serialized, like the SparkContext. Kryo won't help there.
>>>>>>
>>>>>> The arguments to spark-submit you posted previously look good:
>>>>>>
>>>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>
>>>>>> I suspect you aren't getting the parallelism you need. For
>>>>>> partitioning, if your data is in HDFS and your block size is 128MB, then
>>>>>> you'll get ~195 partitions anyway. If it takes 7 hours to do a join over
>>>>>> 25GB of data, you have some other serious bottleneck. You should examine
>>>>>> the web console and the logs to determine where all the time is going.
>>>>>> Questions you might pursue:
>>>>>>
>>>>>>    - How long does each task take to complete?
>>>>>>    - How many of those 195 partitions/tasks are processed at the
>>>>>>    same time? That is, how many "slots" are available?  Maybe you need 
>>>>>> more
>>>>>>    nodes if the number of slots is too low. Based on your command 
>>>>>> arguments,
>>>>>>    you should be able to process 1/2 of them at a time, unless the 
>>>>>> cluster is
>>>>>>    busy.
>>>>>>    - Is the cluster swamped with other work?
>>>>>>    - How much data does each task process? Is the data roughly the
>>>>>>    same from one task to the next? If not, then you might have serious 
>>>>>> key
>>>>>>    skew?
>>>>>>
>>>>>> You may also need to research the details of how joins are
>>>>>> implemented and some of the common tricks for organizing data to minimize
>>>>>> having to shuffle all N by M records.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Dean Wampler, Ph.D.
>>>>>> Author: Programming Scala, 2nd Edition
>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>> Typesafe <http://typesafe.com>
>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>> http://polyglotprogramming.com
>>>>>>
>>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Deam,
>>>>>>> If I don;t use Kryo serializer i got Serialization error and hence
>>>>>>> am using it.
>>>>>>> If I don';t use partitionBy/reparition then the simply join never
>>>>>>> completed even after 7 hours and infact as next step i need to run it
>>>>>>> against 250G as that is my full dataset size. Someone here suggested to 
>>>>>>> me
>>>>>>> to use repartition.
>>>>>>>
>>>>>>> Assuming reparition is mandatory , how do i decide whats the right
>>>>>>> number ? When i am using 400 i do not get NullPointerException that i
>>>>>>> talked about, which is strange. I never saw that exception against small
>>>>>>> random dataset but see it with 25G and again with 400 partitions , i do 
>>>>>>> not
>>>>>>> see it.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> IMHO, you are trying waaay to hard to optimize work on what is
>>>>>>>> really a small data set. 25G, even 250G, is not that much data, 
>>>>>>>> especially
>>>>>>>> if you've spent a month trying to get something to work that should be
>>>>>>>> simple. All these errors are from optimization attempts.
>>>>>>>>
>>>>>>>> Kryo is great, but if it's not working reliably for some reason,
>>>>>>>> then don't use it. Rather than force 200 partitions, let Spark try to
>>>>>>>> figure out a good-enough number. (If you really need to force a 
>>>>>>>> partition
>>>>>>>> count, use the repartition method instead, unless you're overriding the
>>>>>>>> partitioner.)
>>>>>>>>
>>>>>>>> So. I recommend that you eliminate all the optimizations: Kryo,
>>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work 
>>>>>>>> first.
>>>>>>>> Then, if it really isn't fast enough, look for actual evidence of
>>>>>>>> bottlenecks and optimize those.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>> http://polyglotprogramming.com
>>>>>>>>
>>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hello Dean & Others,
>>>>>>>>> Thanks for your suggestions.
>>>>>>>>> I have two data sets and all i want to do is a simple equi join. I
>>>>>>>>> have 10G limit and as my dataset_1 exceeded that it was throwing OOM 
>>>>>>>>> error.
>>>>>>>>> Hence i switched back to use .join() API instead of map-side broadcast
>>>>>>>>> join.
>>>>>>>>> I am repartitioning the data with 100,200 and i see a
>>>>>>>>> NullPointerException now.
>>>>>>>>>
>>>>>>>>> When i run against 25G of each input and with .partitionBy(new
>>>>>>>>> org.apache.spark.HashPartitioner(200)) , I see
>>>>>>>>> NullPointerExveption
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> this trace does not include a line from my code and hence i do not
>>>>>>>>> what is causing error ?
>>>>>>>>> I do have registered kryo serializer.
>>>>>>>>>
>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>       .setAppName(detail)
>>>>>>>>> *      .set("spark.serializer",
>>>>>>>>> "org.apache.spark.serializer.KryoSerializer")*
>>>>>>>>>       .set("spark.kryoserializer.buffer.mb",
>>>>>>>>> arguments.get("buffersize").get)
>>>>>>>>>       .set("spark.kryoserializer.buffer.max.mb",
>>>>>>>>> arguments.get("maxbuffersize").get)
>>>>>>>>>       .set("spark.driver.maxResultSize",
>>>>>>>>> arguments.get("maxResultSize").get)
>>>>>>>>>       .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>> * 
>>>>>>>>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLeve*
>>>>>>>>> lMetricSum]))
>>>>>>>>>     val sc = new SparkContext(conf)
>>>>>>>>>
>>>>>>>>> I see the exception when this task runs
>>>>>>>>>
>>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long],
>>>>>>>>> vi) }
>>>>>>>>>
>>>>>>>>> Its a simple mapping of input records to (itemId, record)
>>>>>>>>>
>>>>>>>>> I found this
>>>>>>>>>
>>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>>
>>>>>>>>> Looks like Kryo (2.21v)  changed something to stop using default
>>>>>>>>> constructors.
>>>>>>>>>
>>>>>>>>> (Kryo.DefaultInstantiatorStrategy) 
>>>>>>>>> kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new 
>>>>>>>>> StdInstantiatorStrategy());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Please suggest
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Trace:
>>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1
>>>>>>>>> in stage 2.0 (TID 774)
>>>>>>>>> com.esotericsoftware.kryo.KryoException:
>>>>>>>>> java.lang.NullPointerException
>>>>>>>>> Serialization trace:
>>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>>>     at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>>>     at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>     at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>     at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>     at
>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>     at
>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>>>     at
>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.
>>>>>>>>> java:729)Regards,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any suggestions.
>>>>>>>>> I am not able to get this thing to work over a month now, its kind
>>>>>>>>> of getting frustrating.
>>>>>>>>>
>>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <
>>>>>>>>> deanwamp...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How big is the data you're returning to the driver with
>>>>>>>>>> collectAsMap? You are probably running out of memory trying to copy 
>>>>>>>>>> too
>>>>>>>>>> much data back to it.
>>>>>>>>>>
>>>>>>>>>> If you're trying to force a map-side join, Spark can do that for
>>>>>>>>>> you in some cases within the regular DataFrame/RDD context. See
>>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>>>>>>> and this talk by Michael Armbrust for example,
>>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> dean
>>>>>>>>>>
>>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Full Exception
>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1
>>>>>>>>>>> (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 
>>>>>>>>>>> s*
>>>>>>>>>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>>>>>>>>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s*
>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>>>> exception: Job aborted due to stage failure: Exception while 
>>>>>>>>>>> getting task
>>>>>>>>>>> result: org.apache.spark.SparkException: Error sending message 
>>>>>>>>>>> [message =
>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>> at
>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>> at
>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status:
>>>>>>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Job 
>>>>>>>>>>> aborted due
>>>>>>>>>>> to stage failure: Exception while getting task result:
>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>> GetLocations(taskresult_112)])
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Code at line 37*
>>>>>>>>>>>
>>>>>>>>>>> val lstgItemMap = listings.map { lstg =>
>>>>>>>>>>> (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>>>>>>>>
>>>>>>>>>>> Listing data set size is 26G (10 files) and my driver memory is
>>>>>>>>>>> 12G (I cant go beyond it). The reason i do collectAsMap is to 
>>>>>>>>>>> brodcast it
>>>>>>>>>>> and do a map-side join instead of regular join.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Please suggest ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> My Spark Job is failing  and i see
>>>>>>>>>>>>
>>>>>>>>>>>> ==============================
>>>>>>>>>>>>
>>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class
>>>>>>>>>>>> threw exception: Job aborted due to stage failure: Exception while 
>>>>>>>>>>>> getting
>>>>>>>>>>>> task result: org.apache.spark.SparkException: Error sending message
>>>>>>>>>>>> [message = GetLocations(taskresult_112)]
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>> failure: Exception while getting task result:
>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>>>>>>> GetLocations(taskresult_112)]
>>>>>>>>>>>>
>>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>>>
>>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after
>>>>>>>>>>>> [30 seconds]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I see multiple of these
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed
>>>>>>>>>>>> out after [30 seconds]
>>>>>>>>>>>>
>>>>>>>>>>>> And finally i see this
>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>>>>>>> at
>>>>>>>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Solutions
>>>>>>>>>>>>
>>>>>>>>>>>> 1)
>>>>>>>>>>>>
>>>>>>>>>>>>       .set("spark.akka.askTimeout", "6000")
>>>>>>>>>>>>
>>>>>>>>>>>>       .set("spark.akka.timeout", "6000")
>>>>>>>>>>>>
>>>>>>>>>>>>       .set("spark.worker.timeout", "6000")
>>>>>>>>>>>>
>>>>>>>>>>>> 2)  --num-executors 96 --driver-memory 12g
>>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=10G" --executor-memory 12g
>>>>>>>>>>>> --executor-cores 4
>>>>>>>>>>>>
>>>>>>>>>>>> 12G is the limit imposed by YARN cluster, I cant go beyond this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> ANY suggestions ?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Deepak
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Did not work. Same problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <
>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You could try increasing your heap space explicitly. like
>>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g", its not the correct approach but 
>>>>>>>>>>>>>> try.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a SparkApp that runs completes in 45 mins for 5 files
>>>>>>>>>>>>>>> (5*750MB size) and it takes 16 executors to do so.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type (10*3
>>>>>>>>>>>>>>> files as there are three inputs that are transformed). [Input1 
>>>>>>>>>>>>>>> = 10*750 MB,
>>>>>>>>>>>>>>> Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I see multiple
>>>>>>>>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating
>>>>>>>>>>>>>>> with driver in heartbeater
>>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message
>>>>>>>>>>>>>>> [message = 
>>>>>>>>>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>>>>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures
>>>>>>>>>>>>>>> timed out after [30 seconds]
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When i searched deeper, i found OOM error.
>>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>> Removing block manager BlockManagerId(17,
>>>>>>>>>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>> Removing BlockManager BlockManagerId(9,
>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no
>>>>>>>>>>>>>>> recent heart beats: 121200ms exceeds 120000ms
>>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor:
>>>>>>>>>>>>>>> Removing block manager BlockManagerId(9,
>>>>>>>>>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in
>>>>>>>>>>>>>>> thread task-result-getter-3
>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> LogType: stdout
>>>>>>>>>>>>>>> LogLength: 96
>>>>>>>>>>>>>>> Log Contents:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Spark Command:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster
>>>>>>>>>>>>>>> --driver-class-path
>>>>>>>>>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>>>>>>> --jars
>>>>>>>>>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 
>>>>>>>>>>>>>>> --queue
>>>>>>>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession 
>>>>>>>>>>>>>>> subcommand=viewItem
>>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There is 12G limit on memory that i can use as this Spark is
>>>>>>>>>>>>>>> running over YARN.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Spark Version: 1.3.1
>>>>>>>>>>>>>>> Should i increase the number of executors form 32?
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Deepak
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak

Reply via email to