Hi Ted,

The link is useful, but I still could not figure out how to convert the 
RDD[GenericRecord] into a DataFrame.


I tried to create the Spark SQL schema from the Avro schema:

    val json = """{"type":"record","name":"Profile","fields":
                     [{"name":"userid","type":"string"},
                      {"name":"created_time","type":"long"},
                      {"name":"updated_time","type":"long"}]}"""

    val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
    val profileDataFrame = sqlContext.createDataFrame(mergedProfiles, schema)
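
As an aside, I am not sure DataType.fromJson accepts Avro schema JSON at 
all; as far as I can tell it expects Spark SQL's own schema JSON (the 
format StructType.json produces). Building the StructType by hand would 
sidestep that. A minimal sketch of the equivalent schema (nullability is 
my guess, since the Avro fields are not unions with null):

    import org.apache.spark.sql.types._

    // Hand-built Spark SQL schema mirroring the Avro Profile record above
    val profileSchema = StructType(Seq(
      StructField("userid", StringType, nullable = false),
      StructField("created_time", LongType, nullable = false),
      StructField("updated_time", LongType, nullable = false)))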



I am getting the following compilation error:


[ERROR] ProfileService.scala:119: error: overloaded method value createDataFrame with alternatives:
[INFO]   (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
[INFO]   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
[INFO]   (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
[INFO]   (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
[INFO]   (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
[INFO]   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
[INFO]  cannot be applied to (org.apache.spark.streaming.dstream.MapWithStateDStream[(String, String, String),org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord,((String, String, String), org.apache.avro.generic.GenericRecord)], org.apache.spark.sql.types.StructType)
[INFO]     val profileDataFrame = sqlContext.createDataFrame(mergedProfiles, schema)
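
If I read the error right, mergedProfiles is still a MapWithStateDStream 
of ((String, String, String), GenericRecord) pairs, so none of the 
createDataFrame overloads apply; the closest one wants an RDD[Row]. My 
best guess at the conversion is something like the sketch below (field 
names and types are taken from the Profile schema above; the tuple 
destructuring is an assumption about what mapWithState emits here), but 
I could not confirm this is the right approach:

    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.Row

    mergedProfiles.foreachRDD { rdd =>
      // Drop the state key and flatten each GenericRecord into a Row
      // matching the StructType above. Avro strings arrive as Utf8,
      // hence the toString.
      val rowRdd = rdd.map { case (_, record: GenericRecord) =>
        Row(record.get("userid").toString,
            record.get("created_time").asInstanceOf[Long],
            record.get("updated_time").asInstanceOf[Long])
      }
      val profileDataFrame = sqlContext.createDataFrame(rowRdd, schema)
      // (a real job would need a per-batch path or an explicit save mode)
      profileDataFrame.write.parquet("s3://bucket/data.parquet")
    }

Does that look right, or is there a more direct way?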


Thanks,


Raj


________________________________
From: Ted Yu <yuzhih...@gmail.com>
Sent: Thursday, May 26, 2016 12:01:43 PM
To: Govindasamy, Nagarajan
Cc: user@spark.apache.org
Subject: Re: save RDD of Avro GenericRecord as parquet throws UnsupportedOperationException

Have you seen this thread?

http://search-hadoop.com/m/q3RTtWmyYB5fweR&subj=Re+Best+way+to+store+Avro+Objects+as+Parquet+using+SPARK

On Thu, May 26, 2016 at 6:55 AM, Govindasamy, Nagarajan 
<ngovindas...@turbine.com> wrote:

Hi,

I am trying to save RDD of Avro GenericRecord as parquet. I am using Spark 
1.6.1.

DStreamOfAvroGenericRecord.foreachRDD(rdd => rdd.toDF().write.parquet("s3://bucket/data.parquet"))

I am getting the following exception. Is there a way to save Avro 
GenericRecord as a Parquet or ORC file?

java.lang.UnsupportedOperationException: Schema for type org.apache.avro.generic.GenericRecord is not supported
        at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:690)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:689)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:689)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:642)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
        at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
        at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:155)

Thanks,

Raj
