[ 
https://issues.apache.org/jira/browse/SPARK-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Berger closed SPARK-8165.
-------------------------------

> sqlContext.createDataFrame(dataWithoutHeader, csvSchema) type conversion 
> error after .cache
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8165
>                 URL: https://issues.apache.org/jira/browse/SPARK-8165
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1
>            Reporter: David Berger
>            Priority: Major
>
> I create a DataFrame from a simple Redshift extract and use the StructType 
> below to define the schema: 
>   val ophanAdImpressions = StructType(Array(
>     StructField("ad_impression_timestamp",StringType,true),
>     StructField("page_view_id",StringType,true),
>     StructField("ad_impression_key",LongType,true),
>     StructField("ad_position",StringType,true)
>   ))
> Before caching, the DataFrame works fine and I can access the data via the 
> schema I've defined. 
> But once I cache the DataFrame, I get conversion errors on any data type 
> other than StringType. 
> See below: 
> scala> :load dfp_ophan.scalascript 
> Loading dfp_ophan.scalascript... 
> import com.gu.dfp._ 
> import com.gu.vladetl._ 
> Unloading from Redshift with Query:'select 
>                             ad_impression_timestamp, 
>                             rtrim(page_view_id, ophan_visitor_id) as 
> page_view_id, 
>                             ad_impression_key, 
>                             ad_position 
>                               from  vlad.ad_impression_fact 
>                                 where ad_impression_timestamp 
>                                     between \'2015-06-07 22:00:00\' 
>                                     and \'2015-06-07 23:00:00\'' 
> Creating Dataframe from CSV without header 
> ophanDF: org.apache.spark.sql.DataFrame = [ad_impression_timestamp: string, 
> page_view_id: string, ad_impression_key: bigint, ad_position: string]
> scala> ophanDF.schema
> res0: org.apache.spark.sql.types.StructType = 
> StructType(StructField(ad_impression_timestamp,StringType,true), 
> StructField(page_view_id,StringType,true), 
> StructField(ad_impression_key,LongType,true), 
> StructField(ad_position,StringType,true)) 
> scala> ophanDF.count 
> res2: Long = 1460053                                                          
>                                                                               
>                                                                               
>                              
> scala> ophanDF.select('ad_impression_key).show(5)
> ad_impression_key 
> 39470392965       
> 39470389269       
> 39470389521       
> 39470397417       
> 39470393217       
> scala> ophanDF.cache
> res4: ophanDF.type = [ad_impression_timestamp: string, page_view_id: string, 
> ad_impression_key: bigint, ad_position: string] 
> scala> ophanDF.select('ad_impression_key).show(5)
> 15/06/08 15:27:30 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 86) 
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Long
>         at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) 
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88) 
>         at 
> org.apache.spark.sql.columnar.LongColumnStats.gatherStats(ColumnStats.scala:140)
>  
>         at 
> org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56)
>  
>         at 
> org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87)
>  
>         at 
> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
>  
>         at 
> org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
>  
>         at 
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:141)
>  
>         at 
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:117)
>  
>         at 
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) 
>         at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) 
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) 
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
>         at org.apache.spark.scheduler.Task.run(Task.scala:64) 
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  
>         at java.lang.Thread.run(Thread.java:745) 
> 15/06/08 15:27:30 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; 
> aborting job 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 
> (TID 86, localhost): java.lang.ClassCastException: java.lang.String cannot be 
> cast to java.lang.Long 
>         at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) 
>         at 
> org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88) 
>         at 
> org.apache.spark.sql.columnar.LongColumnStats.gatherStats(ColumnStats.scala:140)
>  
>         at 
> org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56)
>  
>         at 
> org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87)
>  
>         at 
> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
>  
>         at 
> org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
>  
>         at 
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:141)
>  
>         at 
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:117)
>  
>         at 
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) 
>         at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) 
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) 
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
>         at org.apache.spark.scheduler.Task.run(Task.scala:64) 
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  
>         at java.lang.Thread.run(Thread.java:745) 
> Driver stacktrace: 
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>  
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>  
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>  
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) 
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>  
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>  
>         at scala.Option.foreach(Option.scala:236) 
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>  
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>  
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>  
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
> scala> 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to