You might consider using the native parquet support built into Spark SQL instead of using the raw library:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
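For example, from the spark-shell you could try something along these lines (just a rough, untested sketch against the Spark 1.1 SQLContext API; the path is the one from the quoted read code, and it is worth verifying that Spark SQL's Parquet reader accepts the array layout that parquet-avro wrote):

import org.apache.spark.sql.SQLContext

// sc is the SparkContext that spark-shell already provides
val sqlContext = new SQLContext(sc)

// Spark SQL reads the schema straight from the Parquet footers,
// so no Avro-generated classes and no Kryo serialization are involved.
val data = sqlContext.parquetFile("/user/malbert/testdata")
data.printSchema()

data.registerTempTable("test_struct")
sqlContext.sql("SELECT string_array FROM test_struct").collect().foreach(println)

A sketch of the write side, done the same way, follows after the quoted message below.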
On Mon, Nov 3, 2014 at 7:33 PM, Michael Albert <m_albert...@yahoo.com.invalid> wrote:

> Greetings!
>
> I'm trying to use avro and parquet with the following schema:
>
> {
>   "name": "TestStruct",
>   "namespace": "bughunt",
>   "type": "record",
>   "fields": [
>     {
>       "name": "string_array",
>       "type": { "type": "array", "items": "string" }
>     }
>   ]
> }
>
> The writing process seems to be OK, but when I try to read it with Spark,
> I get:
>
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> string_array (bughunt.TestStruct)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>
> When I try to read it with Hive, I get this:
>
> Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be
> cast to org.apache.hadoop.io.ArrayWritable
>
> Which would lead me to suspect that this might be related to this one:
> https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to
> be Hive-specific, and I am not seeing Spark read the data it claims to have
> written itself.
>
> I'm running on an Amazon EMR cluster using the "version 2.4.0" hadoop code
> and Spark 1.1.0.
> Has anyone else observed this sort of behavior?
>
> For completeness, here is the code that writes the data:
>
> package bughunt
>
> import org.apache.hadoop.mapreduce.Job
>
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
>
> import parquet.avro.AvroWriteSupport
> import parquet.avro.AvroParquetOutputFormat
> import parquet.hadoop.ParquetOutputFormat
>
> import java.util.ArrayList
>
> object GenData {
>     val outputPath = "/user/xxxxx/testdata"
>     val words = List(
>         List("apple", "banana", "cherry"),
>         List("car", "boat", "plane"),
>         List("lion", "tiger", "bear"),
>         List("north", "south", "east", "west"),
>         List("up", "down", "left", "right"),
>         List("red", "green", "blue"))
>
>     def main(args: Array[String]) {
>         val conf = new SparkConf(true)
>             .setAppName("IngestLoanApplicattion")
>             //.set("spark.kryo.registrator",
>             //     classOf[CommonRegistrator].getName)
>             .set("spark.serializer",
>                  "org.apache.spark.serializer.KryoSerializer")
>             .set("spark.kryoserializer.buffer.mb", 4.toString)
>             .set("spark.kryo.referenceTracking", "false")
>
>         val sc = new SparkContext(conf)
>
>         val rdd = sc.parallelize(words)
>
>         val job = new Job(sc.hadoopConfiguration)
>
>         ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
>         AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)
>
>         rdd.map(p => {
>                 val xs = new java.util.ArrayList[String]
>                 for (z <- p) { xs.add(z) }
>                 val bldr = TestStruct.newBuilder()
>                 bldr.setStringArray(xs)
>                 (null, bldr.build()) })
>             .saveAsNewAPIHadoopFile(outputPath,
>                 classOf[Void],
>                 classOf[TestStruct],
>                 classOf[ParquetOutputFormat[TestStruct]],
>                 job.getConfiguration)
>     }
> }
>
> To read the data, I use this sort of code from the spark-shell:
>
> :paste
>
> import bughunt.TestStruct
>
> import org.apache.hadoop.mapreduce.Job
> import org.apache.spark.SparkContext
> import parquet.hadoop.ParquetInputFormat
> import parquet.avro.AvroReadSupport
>
> def openRddSpecific(sc: SparkContext) = {
>     val job = new Job(sc.hadoopConfiguration)
>
>     ParquetInputFormat.setReadSupportClass(job,
>         classOf[AvroReadSupport[TestStruct]])
>
>     sc.newAPIHadoopFile("/user/malbert/testdata",
>         classOf[ParquetInputFormat[TestStruct]],
>         classOf[Void],
>         classOf[TestStruct],
>         job.getConfiguration)
> }
>
> I start the Spark shell as follows:
>
> spark-shell \
>     --jars ../my-jar-containing-the-class-definitions.jar \
>     --conf mapreduce.user.classpath.first=true \
>     --conf spark.kryo.referenceTracking=false \
>     --conf spark.kryoserializer.buffer.mb=4 \
>     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>
> I'm stumped. I can read and write records and maps, but arrays/vectors
> elude me.
> Am I missing something obvious?
>
> Thanks!
>
> Sincerely,
> Mike Albert
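For the write side, the Spark SQL route sidesteps parquet-avro entirely. Again only a rough, untested sketch: the case class StringArrayRecord and the output path are illustrative stand-ins for the Avro-generated TestStruct and the real destination.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// A plain Scala case class takes the place of the Avro-generated TestStruct;
// a Seq[String] field becomes an array-of-strings column in Parquet.
case class StringArrayRecord(string_array: Seq[String])

object GenDataSql {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("GenDataSql"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit conversion RDD[case class] -> SchemaRDD

    val words = List(
      List("apple", "banana", "cherry"),
      List("car", "boat", "plane"))

    // Convert each word list to a record and write it out as Parquet.
    sc.parallelize(words)
      .map(ws => StringArrayRecord(ws))
      .saveAsParquetFile("/user/xxxxx/testdata-sql")

    sc.stop()
  }
}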