I agree with Eugen that you should use Kryo.

But even better is to embed your Avro objects inside of Kryo. This allows
you to have the benefits of both Avro and Kryo.

Here's example code for using Avro with Kryo.

https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

You need to register all of your Avro specific classes with Kryo and have it
use the AvroSerializer class (defined in the file linked above) to
encode/decode them.
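
AvroSerializer is not part of Kryo or Avro itself. As a rough sketch of
what such a serializer can look like (not a copy of the linked file), the
class below wraps Avro's binary encoder and decoder in a Kryo Serializer.
It assumes Kryo 2.x method signatures and Scala 2.10+ for ClassTag (on
Scala 2.9 a ClassManifest would be needed instead); the field names are
my own.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
import scala.reflect.ClassTag

// Sketch only: assumes Kryo 2.x, where read() takes Class[T].
class AvroSerializer[T <: SpecificRecord : ClassTag] extends Serializer[T] {
  // Recover the runtime class so Avro can look up the record's schema.
  private val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
  private val writer = new SpecificDatumWriter[T](clazz)
  private val reader = new SpecificDatumReader[T](clazz)
  // Reused between calls to avoid allocating a new encoder/decoder each time.
  private var encoder: BinaryEncoder = null
  private var decoder: BinaryDecoder = null

  // Kryo's Output is an OutputStream, so Avro can write straight into it.
  override def write(kryo: Kryo, output: Output, record: T): Unit = synchronized {
    encoder = EncoderFactory.get().directBinaryEncoder(output, encoder)
    writer.write(record, encoder)
  }

  // Kryo's Input is an InputStream; the direct decoder does not read ahead.
  override def read(kryo: Kryo, input: Input, klass: Class[T]): T = synchronized {
    decoder = DecoderFactory.get().directBinaryDecoder(input, decoder)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
```

The direct (unbuffered) encoder and decoder are used so that Avro consumes
exactly the bytes belonging to one record and leaves the rest of the Kryo
stream untouched.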

For example:

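import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Encode/decode MySpecificAvroClass with the Avro-backed serializer
    kryo.register(classOf[MySpecificAvroClass],
      new AvroSerializer[MySpecificAvroClass]())
  }
}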
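
Spark also has to be told to use Kryo and which registrator to load. A
minimal sketch, assuming the system-property style of configuration
described in the Spark tuning guide of this era (later releases set the
same keys on a SparkConf). The KryoConfig object and enableKryo method are
hypothetical names, and the package is inferred from the path of the
linked file, so substitute your own registrator class:

```scala
object KryoConfig {
  // Fully qualified name of the registrator; the package here is inferred
  // from the linked file's path and is only a placeholder for your own.
  val registrator = "edu.berkeley.cs.amplab.adam.serialization.AdamKryoRegistrator"

  // Must run before the SparkContext is created, because the serializer
  // settings are read when the context starts up.
  def enableKryo(): Unit = {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", registrator)
  }
}
```

Call KryoConfig.enableKryo() at the top of main, before
new SparkContext(...).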

--
Matt Massie
UC, Berkeley AMPLab
Twitter: @matt_massie <https://twitter.com/matt_massie>,
@amplab<https://twitter.com/amplab>
https://amplab.cs.berkeley.edu/


On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> Hi Robert,
>
> The problem is that Spark uses Java serialization, which requires serialized
> objects to implement Serializable, and AvroKey doesn't.
> As a workaround you can try using Kryo
> <http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization>
> for the serialization.
>
> Eugen
>
>
> 2013/11/11 Robert Fink <ursula2...@gmail.com>
>
>> Hi,
>>
>> I am trying to get the following minimal Scala example to work: using Spark
>> to process Avro records. Here's my dummy Avro definition:
>>
>> {
>>   "namespace": "com.avrotest",
>>   "type": "record",
>>   "name": "AvroTest",
>>   "fields": [
>>     {"name": "field1", "type": ["string", "null"]}
>>   ]
>> }
>>
>> I experiment with a simple job that creates three AvroTest objects,
>> writes them out to a file through a SparkContext, and then reads in the
>> thus generated Avro file and performs a simple grouping operation:
>>
>> // ---------------------------------------------------------------------------------------------------------
>> import org.apache.spark.SparkContext._
>> import org.apache.avro.specific.SpecificDatumWriter
>> import org.apache.avro.file.DataFileWriter
>> import org.apache.avro._
>> import org.apache.avro.generic._
>> import org.apache.hadoop.mapreduce.Job
>> import com.avrotest.AvroTest
>> import java.io.File
>>
>> object SparkTest{
>>   def main(args: Array[String]) {
>>
>>     val avrofile = "output.avro"
>>     val sc = new SparkContext("local", "Simple App")
>>     val job = new Job()
>>
>>     val record1 = new AvroTest()
>>     record1.setField1("value1")
>>     val record2 = new AvroTest()
>>     record2.setField1("value1")
>>     val record3 = new AvroTest()
>>     record3.setField1("value2")
>>
>>     val userDatumWriter = new SpecificDatumWriter[AvroTest]()
>>     val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
>>     val file = new File(avrofile)
>>     dataFileWriter.create(record1.getSchema(), file)
>>     dataFileWriter.append(record1)
>>     dataFileWriter.append(record2)
>>     dataFileWriter.append(record3)
>>     dataFileWriter.close()
>>
>>     val rdd = sc.newAPIHadoopFile(
>>       avrofile,
>>       classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
>>       classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
>>       classOf[org.apache.hadoop.io.NullWritable],
>>       job.getConfiguration)
>>     // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1, value1, value2
>>     val numGroups= rdd.groupBy(x => x._1.datum.getField1).count()
>>   }
>> }
>> // ---------------------------------------------------------------------------------------------------------
>>
>> I would expect numGroups==2 in the last step, because record1 and record2
>> share the getField1()=="value1", and record3 has getField1() == "value2".
>> However, the script fails to execute with the following error (see below).
>> Can anyone give me a hint what could be wrong in the above code, or post an
>> example of reading from an Avro file and performing some simple
>> computations on the retrieved objects? Thank you so much! Robert.
>>
>> 11650 [pool-109-thread-1] WARN
>> org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
>> Use AvroJob.setInputKeySchema() if desired.
>> 11661 [pool-109-thread-1] INFO
>> org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
>> to the writer schema.
>> 12293 [spark-akka.actor.default-dispatcher-5] INFO
>> org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
>> java.io.NotSerializableException
>> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>         at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>         at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>         at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>         at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>>         at
>> org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:109)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>         at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
>>         at
>> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
>>         at
>> org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:198)
>>         at
>> org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:744)
>> 12304 [spark-akka.actor.default-dispatcher-5] INFO
>> org.apache.spark.scheduler.local.LocalScheduler - Remove TaskSet 1.0 from
>> pool
>> 12311 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to
>> run count at SparkTest.scala:41
>> [error] (run-main) org.apache.spark.SparkException: Job failed: Task
>> 1.0:0 failed more than 4 times; aborting job
>> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>> org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than
>> 4 times; aborting job java.io.NotSerializableException:
>> org.apache.avro.mapred.AvroKey
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>         at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>>         at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>
>
