I have also been struggling with reading Avro. Very glad to hear that there
is a new Avro library coming in Spark 1.2 (which, by the way, seems to have
a lot of other very useful improvements).

In the meantime, I have been able to piece together several snippets/tips
that I found in various sources, and I am now able to read/write Avro
correctly. From my understanding, you basically need 3 pieces:
1. Use the Kryo serializer.
2. Register your Avro classes. I have done this using Twitter chill 0.4.0.
3. Read/write Avro with a snippet of code like the one you posted.

Here is the relevant code (hopefully all of it).

// All of the following are needed in order to read/write Avro files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic Avro; I am using specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/Avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }
// Spark itself (the implicits import enables saveAsNewAPIHadoopFile on pair RDDs)
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._

object MyApp {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "MyRegistrator")
        val sc = new SparkContext(conf)

        // schema (the Avro Schema of MyAvroClass), inputPath, outputPath and
        // rdd (an RDD of MyAvroClass) are assumed to be defined elsewhere

        // Read
        val readJob = new Job()
        AvroJob.setInputKeySchema(readJob, schema)
        sc.newAPIHadoopFile(inputPath,
            classOf[AvroKeyInputFormat[MyAvroClass]],
            classOf[AvroKey[MyAvroClass]],
            classOf[NullWritable],
            readJob.getConfiguration)
            .map { t => t._1.datum }

        // Write
        val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
        val writeJob = new Job()
        AvroJob.setOutputKeySchema(writeJob, schema)
        writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
        rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
            classOf[AvroKey[MyAvroClass]],
            classOf[NullWritable],
            classOf[AvroKeyOutputFormat[MyAvroClass]],
            writeJob.getConfiguration)
    }
}


class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
        // Put a line like the following for each of your Avro classes if you use
        // specific Avro.
        // If you use generic Avro, chill also has a function for that:
        // GenericRecordSerializer
        kryo.register(classOf[MyAvroClass],
            AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
    }
}
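
The code above passes a `schema` value to AvroJob without showing where it
comes from. As a minimal sketch (not necessarily how my application or yours
would obtain it), the schema can be parsed from its JSON definition when using
generic Avro; with specific Avro the generated class usually carries it
already. The `Person` schema and the `SchemaSketch` object below are
hypothetical and exist only to make the example self-contained; it needs
nothing but the Avro library on the classpath.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }

object SchemaSketch {
    // Hypothetical schema, for illustration only; substitute your own .avsc content
    val schemaJson: String =
        """{"type": "record", "name": "Person", "fields": [
          |  {"name": "name", "type": "string"},
          |  {"name": "user_id", "type": "int"}
          |]}""".stripMargin

    // Generic Avro: parse the schema from its JSON definition
    val schema: Schema = new Schema.Parser().parse(schemaJson)

    // Specific Avro (assumption: your generated class exposes the usual static
    // accessor), something like:
    //   val schema: Schema = MyAvroClass.getClassSchema

    def main(args: Array[String]): Unit = {
        val record: GenericRecord = new GenericData.Record(schema)
        record.put("name", "alice")
        record.put("user_id", Int.box(42))
        // Fields of a GenericRecord can be looked up by name rather than by position
        println(record.get("user_id"))
    }
}
```

The same `schema` value can then be handed to AvroJob.setInputKeySchema and
AvroJob.setOutputKeySchema.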

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Fri, Nov 21, 2014 at 7:04 AM, thomas j <beanb...@googlemail.com> wrote:

> I've been able to load a different avro file based on GenericRecord with:
>
> val person = sqlContext.avroFile("/tmp/person.avro")
>
> When I try to call `first()` on it, I get "NotSerializableException"
> exceptions again:
>
> person.first()
>
> ...
> 14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 20)
> java.io.NotSerializableException:
> org.apache.avro.generic.GenericData$Record
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     ...
>
> Apart from this I want to transform the records into pairs of (user_id,
> record). I can do this by specifying the offset of the user_id column with
> something like this:
>
> person.map(r => (r.getInt(2), r)).take(4).collect()
>
> Is there any way to be able to specify the column name ("user_id") instead
> of needing to know/calculate the offset somehow?
>
> Thanks again
>
>
> On Fri, Nov 21, 2014 at 11:48 AM, thomas j <beanb...@googlemail.com>
> wrote:
>
>> Thanks for the pointer Michael.
>>
>> I've downloaded spark 1.2.0 from
>> https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
>> built the spark-avro repo you linked to.
>>
>> When I run it against the example avro file linked to in the
>> documentation it works. However, when I try to load my avro file (linked to
>> in my original question) I receive the following error:
>>
>> java.lang.RuntimeException: Unsupported type LONG
>>     at scala.sys.package$.error(package.scala:27)
>>     at com.databricks.spark.avro.AvroRelation.com
>> $databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
>>     at
>> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
>>     at
>> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
>>     at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     ...
>>
>> If this is useful I'm happy to try loading the various different avro
>> files I have to try to battle-test spark-avro.
>>
>> Thanks
>>
>> On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> One option (starting with Spark 1.2, which is currently in preview) is
>>> to use the Avro library for Spark SQL.  This is very new, but we would love
>>> to get feedback: https://github.com/databricks/spark-avro
>>>
>>> On Thu, Nov 20, 2014 at 10:19 AM, al b <beanb...@googlemail.com> wrote:
>>>
>>>> I've read several posts of people struggling to read avro in spark. The
>>>> examples I've tried don't work. When I try this solution (
>>>> https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
>>>> I get errors:
>>>>
>>>> spark java.io.NotSerializableException:
>>>> org.apache.avro.mapred.AvroWrapper
>>>>
>>>> How can I read the following sample file in spark using scala?
>>>>
>>>> http://www.4shared.com/file/SxnYcdgJce/sample.html
>>>>
>>>> Thomas
>>>>
>>>
>>>
>>
>
