You might consider using the native Parquet support built into Spark SQL
instead of the raw parquet-avro library:

http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
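
For example, a rough sketch from the 1.1 spark-shell (untested; it assumes the
data sits at the /user/xxxxx/testdata path from your GenData job, and the temp
table name is just something I made up):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // sc is the shell's SparkContext

    // Load the Parquet files as a SchemaRDD and query the array column with SQL.
    val data = sqlContext.parquetFile("/user/xxxxx/testdata")
    data.registerTempTable("test_struct")
    sqlContext.sql("SELECT string_array FROM test_struct").collect().foreach(println)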

On Mon, Nov 3, 2014 at 7:33 PM, Michael Albert <m_albert...@yahoo.com.invalid> wrote:

> Greetings!
>
>
> I'm trying to use avro and parquet with the following schema:
>
> {
>     "name": "TestStruct",
>     "namespace": "bughunt",
>     "type": "record",
>     "fields": [
>         {
>             "name": "string_array",
>             "type": { "type": "array", "items": "string" }
>         }
>     ]
> }
> The writing process seems to be OK, but when I try to read it with Spark,
> I get:
>
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> string_array (bughunt.TestStruct)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>
> When I try to read it with Hive, I get this:
>
> Failed with exception
> java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be
> cast to org.apache.hadoop.io.ArrayWritable
> This leads me to suspect it might be related to this issue:
> https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to
> be Hive-specific, and here Spark cannot even read the data it claims to
> have written itself.
>
> I'm running on an Amazon EMR cluster using the "version 2.4.0" Hadoop code
> and Spark 1.1.0.
> Has anyone else observed this sort of behavior?
>
> For completeness, here is the code that writes the data:
>
> package bughunt
>
> import org.apache.hadoop.mapreduce.Job
>
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
>
> import parquet.avro.AvroWriteSupport
> import parquet.avro.AvroParquetOutputFormat
> import parquet.hadoop.ParquetOutputFormat
>
> import java.util.ArrayList
>
> object GenData {
>     val outputPath = "/user/xxxxx/testdata"
>     val words = List(
>                     List("apple", "banana", "cherry"),
>                     List("car", "boat", "plane"),
>                     List("lion", "tiger", "bear"),
>                     List("north", "south", "east", "west"),
>                     List("up", "down", "left", "right"),
>                     List("red", "green", "blue"))
>
>     def main(args: Array[String]) {
>         val conf = new SparkConf(true)
>                     .setAppName("IngestLoanApplicattion")
>                     //.set("spark.kryo.registrator",
>                     //            classOf[CommonRegistrator].getName)
>                     .set("spark.serializer",
>                             "org.apache.spark.serializer.KryoSerializer")
>                     .set("spark.kryoserializer.buffer.mb", 4.toString)
>                     .set("spark.kryo.referenceTracking", "false")
>
>         val sc = new SparkContext(conf)
>
>         val rdd = sc.parallelize(words)
>
>         val job = new Job(sc.hadoopConfiguration)
>
>         ParquetOutputFormat.setWriteSupportClass(job,
>             classOf[AvroWriteSupport])
>         AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)
>
>         rdd.map(p => {
>                     val xs = new java.util.ArrayList[String]
>                     for (z <- p) { xs.add(z) }
>                     val bldr = TestStruct.newBuilder()
>                     bldr.setStringArray(xs)
>                     (null, bldr.build()) })
>            .saveAsNewAPIHadoopFile(outputPath,
>                 classOf[Void],
>                 classOf[TestStruct],
>                 classOf[ParquetOutputFormat[TestStruct]],
>                 job.getConfiguration)
>     }
> }
>
> To read the data, I use this sort of code from the spark-shell:
>
> :paste
>
> import bughunt.TestStruct
>
> import org.apache.hadoop.mapreduce.Job
> import org.apache.spark.SparkContext
>
> import parquet.hadoop.ParquetInputFormat
> import parquet.avro.AvroReadSupport
>
> def openRddSpecific(sc: SparkContext) = {
>     val job = new Job(sc.hadoopConfiguration)
>
>     ParquetInputFormat.setReadSupportClass(job,
>             classOf[AvroReadSupport[TestStruct]])
>
>     sc.newAPIHadoopFile("/user/malbert/testdata",
>             classOf[ParquetInputFormat[TestStruct]],
>             classOf[Void],
>             classOf[TestStruct],
>             job.getConfiguration)
> }
>
> I start the Spark shell as follows:
>
> spark-shell \
>     --jars ../my-jar-containing-the-class-definitions.jar \
>     --conf mapreduce.user.classpath.first=true \
>     --conf spark.kryo.referenceTracking=false \
>     --conf spark.kryoserializer.buffer.mb=4 \
>     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>
> I'm stumped.  I can read and write records and maps, but arrays/vectors
> elude me.
> Am I missing something obvious?
>
> Thanks!
>
> Sincerely,
>  Mike Albert
>
