Hi, Sparkers:
In this case, I want to use Spark as an ETL engine to load data from
Cassandra and save it into HDFS.
Here is the environment information:
Spark 1.3.1
Cassandra 2.1
HDFS/Hadoop 2.2
I am using the Cassandra Spark Connector 1.3.x, and I have no problem
querying the C* data from Spark. But I run into a problem when I try to save
the data into HDFS, as shown below:
scala> val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map(
  "c_table" -> "table_name", "keyspace" -> "keyspace_name"))
df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid,
business_info_ids: array<uuid>, closed_date: timestamp, compliance_hold:
boolean, contacts_list_id: uuid, contacts_list_seq: bigint, currency_type:
string, deleted_date: timestamp, discount_info: map<string,string>, end_date:
timestamp, insert_by: string, insert_time: timestamp, last_update_by: string,
last_update_time: timestamp, name: string, parent_id: uuid, publish_date:
timestamp, share_incentive: map<string,string>, start_date: timestamp, version:
int]

scala> df.count
res12: Long = 757704
I can also dump the data using df.first, without any problem.
But when I try to save it:
scala> df.save("hdfs://location", "parquet")
java.lang.RuntimeException: Unsupported datatype UUIDType
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
        at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
        at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
        at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
        at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
        at $iwC$$iwC$$iwC.<init>(<console>:49)
        at $iwC$$iwC.<init>(<console>:51)
        at $iwC.<init>(<console>:53)
        at <init>(<console>:55)
        at .<init>(<console>:59)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It looks like Spark's Parquet writer doesn't know how to handle the UUID type,
and as you can see from the schema, UUID columns exist both at the top level
(campaign_id, contacts_list_id, parent_id) and nested inside a collection
(business_info_ids: array<uuid>).
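One workaround I am considering (just a sketch, not verified on this exact
stack) is to convert the UUID columns to strings before writing, since the
Parquet writer in Spark 1.3 has no mapping for UUIDType but handles strings
fine. Something like the following, where the UDFs and the assumption that the
connector surfaces uuid columns as java.util.UUID values are mine:

import java.util.UUID
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helpers: render a uuid column (and an array<uuid> column)
// as strings so the resulting schema contains no UUIDType.
val uuidToStr  = udf((u: UUID) => if (u == null) null else u.toString)
val uuidsToStr = udf((us: Seq[UUID]) => if (us == null) null else us.map(_.toString))

// Replace each uuid-typed column from the schema above with its string form.
val writable = df
  .withColumn("campaign_id",       uuidToStr(col("campaign_id")))
  .withColumn("contacts_list_id",  uuidToStr(col("contacts_list_id")))
  .withColumn("parent_id",         uuidToStr(col("parent_id")))
  .withColumn("business_info_ids", uuidsToStr(col("business_info_ids")))

writable.save("hdfs://location", "parquet")

The map<string,string> columns should be unaffected; only the uuid and
array<uuid> ones need converting. I don't know whether the connector actually
hands the UDF a java.util.UUID here, so treat this as a starting point.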
My question is: given the versions of all the components I currently have, is
there any option for me?
Thanks
Yong