[ https://issues.apache.org/jira/browse/SPARK-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Berger closed SPARK-8165. ------------------------------- > sqlContext.createDataFrame(dataWithoutHeader, csvSchema) type conversion > error after .cache > ------------------------------------------------------------------------------------------- > > Key: SPARK-8165 > URL: https://issues.apache.org/jira/browse/SPARK-8165 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.3.1 > Reporter: David Berger > Priority: Major > > I create a DataFrame from a simple Redshift extract and use the struct below to > define the schema: > val ophanAdImpressions = StructType(Array( > > StructField("ad_impression_timestamp",StringType,true), > > StructField("page_view_id",StringType,true), > > StructField("ad_impression_key",LongType,true), > > StructField("ad_position",StringType,true) > )) > Before caching, the DataFrame works fine and I can access the data via the > schema I've defined. > But once I cache the DataFrame, I get conversion errors on every data type other > than StringType. > [Note: the stack trace below shows the in-memory columnar cache calling GenericRow.getLong on an ad_impression_key value that is actually a java.lang.String, i.e. the Rows passed to createDataFrame hold Strings that were never converted to Long; the mismatch only surfaces once caching reads the column with its declared LongType.] > See below: > scala> :load dfp_ophan.scalascript > Loading dfp_ophan.scalascript... 
> import com.gu.dfp._ > import com.gu.vladetl._ > Unloading from Redshift with Query:'select > ad_impression_timestamp, > rtrim(page_view_id, ophan_visitor_id) as > page_view_id, > ad_impression_key, > ad_position > from vlad.ad_impression_fact > where ad_impression_timestamp > between \'2015-06-07 22:00:00\' > and \'2015-06-07 23:00:00\'' > Creating Dataframe from CSV without header > ophanDF: org.apache.spark.sql.DataFrame = [ad_impression_timestamp: string, > page_view_id: string, ad_impression_key: bigint, ad_position: string] > scala> ophanDF.schema > res0: org.apache.spark.sql.types.StructType = > StructType(StructField(ad_impression_timestamp,StringType,true), > StructField(page_view_id,StringType,true), > StructField(ad_impression_key,LongType,true), > StructField(ad_position,StringType,true)) > scala> ophanDF.count > res2: Long = 1460053 > > > > scala> ophanDF.select('ad_impression_key).show(5) > ad_impression_key > 39470392965 > 39470389269 > 39470389521 > 39470397417 > 39470393217 > scala> ophanDF.cache > res4: ophanDF.type = [ad_impression_timestamp: string, page_view_id: string, > ad_impression_key: bigint, ad_position: string] > scala> ophanDF.select('ad_impression_key).show(5) > 15/06/08 15:27:30 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 86) > java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Long > at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) > at > org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88) > at > org.apache.spark.sql.columnar.LongColumnStats.gatherStats(ColumnStats.scala:140) > > at > org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) > > at > org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) > > at > 
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) > > at > org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) > > at > org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:141) > > at > org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:117) > > at > org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) > at > org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) > at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > at java.lang.Thread.run(Thread.java:745) > 15/06/08 15:27:30 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; > aborting job > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 > (TID 86, localhost): java.lang.ClassCastException: java.lang.String cannot be > cast to java.lang.Long > at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) > at > 
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88) > at > org.apache.spark.sql.columnar.LongColumnStats.gatherStats(ColumnStats.scala:140) > > at > org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) > > at > org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) > > at > org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) > > at > org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) > > at > org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:141) > > at > org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:117) > > at > org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) > at > org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) > at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > at 
java.lang.Thread.run(Thread.java:745) > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) > > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) > > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) > > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) > > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) > > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) > > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) > > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > scala> -- This message was sent by Atlassian JIRA (v7.6.14#76016) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org