I think I found the issue causing it. I was calling schemaRDD.coalesce(n).saveAsParquetFile to reduce the number of partitions in the Parquet file; that is when the stack trace happens. If I instead reduce the number of partitions on the source RDD before creating the schemaRDD, the schemaRDD.saveAsParquetFile call works fine for the decimal columns.
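In sketch form, the two paths look like this (sqlContext, rowRDD, and the output paths are placeholders, and applySchema is just one way of building the schemaRDD; this is not my exact code):

import org.apache.spark.sql.{SQLContext, Row, StructType, StructField, DecimalType}

// Schema with decimal columns, as in my earlier mail below
val schema = StructType(Seq(
  StructField("a", DecimalType(14, 4), true),
  StructField("b", DecimalType(14, 4), true)))

// rowRDD is an RDD[Row] built elsewhere (placeholder)
val schemaRDD = sqlContext.applySchema(rowRDD, schema)

// Path 1 -- fails with the ClassCastException quoted below:
schemaRDD.coalesce(1).saveAsParquetFile("/tmp/decimals_fail")

// Path 2 -- works: coalesce the source RDD first, then apply the
// schema and save
val schemaRDD2 = sqlContext.applySchema(rowRDD.coalesce(1), schema)
schemaRDD2.saveAsParquetFile("/tmp/decimals_ok")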
So it seems schemaRDD.coalesce returns an RDD whose schema does not match that of the source RDD, in that the decimal type appears to get changed.

Any thoughts? Is this a bug?

Thanks,

On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel <manojsamelt...@gmail.com> wrote:

> Spark 1.2
>
> SchemaRDD has a schema with decimal columns created like:
>
> x1 = new StructField("a", DecimalType(14,4), true)
> x2 = new StructField("b", DecimalType(14,4), true)
>
> Registering it as a SQL temp table and doing SQL queries on these columns,
> including SUM etc., works fine, so the decimal schema does not seem to be
> the issue.
>
> When doing saveAsParquetFile on the RDD, it gives the following error. Not
> sure why the DecimalType in the SchemaRDD is not seen by Parquet, which
> seems to see it as scala.math.BigDecimal:
>
> java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)
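P.S. For completeness, the temp-table query path from my earlier mail that does work is roughly (sketch; the table name "t" is a placeholder):

schemaRDD.registerTempTable("t")
sqlContext.sql("SELECT SUM(a), SUM(b) FROM t").collect().foreach(println)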