Agree with Eugen that you should use Kryo.
Even better is to embed your Avro objects inside of Kryo: that way you get
the benefits of both Avro and Kryo.
Here's example code for using Avro with Kryo.
https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala
You need to register all of your Avro specific classes with Kryo and have it
use the AvroSerializer class to encode/decode them.
e.g.
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register each Avro-generated class so Kryo delegates to AvroSerializer.
    kryo.register(classOf[MySpecificAvroClass],
      new AvroSerializer[MySpecificAvroClass]())
  }
}
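
For reference, here is a minimal sketch of what such an AvroSerializer can
look like. This is an illustration in the spirit of the linked ADAM code,
not the exact class from that repo: it round-trips each record through
Avro's compact binary encoding and hands Kryo a length-prefixed byte block.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}

// Illustrative sketch, not the ADAM implementation verbatim.
class AvroSerializer[T <: SpecificRecord : Manifest] extends Serializer[T] {
  private val clazz = manifest[T].runtimeClass.asInstanceOf[Class[T]]
  private lazy val writer = new SpecificDatumWriter[T](clazz)
  private lazy val reader = new SpecificDatumReader[T](clazz)

  override def write(kryo: Kryo, output: Output, record: T) {
    // Encode the record with Avro's binary encoding into a buffer...
    val buffer = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(buffer, null)
    writer.write(record, encoder)
    encoder.flush()
    // ...then hand Kryo the bytes with a length prefix.
    output.writeInt(buffer.size(), true)
    output.writeBytes(buffer.toByteArray)
  }

  override def read(kryo: Kryo, input: Input, klazz: Class[T]): T = {
    // Read back exactly the prefixed number of bytes and decode with Avro.
    val payload = input.readBytes(input.readInt(true))
    val decoder = DecoderFactory.get().binaryDecoder(payload, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}

The length prefix keeps Avro's buffered decoder from reading past the end of
one record into the bytes of the next object in Kryo's stream.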
--
Matt Massie
UC Berkeley AMPLab
Twitter: @matt_massie (https://twitter.com/matt_massie),
@amplab (https://twitter.com/amplab)
https://amplab.cs.berkeley.edu/
On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:
Hi Robert,
The problem is that Spark uses Java serialization, which requires serialized
objects to implement Serializable, and AvroKey doesn't.
As a workaround you can try using Kryo
(http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization)
for the serialization.
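
For completeness, a minimal sketch of turning Kryo on, per that tuning page
(this uses the Spark 0.8-era convention of setting Java system properties
before the SparkContext is created; the registrator class name is a
placeholder for your own):

import org.apache.spark.SparkContext

// Serialize shuffle data with Kryo instead of Java serialization.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Placeholder class name: point this at your own KryoRegistrator.
System.setProperty("spark.kryo.registrator", "com.avrotest.MyKryoRegistrator")

val sc = new SparkContext("local", "Simple App")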
Eugen
2013/11/11 Robert Fink ursula2...@gmail.com
Hi,
I am trying to get the following minimal Scala example to work: using Spark
to process Avro records. Here's my dummy Avro definition:
{
  "namespace": "com.avrotest",
  "type": "record",
  "name": "AvroTest",
  "fields": [
    {"name": "field1", "type": ["string", "null"]}
  ]
}
I experiment with a simple job that creates three AvroTest objects, writes
them out to an Avro file, and then reads the generated file back in through
a SparkContext and performs a simple grouping operation:
// ---------------------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.file.DataFileWriter
import org.apache.avro._
import org.apache.avro.generic._
import org.apache.hadoop.mapreduce.Job
import com.avrotest.AvroTest
import java.io.File

object SparkTest {
  def main(args: Array[String]) {
    val avrofile = "output.avro"
    val sc = new SparkContext("local", "Simple App")
    val job = new Job()

    val record1 = new AvroTest()
    record1.setField1("value1")
    val record2 = new AvroTest()
    record2.setField1("value1")
    val record3 = new AvroTest()
    record3.setField1("value2")

    val userDatumWriter = new SpecificDatumWriter[AvroTest]()
    val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
    val file = new File(avrofile)
    dataFileWriter.create(record1.getSchema(), file)
    dataFileWriter.append(record1)
    dataFileWriter.append(record2)
    dataFileWriter.append(record3)
    dataFileWriter.close()

    val rdd = sc.newAPIHadoopFile(
      avrofile,
      classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
      classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
      classOf[org.apache.hadoop.io.NullWritable],
      job.getConfiguration)
    // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1, value1, value2
    val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
  }
}
// ---------------------------------------------------------------------
I would expect numGroups == 2 in the last step, because record1 and record2
share getField1() == "value1", and record3 has getField1() == "value2".
However, the script fails to execute with the following error (see below).
Can anyone give me a hint what could be wrong in the above code, or post an
example of reading from an Avro file and performing some simple
computations on the retrieved objects? Thank you so much! Robert.
11650 [pool-109-thread-1] WARN org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
11661 [pool-109-thread-1] INFO org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal to the writer schema.
12293 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to java.io.NotSerializableException
java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508