On 6/10/15 8:53 PM, Bipin Nag wrote:

Hi Cheng,

I am using the Spark 1.3.1 binary built for Hadoop 2.6. I am loading an existing Parquet file, repartitioning it, and saving it again; doing this gives the error. The code for that step doesn't look like the problem, so I suspect the source, i.e. the existing Parquet file, is the culprit.
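For reference, the repartition step looks roughly like this (a minimal sketch; the output path and partition count here are placeholders):

    val df = sqlContext.parquetFile("/home/bipin/stageddata/" + name)
    df.repartition(8).saveAsParquetFile("/home/bipin/repartitioned/" + name)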

I created that Parquet file from a JdbcRDD (pulled from Microsoft SQL Server). First I saved the JdbcRDD as an object file on disk, then loaded it again, built a DataFrame from it using a schema, and saved it as Parquet.

Following is the code :
For saving the JdbcRDD:
    name - fully qualified table name
    pk - name of the primary key column
    lastpk - last id to pull

    val myRDD = new JdbcRDD( sc, () =>
        DriverManager.getConnection(url, username, password),
        "SELECT * FROM " + name + " WITH (NOLOCK) WHERE ? <= " + pk + " and " + pk + " <= ?",
        1, lastpk, 1, JdbcRDD.resultSetToObjectArray)   // bounds 1..lastpk, 1 partition
    myRDD.saveAsObjectFile("rawdata/" + name)
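As far as I can tell, JdbcRDD.resultSetToObjectArray just reads every column with rs.getObject; in the Spark source it is essentially:

    // from org.apache.spark.rdd.JdbcRDD (Spark 1.3)
    def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
        Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
    }

so decimal columns should come back as java.math.BigDecimal values.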

For applying the schema and saving the Parquet file:
    val myschema = schemamap(name)
    val myrdd = sc.objectFile[Array[Object]]("/home/bipin/rawdata/" + name)
        .map(x => org.apache.spark.sql.Row(x: _*))

Have you tried printing out x here to check its contents? My guess is that x actually contains unit values. For example, the following Spark shell code reproduces a similar exception:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    val schema = StructType(StructField("dec", DecimalType(10, 0)) :: Nil)
    val rdd = sc.parallelize(1 to 10).map(_ => Array(())).map(arr => Row(arr: _*))
    val df = sqlContext.createDataFrame(rdd, schema)

    df.saveAsParquetFile("file:///tmp/foo")
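One quick way to verify what the rows actually hold (just a sanity check on the first few rows) would be something like:

    myrdd.take(5).foreach { row =>
        // print the runtime class of each column value
        println(row.toSeq.map(v => if (v == null) "null" else v.getClass.getName).mkString(", "))
    }

If any column shows scala.runtime.BoxedUnit where you expect java.math.BigDecimal, that would explain the ClassCastException.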

    val actualdata = sqlContext.createDataFrame(myrdd, myschema)
    actualdata.saveAsParquetFile("/home/bipin/stageddata/" + name)

The schema StructType can be built manually, though I pull the table's metadata and build one from that. It is a simple string translation from SQL Server type names to Spark data types (see the SQL Server docs <https://msdn.microsoft.com/en-us/library/ms378878%28v=sql.110%29.aspx> and/or the Spark data types <https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#data-types>); a simplified sketch is below.
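Roughly like this (a simplified sketch; my actual schemamap pulls the column names, type strings, precision and scale from the table metadata first, and the helper below is hypothetical and only covers common types):

    import org.apache.spark.sql.types._

    // hypothetical helper: translate a SQL Server type name to a Spark DataType
    def toSparkType(sqlType: String, precision: Int, scale: Int): DataType =
        sqlType.toLowerCase match {
            case "int" | "smallint" | "tinyint"  => IntegerType
            case "bigint"                        => LongType
            case "bit"                           => BooleanType
            case "float"                         => DoubleType
            case "real"                          => FloatType
            case "datetime" | "datetime2"        => TimestampType
            case "date"                          => DateType
            case "decimal" | "numeric" | "money" => DecimalType(precision, scale)
            case _                               => StringType // varchar, nvarchar, text, ...
        }

    // columns: (name, sqlTypeName, precision, scale) read from the table metadata
    def makeSchema(columns: Seq[(String, String, Int, Int)]): StructType =
        StructType(columns.map { case (n, t, p, s) =>
            StructField(n, toSparkType(t, p, s), nullable = true)
        })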

That is how I created the Parquet file. Any help in solving this issue is appreciated.
Thanks
Bipin


On 9 June 2015 at 20:44, Cheng Lian <lian.cs....@gmail.com> wrote:

    Would you please provide a snippet that reproduces this issue? What
    version of Spark were you using?

    Cheng

    On 6/9/15 8:18 PM, bipin wrote:

        Hi,
        When I try to save my data frame as a Parquet file I get the
        following error:

        java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.types.Decimal
            at org.apache.spark.sql.parquet.RowWriteSupport.writePrimitive(ParquetTableSupport.scala:220)
            at org.apache.spark.sql.parquet.RowWriteSupport.writeValue(ParquetTableSupport.scala:192)
            at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:171)
            at org.apache.spark.sql.parquet.RowWriteSupport.write(ParquetTableSupport.scala:134)
            at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
            at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
            at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
            at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
            at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
            at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
            at org.apache.spark.scheduler.Task.run(Task.scala:64)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)

        How can I fix this problem?







