This is related: SPARK-10501 On Fri, Oct 9, 2015 at 7:28 AM, java8964 <java8...@hotmail.com> wrote:
> Hi, Sparkers: > > In this case, I want to use Spark as an ETL engine to load the data from > Cassandra, and save it into HDFS. > > Here is the environment-specific information: > > Spark 1.3.1 > Cassandra 2.1 > HDFS/Hadoop 2.2 > > I am using the Cassandra Spark Connector 1.3.x, with which I have no problem > querying the C* data in Spark. But I have a problem trying to save the > data into HDFS, like below: > > val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map( > "c_table" -> "table_name", "keyspace" -> "keyspace_name") > df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: > uuid, business_info_ids: array<uuid>, closed_date: timestamp, > compliance_hold: boolean, contacts_list_id: uuid, contacts_list_seq: > bigint, currency_type: string, deleted_date: timestamp, discount_info: > map<string,string>, end_date: timestamp, insert_by: string, insert_time: > timestamp, last_update_by: string, last_update_time: timestamp, name: > string, parent_id: uuid, publish_date: timestamp, share_incentive: > map<string,string>, start_date: timestamp, version: int] > > scala> df.count > res12: Long = 757704 > > I can also dump the data output using df.first, without any problem. 
> > But when I try to save it: > > scala> df.save("hdfs://location", "parquet") > java.lang.RuntimeException: Unsupported datatype UUIDType > at scala.sys.package$.error(package.scala:27) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393) > at > org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440) > at > org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260) > at > org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276) > at > org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at 
scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269) > at > org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391) > at > org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98) > at > org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128) > at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) > at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) > at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156) > at > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28) > at > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41) > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43) > at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) > at $iwC$$iwC$$iwC$$iwC.<init>(<console>:47) > at $iwC$$iwC$$iwC.<init>(<console>:49) > at $iwC$$iwC.<init>(<console>:51) > at $iwC.<init>(<console>:53) > at <init>(<console>:55) > at .<init>(<console>:59) > at .<clinit>(<console>) > at .<init>(<console>:7) > at .<clinit>(<console>) > at $print(<console>) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) > at > org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) > at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) > at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) > at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) > at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) > at > org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) > at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) > at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) > at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) > at org.apache.spark.repl.SparkILoop.org > $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) > at > org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) > at > scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) > at org.apache.spark.repl.SparkILoop.org > $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) > at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) > at org.apache.spark.repl.Main$.main(Main.scala:31) > at org.apache.spark.repl.Main.main(Main.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) > at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > It looks like Spark doesn't know how to handle the UUID type, and as you > can see, the UUID type exists both in top-level columns and at the > nested level. > > My question is, given the versions of all the components I currently have, > is there any option for me? > > Thanks > > Yong >