I think I found the issue causing it.

I was calling schemaRDD.coalesce(n).saveAsParquetFile to reduce the number
of partitions in the Parquet file, and that is when the stack trace occurs.

If I reduce the number of partitions before creating the SchemaRDD, then the
schemaRDD.saveAsParquetFile call works for decimal columns.
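
A minimal sketch of the two call orders, assuming Spark 1.2 and a SchemaRDD
built with SQLContext.applySchema. The names rowRDD and schema, the sample
values, the partition counts, and the output path are hypothetical and only
illustrate the ordering described above; this is not a confirmed fix.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._

object CoalesceBeforeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("coalesce-before-schema").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Same column definitions as in the original message.
    val schema = StructType(Seq(
      StructField("a", DecimalType(14, 4), true),
      StructField("b", DecimalType(14, 4), true)))

    // Hypothetical sample data. Assumption: the rows carry scala.math.BigDecimal values.
    val rowRDD = sc.parallelize(Seq(
      Row(BigDecimal("1.2345"), BigDecimal("2.5000")),
      Row(BigDecimal("10.0000"), BigDecimal("20.1234"))), 8)

    val n = 2 // target number of partitions (hypothetical)

    // Order that produced the ClassCastException for me:
    //   sqlContext.applySchema(rowRDD, schema).coalesce(n).saveAsParquetFile(path)

    // Order that worked: coalesce the plain RDD first, then apply the schema.
    val schemaRDD = sqlContext.applySchema(rowRDD.coalesce(n), schema)
    schemaRDD.saveAsParquetFile("/tmp/decimal_parquet") // hypothetical output path

    sc.stop()
  }
}
```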

So it seems schemaRDD.coalesce returns an RDD whose schema does not match
that of the source RDD, in that the decimal type seems to get changed.

Any thoughts? Is this a bug?

Thanks,


On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel <manojsamelt...@gmail.com>
wrote:

> Spark 1.2
>
> The SchemaRDD has a schema with decimal columns created like this:
>
> x1 = new StructField("a", DecimalType(14,4), true)
>
> x2 = new StructField("b", DecimalType(14,4), true)
>
> Registering it as a SQL temp table and running SQL queries on these columns,
> including SUM etc., works fine, so the Decimal schema does not seem to be
> the issue.
>
> When calling saveAsParquetFile on the RDD, it gives the following error. I am
> not sure why the "DecimalType" in the SchemaRDD is not seen by Parquet, which
> seems to see it as scala.math.BigDecimal.
>
> java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
>   at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
>   at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
>   at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
>   at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>   at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>   at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>   at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
>   at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>   at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
>
>
>
>
