One dataset (pair RDD):

val lstgItem = listings.map { lstg => (lstg.getItemId().toLong, lstg) }
Second dataset (pair RDD):

val viEvents = viEventsRaw.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

I want to join on item ID, which is the first element of the tuple in both
cases, and I believe that is the shuffle key.

listings ==> contains every unique item ID ever listed on the e-commerce site.
viEvents ==> the items viewed by users in the last day; this is always a
subset of the total set.

So I do not understand where the data skew comes from. When my long-running
task is working on 1591.2 MB / 98,931,767 records, does that mean all 98
million records share the same item ID? How can millions of users look at the
same item in one day? Or does that task's partition span many different item
IDs?

Regards,
Deepak

On Mon, May 4, 2015 at 3:08 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> The shuffle key depends on your implementation. If you are familiar with
> MapReduce: the mapper output is a key-value pair, and the key is what the
> data is shuffled on. Spark works the same way.
>
> 2015-05-04 17:31 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>
>> Hello Shao,
>> Can you talk more about the shuffle key, or point me to APIs that let me
>> change it? I will try different keys and compare the performance.
>>
>> What is the shuffle key by default?
>>
>> On Mon, May 4, 2015 at 2:37 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> IMHO, if your data or your algorithm is prone to data skew, you have to
>>> fix it at the application level; Spark itself cannot overcome this
>>> problem when one key has a very large number of values. You could change
>>> your algorithm to choose another shuffle key, or something similar, to
>>> avoid shuffling on the skewed keys.
>>>
>>> 2015-05-04 16:41 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>
>>>> Hello Dean & Others,
>>>> Thanks for the response.
>>>>
>>>> I tried 100, 200, 400, 600, and 1200 repartitions with 100, 200, 400,
>>>> and 800 executors. Each time, every task of the join completes in less
>>>> than a minute except one, and that one task runs forever. I have a huge
>>>> cluster at my disposal.
>>>>
>>>> Each of the other 1199 tasks reads around 40 MB / 30k records, while
>>>> the one never-ending task reads 1.5 GB / 98 million records, so there is
>>>> clear data skew among the tasks. I observed this a week ago, and I have
>>>> no clue how to fix it; someone suggested that repartitioning might make
>>>> things more parallel, but the problem persists.
>>>>
>>>> Please suggest how to get that task to complete.
>>>> All I want to do is join two datasets (dataset1 is in SequenceFile
>>>> format and dataset2 is in Avro format).
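>>>>
>>>> To confirm whether a single item ID dominates, I plan to first run a
>>>> quick count per key, something like this (untested sketch; viEvents is
>>>> my (itemId, record) pair RDD):
>>>>
>>>> // Count view events per itemId and print the 10 hottest keys.
>>>> val hotKeys = viEvents
>>>>   .mapValues(_ => 1L)
>>>>   .reduceByKey(_ + _)
>>>>   .top(10)(Ordering.by(_._2))
>>>> hotKeys.foreach(println)
>>>>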
>>>>
>>>> Ex: tasks (all launched 2015/05/04 01:27:44, attempt 0, PROCESS_LOCAL,
>>>> 0.0 B shuffle spill to memory and to disk, no errors):
>>>>
>>>> Index  ID    Status   Executor / Host  Duration  GC Time  Shuffle Read Size / Records
>>>> 0      3771  RUNNING  114 / host1      7.3 min   19 s     1591.2 MB / 98931767
>>>> 1      3772  SUCCESS  226 / host2      28 s      2 s      39.2 MB / 29754
>>>> 2      3773  SUCCESS  283 / host3      26 s      2 s      39.0 MB / 29646
>>>> 5      3776  SUCCESS  320 / host4      31 s      3 s      38.8 MB / 29512
>>>> 4      3775  SUCCESS  203 / host5      41 s      3 s      38.4 MB / 29169
>>>> 3      3774  SUCCESS  84 / host6       24 s      2 s      38.5 MB / 29258
>>>> 8      3779  SUCCESS  309 / host7      31 s      4 s      39.5 MB / 30008
>>>>
>>>> There are 1200 tasks in total.
>>>>
>>>> On Sun, May 3, 2015 at 9:53 PM, Dean Wampler <deanwamp...@gmail.com>
>>>> wrote:
>>>>
>>>>> I don't know the full context of what you're doing, but serialization
>>>>> errors usually mean you're attempting to serialize something that
>>>>> can't be serialized, like the SparkContext. Kryo won't help there.
>>>>>
>>>>> The arguments to spark-submit you posted previously look good:
>>>>>
>>>>> 2) --num-executors 96 --driver-memory 12g --driver-java-options
>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>
>>>>> I suspect you aren't getting the parallelism you need. For
>>>>> partitioning, if your data is in HDFS and your block size is 128MB,
>>>>> then you'll get ~195 partitions anyway (25,000 MB / 128 MB ≈ 195). If
>>>>> it takes 7 hours to do a join over 25GB of data, you have some other
>>>>> serious bottleneck. You should examine the web console and the logs to
>>>>> determine where all the time is going. Questions you might pursue:
>>>>>
>>>>> - How long does each task take to complete?
>>>>> - How many of those 195 partitions/tasks are processed at the same
>>>>> time? That is, how many "slots" are available? Maybe you need more
>>>>> nodes if the number of slots is too low. Based on your command
>>>>> arguments, you should be able to process half of them at a time,
>>>>> unless the cluster is busy.
>>>>> - Is the cluster swamped with other work?
>>>>> - How much data does each task process? Is the data roughly the same
>>>>> from one task to the next? If not, you might have serious key skew.
>>>>>
>>>>> You may also need to research the details of how joins are implemented
>>>>> and some of the common tricks for organizing data to minimize having
>>>>> to shuffle all N by M records.
>>>>>
>>>>> Dean Wampler, Ph.D.
>>>>> Author: Programming Scala, 2nd Edition
>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>> Typesafe <http://typesafe.com>
>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>> http://polyglotprogramming.com
>>>>>
>>>>> On Sun, May 3, 2015 at 11:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Dean,
>>>>>> If I don't use the Kryo serializer I get a serialization error, hence
>>>>>> I am using it.
>>>>>> If I don't use partitionBy/repartition, the simple join never
>>>>>> completes, even after 7 hours, and in fact as a next step I need to
>>>>>> run it against 250G, my full dataset size. Someone here suggested I
>>>>>> use repartition.
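>>>>>>
>>>>>> Roughly, what I run today is this (a sketch of my current code, with
>>>>>> my two pair RDDs keyed by itemId; the partition count is what I keep
>>>>>> varying):
>>>>>>
>>>>>> // Hash-partition both sides the same way, then join.
>>>>>> val partitioner = new org.apache.spark.HashPartitioner(400)
>>>>>> val joined = lstgItem.partitionBy(partitioner)
>>>>>>   .join(viEvents.partitionBy(partitioner))
>>>>>>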
>>>>>>
>>>>>> Assuming repartitioning is mandatory, how do I decide the right
>>>>>> number? Strangely, when I use 400 partitions I do not get the
>>>>>> NullPointerException I talked about: I never saw that exception
>>>>>> against a small random dataset, I do see it with 25G, and then with
>>>>>> 400 partitions I do not see it.
>>>>>>
>>>>>> On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> IMHO, you are trying waaay too hard to optimize work on what is
>>>>>>> really a small data set. 25G, even 250G, is not that much data,
>>>>>>> especially if you've spent a month trying to get something to work
>>>>>>> that should be simple. All these errors are from optimization
>>>>>>> attempts.
>>>>>>>
>>>>>>> Kryo is great, but if it's not working reliably for some reason,
>>>>>>> then don't use it. Rather than force 200 partitions, let Spark try
>>>>>>> to figure out a good-enough number. (If you really need to force a
>>>>>>> partition count, use the repartition method instead, unless you're
>>>>>>> overriding the partitioner.)
>>>>>>>
>>>>>>> So, I recommend that you eliminate all the optimizations: Kryo,
>>>>>>> partitionBy, etc. Just use the simplest code you can. Make it work
>>>>>>> first. Then, if it really isn't fast enough, look for actual
>>>>>>> evidence of bottlenecks and optimize those.
>>>>>>>
>>>>>>> Dean Wampler, Ph.D.
>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>> Typesafe <http://typesafe.com>
>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>> http://polyglotprogramming.com
>>>>>>>
>>>>>>> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Dean & Others,
>>>>>>>> Thanks for your suggestions.
>>>>>>>> I have two datasets, and all I want to do is a simple equi-join. I
>>>>>>>> have a 10G limit, and as my dataset_1 exceeded that, it was
>>>>>>>> throwing an OOM error. Hence I switched back to the .join() API
>>>>>>>> instead of a map-side broadcast join.
>>>>>>>> I am repartitioning the data with 100 and 200 partitions, and I now
>>>>>>>> see a NullPointerException.
>>>>>>>>
>>>>>>>> When I run against 25G of each input and with .partitionBy(new
>>>>>>>> org.apache.spark.HashPartitioner(200)), I see the
>>>>>>>> NullPointerException.
>>>>>>>>
>>>>>>>> The trace does not include a single line from my code, so I do not
>>>>>>>> know what is causing the error.
>>>>>>>> I do have the Kryo serializer registered:
>>>>>>>>
>>>>>>>> val conf = new SparkConf()
>>>>>>>>   .setAppName(detail)
>>>>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>>>>>> val sc = new SparkContext(conf)
>>>>>>>>
>>>>>>>> I see the exception when this task runs:
>>>>>>>>
>>>>>>>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>>>>>>>
>>>>>>>> It is a simple mapping of input records to (itemId, record).
>>>>>>>>
>>>>>>>> I found these:
>>>>>>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>>>>>>>
>>>>>>>> It looks like Kryo (v2.21) changed something and stopped using
>>>>>>>> default constructors:
>>>>>>>>
>>>>>>>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>>>>>>>
>>>>>>>> Please suggest.
>>>>>>>>
>>>>>>>> Trace:
>>>>>>>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
>>>>>>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>>>>>>> Serialization trace:
>>>>>>>> values (org.apache.avro.generic.GenericData$Record)
>>>>>>>> datum (org.apache.avro.mapred.AvroKey)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>
>>>>>>>> Any suggestions? I have not been able to get this to work for over
>>>>>>>> a month now, and it is getting frustrating.
>>>>>>>>
>>>>>>>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <deanwamp...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How big is the data you're returning to the driver with
>>>>>>>>> collectAsMap? You are probably running out of memory trying to
>>>>>>>>> copy too much data back to it.
>>>>>>>>>
>>>>>>>>> If you're trying to force a map-side join, Spark can do that for
>>>>>>>>> you in some cases within the regular DataFrame/RDD context.
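>>>>>>>>> A hand-rolled RDD version of the pattern looks roughly like this
>>>>>>>>> (untested sketch with placeholder names smallRdd/bigRdd; it is
>>>>>>>>> only viable when the small side fits comfortably in memory, which
>>>>>>>>> may not hold for your 26G listings):
>>>>>>>>>
>>>>>>>>> // Broadcast the small side once; each partition of the big side
>>>>>>>>> // then joins against it locally, with no shuffle at all.
>>>>>>>>> val smallMap = sc.broadcast(smallRdd.collectAsMap())
>>>>>>>>> val joined = bigRdd.mapPartitions { iter =>
>>>>>>>>>   val m = smallMap.value
>>>>>>>>>   iter.flatMap { case (k, v) => m.get(k).map(s => (k, (v, s))) }
>>>>>>>>> }
>>>>>>>>>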
>>>>>>>>> See
>>>>>>>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>>>>>>>>> and this talk by Michael Armbrust for example,
>>>>>>>>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>>>>>>>
>>>>>>>>> dean
>>>>>>>>>
>>>>>>>>> Dean Wampler, Ph.D.
>>>>>>>>> Author: Programming Scala, 2nd Edition
>>>>>>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>>>>>>> Typesafe <http://typesafe.com>
>>>>>>>>> @deanwampler <http://twitter.com/deanwampler>
>>>>>>>>> http://polyglotprogramming.com
>>>>>>>>>
>>>>>>>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Full exception:
>>>>>>>>>>
>>>>>>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
>>>>>>>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>>>>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>>>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)])
>>>>>>>>>>
>>>>>>>>>> Code at line 37:
>>>>>>>>>>
>>>>>>>>>> val lstgItemMap = listings.map { lstg =>
>>>>>>>>>>   (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>>>>>>>
>>>>>>>>>> The listing dataset size is 26G (10 files), and my driver memory
>>>>>>>>>> is 12G (I can't go beyond it). The reason I do collectAsMap is to
>>>>>>>>>> broadcast the map and do a map-side join instead of a regular
>>>>>>>>>> join.
>>>>>>>>>>
>>>>>>>>>> Please suggest?
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> My Spark job is failing and I see:
>>>>>>>>>>>
>>>>>>>>>>> ==============================
>>>>>>>>>>>
>>>>>>>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>>>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>>>>>>>
>>>>>>>>>>> I see multiple of these:
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>>>>>>>
>>>>>>>>>>> And finally I see this:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>> at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>>>>>>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>>>>>>>> at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>>>>>>>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>>>>>>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>>>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>>>>>>>
>>>>>>>>>>> Solutions:
>>>>>>>>>>>
>>>>>>>>>>> 1)
>>>>>>>>>>>   .set("spark.akka.askTimeout", "6000")
>>>>>>>>>>>   .set("spark.akka.timeout", "6000")
>>>>>>>>>>>   .set("spark.worker.timeout", "6000")
>>>>>>>>>>>
>>>>>>>>>>> 2) --num-executors 96 --driver-memory 12g --driver-java-options
>>>>>>>>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>>>>>>>
>>>>>>>>>>> 12G is the limit imposed by the YARN cluster; I can't go beyond
>>>>>>>>>>> it.
>>>>>>>>>>>
>>>>>>>>>>> ANY suggestions?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Did not work. Same problem.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das
>>>>>>>>>>>> <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You could try increasing your heap space explicitly, e.g.
>>>>>>>>>>>>> export _JAVA_OPTIONS="-Xmx10g". It's not the correct approach,
>>>>>>>>>>>>> but give it a try.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)
>>>>>>>>>>>>> <deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a Spark app that completes in 45 minutes for 5 files
>>>>>>>>>>>>>> (5 x 750 MB), using 16 executors to do so.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I wanted to run it against 10 files of each input type
>>>>>>>>>>>>>> (10 x 3 files, as there are three inputs that are
>>>>>>>>>>>>>> transformed): Input1 = 10 x 750 MB, Input2 = 10 x 2.5 GB,
>>>>>>>>>>>>>> Input3 = 10 x 1.5 GB. Hence I used 32 executors.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see multiple of these:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 15/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in heartbeater
>>>>>>>>>>>>>> org.apache.spark.SparkException: Error sending message [message = Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>>>>>>>> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>>>>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>>>>>>>> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I searched deeper, I found an OOM error:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent heart beats: 121200ms exceeds 120000ms
>>>>>>>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread task-result-getter-3
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> LogType: stdout
>>>>>>>>>>>>>> LogLength: 96
>>>>>>>>>>>>>> Log Contents:
>>>>>>>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spark command:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ./bin/spark-submit -v --master yarn-cluster
>>>>>>>>>>>>>> --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>>>>>>>>> --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>>>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>>>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 --queue
>>>>>>>>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>>>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>>>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>>>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>>>>>>>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>>>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is a 12G limit on the memory I can use, as this Spark
>>>>>>>>>>>>>> job runs on YARN.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spark version: 1.3.1
>>>>>>>>>>>>>> Should I increase the number of executors from 32?
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Deepak
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Deepak
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>
>>>>>>>> --
>>>>>>>> Deepak
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>
>>>> --
>>>> Deepak
>>
>> --
>> Deepak

--
Deepak