Hi Deepak,

Have you tried specifying the minimum number of partitions when you load the file? I haven't tried that myself against HDFS, so I'm not sure whether it affects data locality. Ideally it shouldn't: you should still get data locality, just with more partitions. Once your job runs, you can check the tasks in the Spark web UI to make sure they're all node-local.
    val details = sc.textFile("hdfs://….", 500)

If you're using something other than a text file, you can also specify the minimum partitions with sc.hadoopFile.

Thanks,
Silvio

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re: How can i increase the number of tasks from 174 to 500 without running repartition.

The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks, similar to how the split size controls the number of mappers in Hadoop M/R?

On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Look in the tuning section <https://spark.apache.org/docs/latest/tuning.html>; you also need to figure out what's taking time and where your bottleneck is. If everything is tuned properly, then you will need to throw more cores at it :)

Thanks
Best Regards

On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to make it run faster?

On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Cool. :)

On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:

It's running now.
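Silvio's suggestion and the question about shrinking 512 MB inputs to 64 MB both come down to how Hadoop computes input splits. sc.textFile reads through the old-API org.apache.hadoop.mapred.TextInputFormat and passes minPartitions as the split-count hint, so the split size works out to max(minSize, min(totalSize / hint, blockSize)). The sketch below is a pure-Scala model of that arithmetic (no Spark or Hadoop dependency), assuming splittable (e.g. uncompressed), non-empty files and the default minimum split size of 1 byte; SplitSizing, countSplits and the file sizes are hypothetical names and values chosen for illustration.

```scala
// Hypothetical pure-Scala model of old-API Hadoop FileInputFormat split sizing.
// Assumes splittable, non-empty files; not Spark or Hadoop code.
object SplitSizing {
  // Hadoop lets the last split of a file be up to 10% larger than splitSize.
  val SplitSlop = 1.1

  /** splitSize = max(minSize, min(goalSize, blockSize)), goalSize = totalSize / numSplits. */
  def splitSize(totalSize: Long, numSplits: Int, minSize: Long, blockSize: Long): Long = {
    val goalSize = totalSize / math.max(numSplits, 1)
    math.max(minSize, math.min(goalSize, blockSize))
  }

  /** Number of splits (one Spark partition/task each) for the given file sizes. */
  def countSplits(fileSizes: Seq[Long], minPartitions: Int, blockSize: Long, minSize: Long = 1L): Int = {
    val size = splitSize(fileSizes.sum, minPartitions, minSize, blockSize)
    fileSizes.map { len =>
      var remaining = len
      var n = 0
      // Carve off full-size splits while more than 1.1 splits' worth remains.
      while (remaining.toDouble / size > SplitSlop) {
        remaining -= size
        n += 1
      }
      // Whatever is left (up to 1.1 x size) becomes the final split.
      if (remaining != 0) n += 1
      n
    }.sum
  }

  def main(args: Array[String]): Unit = {
    val MiB = 1024L * 1024L
    val blockSize = 512 * MiB          // assumed HDFS block size, matching the 512 MB inputs
    val files = Seq.fill(8)(512 * MiB) // hypothetical: eight 512 MiB files (4 GiB total)
    println(countSplits(files, 2, blockSize))  // small hint: prints 8 (one split per block)
    println(countSplits(files, 64, blockSize)) // hint of 64: prints 64 (64 MiB splits)
  }
}
```

With a hint of 64 the goal size drops to 64 MiB, below the block size, so each block is cut into eight splits. Because of the integer division and the 10% slop on each file's last split, the hint steers the task count rather than fixing it exactly, which matches Spark describing minPartitions as a suggested minimum.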
On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

Now running with --num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1

On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

There are multiple of these:

1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread

2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
    at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)

3)
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 20674"...
[ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)

On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I see this:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
    at java.io.File.<init>(File.java:367)
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
    at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
    at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you look a bit further into the error logs? It could be getting killed because of an OOM, etc. One thing you can try is to switch spark.shuffle.blockTransferService from netty to nio.

Thanks
Best Regards

On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the exception below. Multiple tasks (100s of them) get the same exception on different hosts. How can all the hosts suddenly stop responding when, a few moments ago, 3 stages ran successfully? If I re-run, the three stages will again run successfully. I cannot think of it being a cluster issue. Any suggestions?

Spark Version: 1.3.1

Exception:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.sp

--
Deepak
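The failing fourth stage is the join, and its per-task load depends on how many reduce partitions it has. In Spark 1.x, if join is called without a partition count, neither input already has a partitioner, and spark.default.parallelism is unset, the result gets as many partitions as the larger parent RDD, so raising the input partition count (as Silvio suggests) also spreads the join over more tasks; rdd.join(other, numPartitions) sets the count explicitly instead. The pure-Scala sketch below is not Spark code. It mimics the key-to-partition rule of Spark's HashPartitioner (null keys to partition 0, otherwise the non-negative key.hashCode modulo the partition count) to show how the busiest task's share shrinks from 174 to 500 partitions. JoinPartitionLoad and its evenly spread integer keys are hypothetical.

```scala
// Pure-Scala illustration (no Spark dependency) of HashPartitioner-style key placement.
object JoinPartitionLoad {
  /** Null keys go to partition 0; otherwise non-negative hashCode modulo numPartitions. */
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  /** Records the busiest partition (one reduce task of the join) would receive. */
  def maxRecordsPerPartition(keys: Seq[Any], numPartitions: Int): Int =
    keys.groupBy(k => partitionFor(k, numPartitions)).values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val keys: Seq[Any] = 0 until 100000 // hypothetical, evenly spread join keys
    println(maxRecordsPerPartition(keys, 174)) // prints 575
    println(maxRecordsPerPartition(keys, 500)) // prints 200
  }
}
```

With evenly spread keys the largest partition falls from 575 to 200 records. Skewed real keys behave differently: if one key dominates, its partition stays large no matter how many partitions the join uses.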