OK, here’s how I did it, using just the built-in Avro libraries with Spark 1.3:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val hadoopConf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100)

val input = sc.newAPIHadoopFile(
  "examples/src/main/resources/users.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf).map(_._1.datum.get("name"))

println(input.partitions.size)




From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 11:04 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
The same code of yours works for me as well.

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
Is it that it's not supported with Avro? Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
My imports:


import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text


  def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
    val path = getInputPaths(inputDir, startDate, endDate)
    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
  }

I need to read Avro datasets, and I am using string keys instead of the constants from the InputFormat class.


When I click on any Hadoop dependency in Eclipse, I see that they point to Hadoop 2.2.x jars.
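
For reference, here is a minimal sketch of the same read using the FileInputFormat.SPLIT_MAXSIZE constant instead of the raw config string, and passing the configuration into newAPIHadoopFile. The getInputPaths/date handling is omitted and the path argument is illustrative only; this assumes the Hadoop 2 mapreduce API and avro-mapred (hadoop2 classifier) on the classpath:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext

// Sketch only: read Avro files with a 64 MB max split size, using the
// FileInputFormat.SPLIT_MAXSIZE constant rather than the raw config key.
def readAvro(sc: SparkContext, path: String) = {
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // 64 MB max per split
  sc.newAPIHadoopFile(
    path + "/*.avro",
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    conf)
}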


On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Make sure you’re importing the right namespace for Hadoop v2.0. This is what I 
tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)


From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


All of these throw a compilation error at newAPIHadoopFile:

1)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
      classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
      classOf[AvroKey[GenericRecord]], classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
      classOf[AvroKey[GenericRecord]], classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>

[INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]

[INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)

[INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
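
Reading the alternatives in the error above: the overload that accepts a Configuration expects the input-format class first, then the key and value classes, while all three calls pass them in key, value, format order. A sketch of the call with the class arguments reordered, assuming the same imports as in the snippets above:

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

    sc.newAPIHadoopFile(
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]],   // fClass first
      classOf[AvroKey[GenericRecord]],              // then kClass
      classOf[NullWritable],                        // then vClass
      hadoopConfiguration)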


On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Ok, in that case I think you can set the max split size in the Hadoop config 
object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.

Again, I haven’t done this myself, but looking through the Spark codebase here: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053

And looking at the Hadoop FileInputFormat implementation, that seems like a good option to try.

You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).

I hope that helps!
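
A minimal sketch of that call, assuming a SparkContext sc is in scope, the Hadoop 2 mapreduce FileInputFormat, and a 64 MB cap:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // cap each split at 64 MB
// then pass conf as the last argument to sc.newAPIHadoopFile(...)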

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 5:49 PM
To: Silvio Fiorito
Cc: user
Subject: Re:

I use

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")


https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)

It does not seem to have that partition option.

On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Hi Deepak,

Have you tried specifying the minimum number of partitions when you load the file? I haven’t tried that myself against HDFS before, so I’m not sure if it will affect data locality. Ideally it won’t; it should still maintain data locality, just with more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they’re all node-local.

val details = sc.textFile(“hdfs://….”, 500)

If you’re using something other than a text file, you can also specify the minimum partitions with sc.hadoopFile, as in the sketch below.
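
A minimal sketch of both forms, assuming a SparkContext sc is in scope; the HDFS path is hypothetical and the second argument is the minimum number of partitions:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// textFile: second argument is minPartitions
val details = sc.textFile("hdfs://namenode/some/input", 500)

// old-API hadoopFile also accepts a minimum partition count
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://namenode/some/input", 500)
  .map(_._2.toString)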

Thanks,
Silvio

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re:

How can I increase the number of tasks from 174 to 500 without calling repartition?

The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks? This would be similar to the split size that controls the number of mappers in Hadoop M/R.
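
For context, the Hadoop-side knob being asked about here is the max split size; a sketch, assuming the new-API Avro input used elsewhere in this thread (64 MB = 67108864 bytes):

val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") // cap each split at 64 MB
// passing conf into sc.newAPIHadoopFile caps splits, and hence tasks, at roughly 64 MB each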

On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
Look in the tuning section<https://spark.apache.org/docs/latest/tuning.html>; you also need to figure out what's taking time and where your bottleneck is. If everything is tuned properly, then you will need to throw more cores at it :)

Thanks
Best Regards

On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
It's taking an hour, while on Hadoop it takes 1h 30m. Is there a way to make it run faster?

On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Cool. :)

On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
It's running now.

On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
Now running with

--num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1


On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
There are multiple errors like these:

1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread

2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
3)
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 20674"...
[ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)


On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
I see this

java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
at java.io.File.<init>(File.java:367)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
Can you look a bit more into the error logs? It could be getting killed because of OOM etc. One thing you can try is to set spark.shuffle.blockTransferService to nio instead of netty.
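
A minimal sketch of how that setting could be passed, assuming Spark 1.3 where the default transfer service is netty:

// either at submit time:
//   spark-submit --conf spark.shuffle.blockTransferService=nio ...
// or on the SparkConf before the SparkContext is created:
val sparkConf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new org.apache.spark.SparkContext(sparkConf)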

Thanks
Best Regards

On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the exception below.

Multiple tasks (100s of them) get the same exception on different hosts. How can all the hosts suddenly stop responding when, a few moments ago, 3 stages ran successfully? If I re-run the job, the first three stages will again run successfully. I cannot think of it being a cluster issue.


Any suggestions?


Spark Version: 1.3.1

Exception:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
        at org.apache.sp

--
Deepak




