Hi Sparkers,
I want to use Spark as an ETL engine to load data from Cassandra and save it into HDFS.
Here is the environment information:
Spark 1.3.1
Cassandra 2.1
HDFS/Hadoop 2.2
I am using the Cassandra Spark Connector 1.3.x, and I have no problem querying 
the C* data from Spark. But I run into a problem when I try to save the data 
into HDFS. First I load the table:
scala> val df = sqlContext.load("org.apache.spark.sql.cassandra",
     |   options = Map("c_table" -> "table_name", "keyspace" -> "keyspace_name"))
df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid,
business_info_ids: array<uuid>, closed_date: timestamp, compliance_hold: boolean,
contacts_list_id: uuid, contacts_list_seq: bigint, currency_type: string,
deleted_date: timestamp, discount_info: map<string,string>, end_date: timestamp,
insert_by: string, insert_time: timestamp, last_update_by: string,
last_update_time: timestamp, name: string, parent_id: uuid, publish_date: timestamp,
share_incentive: map<string,string>, start_date: timestamp, version: int]
scala> df.count
res12: Long = 757704
I can also dump the data using df.first, without any problem.
But when I try to save it:
scala> df.save("hdfs://location", "parquet")
java.lang.RuntimeException: Unsupported datatype UUIDType
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
	at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
	at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
	at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
	at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
	at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
	at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
	at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
	at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
	at $iwC$$iwC$$iwC.<init>(<console>:49)
	at $iwC$$iwC.<init>(<console>:51)
	at $iwC.<init>(<console>:53)
	at <init>(<console>:55)
	at .<init>(<console>:59)
	at .<clinit>(<console>)
	at .<init>(<console>:7)
	at .<clinit>(<console>)
	at $print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It looks like Spark doesn't know how to handle the UUID type when writing 
Parquet. As you can see in the schema above, uuid appears both as a top-level 
column type (campaign_id, contacts_list_id, parent_id) and nested inside a 
collection (business_info_ids: array<uuid>).
My question is: given the versions of all the components I currently have, is 
there any option for me?
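The only workaround I have come up with so far is a manual one (a rough, 
untested sketch, not a confirmed solution; only three of the 21 columns are 
spelled out here as examples): drop down to the connector's RDD API, convert 
every UUID to its string form myself, and rebuild the DataFrame with an 
explicit schema before saving:

import com.datastax.spark.connector._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Read at the RDD level; each record is a CassandraRow.
val rows = sc.cassandraTable("keyspace_name", "table_name").map { r =>
  Row(
    r.getLong("account_id"),
    r.getUUID("campaign_id").toString,                                  // uuid -> string
    r.get[List[java.util.UUID]]("business_info_ids").map(_.toString)   // array<uuid> -> array<string>
    // ... and so on for the remaining columns, stringifying every uuid
  )
}

// Declare the former uuid columns as StringType so Parquet can handle them.
val schema = StructType(Seq(
  StructField("account_id", LongType),
  StructField("campaign_id", StringType),
  StructField("business_info_ids", ArrayType(StringType))
  // ... remaining fields
))

val converted = sqlContext.createDataFrame(rows, schema)
converted.save("hdfs://location", "parquet")

But this means hand-writing the conversion and schema for all 21 columns, so 
I would much prefer a cleaner option if one exists.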
Thanks,
Yong
