Make sure you’re importing the right namespace for Hadoop v2.0. This is what I
tried:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

// Cap each input split at 2048 bytes so even a small file yields several partitions.
val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

// Argument order: path, input format class, key class, value class, conf.
val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

// Word count, then print the 10 most frequent words.
input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:
All of these throw a compilation error at newAPIHadoopFile:
1)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path +
"/*.avro", classOf[AvroKey], classOf[NullWritable],
classOf[AvroKeyInputFormat], hadoopConfiguration)
2)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path +
"/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]],hadoopConfiguration)
3)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
"67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
Error:
[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: overloaded method value newAPIHadoopFile with alternatives:
[INFO] (path: String,fClass:
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
Class[org.apache.hadoop.io.NullWritable],conf:
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)] <and>
[INFO] (path: String)(implicit km:
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
implicit fm:
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)]
[INFO] cannot be applied to (String,
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
Class[org.apache.hadoop.io.NullWritable],
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
org.apache.hadoop.conf.Configuration)
[INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
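The first alternative listed in the compiler error above takes its arguments in the order (path, fClass, kClass, vClass, conf), whereas attempts 1) to 3) pass the key class first and the input format class last. A minimal sketch of the call with the arguments reordered, assuming the same Avro and Hadoop classes as in the attempts; the wrapper object `AvroLoader`, the method `load`, and the `maxSplitBytes` parameter are hypothetical names used only for illustration:

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of Spark or of the original job.
object AvroLoader {
  def load(sc: SparkContext,
           path: String,
           maxSplitBytes: Long): RDD[(AvroKey[GenericRecord], NullWritable)] = {
    val hadoopConfiguration = new Configuration()
    // Same key as "mapreduce.input.fileinputformat.split.maxsize" in the attempts.
    hadoopConfiguration.setLong(FileInputFormat.SPLIT_MAXSIZE, maxSplitBytes)

    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
                        AvroKeyInputFormat[GenericRecord]](
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]], // fClass: input format first
      classOf[AvroKey[GenericRecord]],            // kClass
      classOf[NullWritable],                      // vClass
      hadoopConfiguration)
  }
}
```

Calling `AvroLoader.load(sc, path, 67108864L)` would correspond to the 64 MB limit used in the attempts.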
On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito wrote:
Ok, in that case I think you can set the max split size in the Hadoop config
object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
Again, I haven’t done this myself, but looking through the Spark codebase here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
And the HDFS FileInputFormat implementation, that seems like a good option to
try.
You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
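To illustrate why a maximum split size changes the number of tasks, here is a rough sketch of the arithmetic that the new-API FileInputFormat applies to each splittable file: the split size is max(minSize, min(maxSize, blockSize)), and the final split may be up to 10% larger than the rest. The object and method names are hypothetical, and the 512 MB block size and file size are assumptions chosen to match the numbers mentioned in this thread:

```scala
// Hypothetical stand-alone sketch; it mirrors the split arithmetic and
// does not call into Hadoop.
object SplitSizeSketch {
  // Hadoop lets the last split exceed the split size by up to 10%.
  private val SplitSlop = 1.1

  def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
    math.max(minSize, math.min(maxSize, blockSize))

  def countSplits(fileSize: Long, splitSize: Long): Int = {
    var remaining = fileSize
    var n = 0
    while (remaining.toDouble / splitSize > SplitSlop) {
      remaining -= splitSize
      n += 1
    }
    if (remaining > 0) n += 1 // whatever is left becomes the final split
    n
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // Assumed values: 512 MB blocks, SPLIT_MAXSIZE set to 64 MB.
    val splitSize = computeSplitSize(blockSize = 512 * mb, minSize = 1L, maxSize = 64 * mb)
    println(s"split size: ${splitSize / mb} MB")
    println(s"splits for one 512 MB file: ${countSplits(512 * mb, splitSize)}")
  }
}
```

Under these assumptions each 512 MB file yields 8 splits instead of 1, so the task count for the stage grows accordingly.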
I hope that helps!
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 5:49 PM
To: Silvio Fiorito
Cc: user
Subject: Re:
I use
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
java.lang.Class, java.lang.Class, java.lang.Class,
org.apache.hadoop.conf.Configuration)
It does not seem to have that partition option.
On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito wrote:
Hi Deepak,
Have you tried specifying the minimum partitions when you load the file? I
haven’t tried that myself against HDFS before, so I’m not sure if it will
affect data locality. Ideally not; it should still maintain data locality,
just with more partitions. Once your job runs, you can check the Spark tasks
web UI to ensure they’re all node-local.
val details = sc.textFile("hdfs://….", 500)
If you’re using something other than text file you can also specify minimum
partitions when using sc.hadoopFile.
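As a sketch of the sc.hadoopFile variant, the following uses the old-API (org.apache.hadoop.mapred) TextInputFormat purely as a stand-in for whatever input format the job actually needs. The object and method names are hypothetical, and minPartitions is only a hint to the input format rather than a guaranteed partition count:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper for illustration only.
object MinPartitionsSketch {
  def load(sc: SparkContext, path: String, minPartitions: Int): RDD[String] =
    sc.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
      // Hadoop reuses the Text object between records, so copy it to a String right away.
      .map { case (_, line) => line.toString }
}
```

For example, `MinPartitionsSketch.load(sc, "hdfs://….", 500).partitions.length` can then be compared with the task count shown in the web UI.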
Thanks,
Silvio
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re:
How can I increase the number of tasks from 174 to 500 without running
repartition?
The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so
as to increase the number of tasks, similar to the split size that increases
the number of mappers in Hadoop M/R?
On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das wrote:
Look in the tuning section <https://spark.apache.org/docs/latest/tuning.html>.
You also need to figure out what's taking time and where your bottleneck is.
If everything is tuned properly, then you will need to throw more cores at it :)
Thanks
Best Regards
On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to make it
run faster?
On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das wrote:
Cool. :)
On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" wrote:
It's running now.
On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
Now running with
--num-executors 9973 --driver-memory 14g --driver-java-options
"-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
--executor-cores 1
On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
There are multiple of these
1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0
(TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
Source)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread
2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
3)
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 20674"...
[ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
[akka.tcp://[email protected]:47708/]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
I see this
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
at java.io.File.<init>(File.java:367)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das wrote:
Can you look a bit more in the error logs? It could be getting killed because
of OOM etc. One thing you can try is switching
spark.shuffle.blockTransferService from netty to nio.
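A minimal sketch of setting that property programmatically, assuming the Spark 1.3.1 used in this thread (later Spark releases may not honor this setting); the object name and application name are hypothetical:

```scala
import org.apache.spark.SparkConf

// Hypothetical helper for illustration only.
object BlockTransferConf {
  def build(): SparkConf =
    new SparkConf()
      .setAppName("shuffle-nio-sketch") // hypothetical application name
      // Switch the shuffle block transfer implementation from netty to nio.
      .set("spark.shuffle.blockTransferService", "nio")
}
```

The same property can also be passed on the command line with `--conf spark.shuffle.blockTransferService=nio` when submitting the job.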
Thanks
Best Regards
On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote:
I have a Spark job that has 7 stages. The first 3 stages complete and the
fourth stage begins (it joins two RDDs). This stage has multiple task failures,
all with the exception below.
Hundreds of tasks get the same exception with different hosts. How can all the
hosts suddenly stop responding when a few moments ago 3 stages ran
successfully? If I re-run, the first three stages will again run successfully.
I cannot think of it being a cluster issue.
Any suggestions?
Spark Version : 1.3.1
Exception:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at org.apache.sp
--
Deepak