No worries, glad to help! It also helped me as I had not worked directly with the Hadoop APIs for controlling splits.
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" Date: Friday, June 26, 2015 at 1:31 PM To: Silvio Fiorito Cc: user Subject: Re: Silvio, Thanks for your responses and patience. It worked after i reshuffled the arguments and removed avro dependencies. On Fri, Jun 26, 2015 at 9:55 AM, Silvio Fiorito <silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote: OK, here’s how I did it, using just the built-in Avro libraries with Spark 1.3: import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.mapred.AvroKey import org.apache.avro.mapreduce.AvroKeyInputFormat import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.lib.input.FileInputFormat val hadoopConf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration) hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100) val input = sc.newAPIHadoopFile( "examples/src/main/resources/users.avro", classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], hadoopConf).map(_._1.datum.get("name")) println(input.partitions.size) From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" Date: Friday, June 26, 2015 at 11:04 AM To: Silvio Fiorito Cc: user Subject: Re: <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.10</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-mapred</artifactId> <version>1.7.7</version> <classifier>hadoop2</classifier> <scope>provided</scope> </dependency> On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: Same code of yours works for me as well On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: Is that its not supported with Avro. Unlikely. On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: My imports: import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.AvroKey import org.apache.avro.Schema import org.apache.hadoop.io.NullWritable import org.apache.avro.mapreduce.AvroKeyInputFormat import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Text def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = { val path = getInputPaths(inputDir, startDate, endDate) val hadoopConfiguration = new Configuration(sc.hadoopConfiguration) hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro") } I need to read Avro datasets and am using strings instead of constant from InputFormat class. When i click on any hadoop dependency from eclipse, i see they point to hadoop 2.2.x jars. On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote: Make sure you’re importing the right namespace for Hadoop v2.0. This is what I tried: import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} val hadoopConf = new org.apache.hadoop.conf.Configuration() hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048) val input = sc.newAPIHadoopFile( "README.md", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf).map(_._2.toString()) println(input.partitions.size) input. flatMap(_.split(" ")). filter(_.length > 0). map((_, 1)). reduceByKey(_ + _). coalesce(1). sortBy(_._2, false). take(10). foreach(println) From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" Date: Friday, June 26, 2015 at 10:18 AM To: Silvio Fiorito Cc: user Subject: Re: All these throw compilation error at newAPIHadoopFile 1) val hadoopConfiguration = new Configuration() hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration) 2) val hadoopConfiguration = new Configuration() hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]],hadoopConfiguration) 3) val hadoopConfiguration = new Configuration() hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration) Error: [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives: [INFO] (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and> [INFO] (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] [INFO] cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration) [INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration) On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote: Ok, in that case I think you can set the max split size in the Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter. Again, I haven’t done this myself, but looking through the Spark codebase here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053 And the HDFS FileInputFormat implementation, that seems like a good option to try. You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max). I hope that helps! From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" Date: Thursday, June 25, 2015 at 5:49 PM To: Silvio Fiorito Cc: user Subject: Re: I use sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro") https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration) Does not seem to have that partition option. On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote: Hi Deepak, Have you tried specifying the minimum partitions when you load the file? I haven’t tried that myself against HDFS before, so I’m not sure if it will affect data locality. Ideally not, it should still maintain data locality but just more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they’re all Node local. val details = sc.textFile(“hdfs://….”, 500) If you’re using something other than text file you can also specify minimum partitions when using sc.hadoopFile. Thanks, Silvio From: "ÐΞ€ρ@Ҝ (๏̯͡๏)" Date: Thursday, June 25, 2015 at 3:10 PM To: Akhil Das Cc: user Subject: Re: How can i increase the number of tasks from 174 to 500 without running repartition. The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks. Similar to split size that increases the number of mappers in Hadoop M/R. On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>> wrote: Look in the tuning section<https://spark.apache.org/docs/latest/tuning.html>, also you need to figure out whats taking time and where's your bottleneck etc. If everything is tuned properly, then you will need to throw more cores :) Thanks Best Regards On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: Its taking an hour and on Hadoop it takes 1h 30m, is there a way to make it run faster ? On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>> wrote: Cool. :) On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: Its running now. On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: Now running with --num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1 On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: There are multiple of these 1) 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767) java.lang.OutOfMemoryError: GC overhead limit exceeded at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065) at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread 2) 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread java.lang.OutOfMemoryError: GC overhead limit exceeded at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22) at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) 3) # java.lang.OutOfMemoryError: GC overhead limit exceeded # -XX:OnOutOfMemoryError="kill %p" # Executing /bin/sh -c "kill 20674"... [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/<http://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/>] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace) On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: I see this java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.<init>(String.java:203) at java.lang.StringBuilder.toString(StringBuilder.java:405) at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108) at java.io.File.<init>(File.java:367) at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81) at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84) at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com<mailto:ak...@sigmoidanalytics.com>> wrote: Can you look a bit more in the error logs? It could be getting killed because of OOM etc. One thing you can try is to set the spark.shuffle.blockTransferService to nio from netty. Thanks Best Regards On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com<mailto:deepuj...@gmail.com>> wrote: I have a Spark job that has 7 stages. The first 3 stage complete and the fourth stage beings (joins two RDDs). This stage has multiple task failures all the below exception. Multiple tasks (100s) of them get the same exception with different hosts. How can all the host suddenly stop responding when few moments ago 3 stages ran successfully. If I re-run the three stages will again run successfully. I cannot think of it being a cluster issue. Any suggestions ? Spark Version : 1.3.1 Exception: org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.sp -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak -- Deepak