Hello again, here are the steps to reproduce the same problem in JdbcRDD:
- Create a table containing a Date field in your favourite DBMS; I used PostgreSQL:

  CREATE TABLE spark_test (
    pk_spark_test integer NOT NULL,
    text character varying(25),
    date1 date,
    CONSTRAINT pk PRIMARY KEY (pk_spark_test)
  ) WITH ( OIDS=FALSE );
  ALTER TABLE spark_test OWNER TO postgres;
  GRANT ALL ON TABLE spark_test TO postgres;
  GRANT ALL ON TABLE spark_test TO public;

- Fill it with data:

  insert into spark_test(pk_spark_test, text, date1) values (1, 'one', '2014-04-01');
  insert into spark_test(pk_spark_test, text, date1) values (2, 'two', '2014-04-02');

- From the Scala REPL, try the following:

  import org.apache.spark.sql.SQLContext
  val sqc = new SQLContext(sc)
  sqc.jdbc("jdbc:postgresql://localhost:5432/ebx_repository?schema=ebx_repository&user=abc&password=def", "spark_test").cache.registerTempTable("spark_test") // don't forget the cache method
  sqc.sql("select * from spark_test").foreach(println)

The last command produces the following error (without cache it produces the correct results, as expected):

11:50:27.306 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt
	at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.getInt(SpecificMutableRow.scala:248) ~[spark-catalyst_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.IntColumnStats.gatherStats(ColumnStats.scala:191) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:135) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:111) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.11-1.3.0.jar:1.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]

11:50:27.318 [task-result-getter-0] WARN o.a.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt
	[same stack trace as above]

11:50:27.320 [task-result-getter-0] ERROR o.a.spark.scheduler.TaskSetManager - Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt
	[same stack trace as above]
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

From: Krist Rastislav
Sent: Friday, April 17, 2015 10:31 AM
To: 'Wang, Daoyuan'; Michael Armbrust
Cc: user
Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0

Hi,

this works when a fixed structure can serve as the DataFrame schema. In my case, however, the schema of the DataFrame is built dynamically from metadata provided by a REST service (and is therefore unknown at compile time), so I have no case class to use as the schema.

Thanks,
R.Krist

From: Wang, Daoyuan [mailto:daoyuan.w...@intel.com]
Sent: Friday, April 17, 2015 10:23 AM
To: Krist Rastislav; Michael Armbrust
Cc: user
Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0

Normally I use something like the following in Scala:

>case class datetest (x: Int, y: java.sql.Date)
>val dt = sc.parallelize(1 to 3).map(p => datetest(p, new java.sql.Date(p*1000*60*60*24)))
>sqlContext.createDataFrame(dt).registerTempTable("t1")
>sql("select * from t1").collect.foreach(println)

If you still get exceptions, please let me know about your query. The implicit conversion should be triggered when you call createDataFrame.

Thanks,
Daoyuan

From: Krist Rastislav [mailto:rkr...@vub.sk]
Sent: Friday, April 17, 2015 3:52 PM
To: Wang, Daoyuan; Michael Armbrust
Cc: user
Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0

Hello,

thank you for your answer - I am creating the DataFrames manually using org.apache.spark.sql.SQLContext#createDataFrame. The RDD is my custom implementation encapsulating the invocation of a remote REST-based web service, and the schema is created programmatically from metadata (obtained from the same WS). In other words, the creation of Rows in the DataFrame is fully under my control, so the implicit conversion cannot occur. Is there any best practice (ideally a utility method) for creating a Row instance from a set of values of the types represented by the DataFrame schema?
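For reference, a minimal sketch of the runtime-schema route discussed here, assuming Spark 1.3's public types API (`StructType`/`StructField`/`Row` and `SQLContext.createDataFrame(rowRDD, schema)`); `sc` and `sqc` are the SparkContext/SQLContext from the REPL session at the top, the schema is hard-coded for illustration (in the real scenario it would be derived from the web-service metadata), and the table name `spark_test_manual` is made up:

  // Sketch only: build a DataFrame from a schema known only at runtime.
  import java.sql.Date
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // Schema constructed programmatically instead of via a case class.
  val schema = StructType(Seq(
    StructField("pk_spark_test", IntegerType, nullable = false),
    StructField("text", StringType, nullable = true),
    StructField("date1", DateType, nullable = true)))

  // Rows must carry the external types matching the schema: Int for
  // IntegerType, String for StringType, java.sql.Date for DateType
  // (internally Spark SQL represents dates as integers, per Daoyuan below).
  val rows = sc.parallelize(Seq(
    Row(1, "one", Date.valueOf("2014-04-01")),
    Row(2, "two", Date.valueOf("2014-04-02"))))

  val df = sqc.createDataFrame(rows, schema)
  // Note: caching a DataFrame with a DateType column is exactly what
  // triggers the ClassCastException reported above (SPARK-6967).
  df.cache().registerTempTable("spark_test_manual")
  sqc.sql("select * from spark_test_manual").foreach(println)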
I will try to take a deeper look into your source code to locate the definition of the implicit conversion, but maybe some hint from your side could help deliver a better implementation. Thank you very much for your help (and for the great work you are doing there).

Regards
R.Krist

From: Wang, Daoyuan [mailto:daoyuan.w...@intel.com]
Sent: Friday, April 17, 2015 5:08 AM
To: Michael Armbrust; Krist Rastislav
Cc: user
Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0

The conversion between date and int should be handled automatically by implicit conversion. So we accept date types externally, and represent them as integers internally.

From: Wang, Daoyuan
Sent: Friday, April 17, 2015 11:00 AM
To: 'Michael Armbrust'; rkrist
Cc: user
Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0

Can you tell us how you created the dataframe?

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, April 17, 2015 2:52 AM
To: rkrist
Cc: user
Subject: Re: ClassCastException processing date fields using spark SQL since 1.3.0

Filed: https://issues.apache.org/jira/browse/SPARK-6967

> Shouldn't they be null?

Statistics are only used to eliminate partitions that can't possibly hold matching values. So while you are right that this might result in a false positive, it will not result in a wrong answer.

________________________________
The information contained in this electronic mail message and any files and attachments transmitted are confidential and are/may be a trade and/or bank secret or are/may be legally privileged under other legal regulations.
For further information, please, visit www.vub.sk/legalcaution. VUB, a.s., Mlynske nivy 1, 829 90 Bratislava 25, Slovenska republika
________________________________
Before printing this e-mail, think about the impact on the environment.