[ 
https://issues.apache.org/jira/browse/SPARK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davies Liu updated SPARK-8450:
------------------------------
    Description: 
I'm getting an exception when I try to save a DataFrame with a DecimalType column as 
a Parquet file.

Minimal Example:
{code}
from decimal import Decimal
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
schema = StructType([
    StructField('id', LongType()),
    StructField('value', DecimalType())])
rdd = sc.parallelize([[1, Decimal("0.5")],[2, Decimal("2.9")]])
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

{code}

Stack Trace
{code}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-a77dac8de5f3> in <module>()
----> 1 sr.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

/home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
parquet(self, path, mode)
    367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
(default: error)
    368         """
--> 369         return self._jwrite.mode(mode).parquet(path)
    370 
    371     @since(1.4)

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o361.parquet.
: org.apache.spark.SparkException: Job aborted.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 158 in stage 35.0 failed 4 times, most recent failure: Lost task 158.3 in 
stage 35.0 (TID 2736, 10.2.160.14): java.lang.RuntimeException: Unsupported 
datatype DecimalType()
        at scala.sys.package$.error(package.scala:27)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:374)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:318)
        at scala.Option.getOrElse(Option.scala:120)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:317)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:398)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:397)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:396)
        at 
org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:150)
        at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
        at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at 
org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
        at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
        at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

I also tried to set the precision < 18
{code}
schema = StructType([
    StructField('id', LongType()),
    StructField('value', DecimalType(16,2))])
{code}

which raises a different exception
{code}
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-23-bba70b7c0805> in <module>()
----> 1 df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

/home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
parquet(self, path, mode)
    367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
(default: error)
    368         """
--> 369         return self._jwrite.mode(mode).parquet(path)
    370 
    371     @since(1.4)

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o417.parquet.
: org.apache.spark.SparkException: Job aborted.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 159 in stage 41.0 failed 4 times, most recent failure: Lost task 159.3 in 
stage 41.0 (TID 3211, 10.2.160.14): org.apache.spark.SparkException: Task 
failed while writing rows.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to 
org.apache.spark.sql.types.Decimal
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:365)
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:335)
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:321)
        at 
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
        at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
        at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
        at 
org.apache.spark.sql.parquet.ParquetOutputWriter.write(newParquet.scala:114)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:154)
        ... 8 more

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

{code}
The corresponding Scala version works:

{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{ Row, SQLContext }
import org.apache.spark.sql.types.{ DecimalType, IntegerType, StructType, 
StructField }
 
object ParquetDecimal {
  def main(args: Array[String]) {
    // Connect to Spark
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
 
    val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("value", DecimalType(16, 2))))
    val rows = sc.parallelize(Seq(Row(1, BigDecimal("0.9")), Row(2, 
BigDecimal("2.9"))))
    val df = sqlContext.createDataFrame(rows, schema)
    df.write.parquet("test.parquet")
  }
}
{code}

  was:
I'm getting an exception when I try to save a DataFrame with a DecimalType column as 
a Parquet file.

Minimal Example:

from decimal import Decimal
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
schema = StructType([
    StructField('id', LongType()),
    StructField('value', DecimalType())])
rdd = sc.parallelize([[1, Decimal("0.5")],[2, Decimal("2.9")]])
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

Stack Trace

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-a77dac8de5f3> in <module>()
----> 1 sr.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

/home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
parquet(self, path, mode)
    367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
(default: error)
    368         """
--> 369         return self._jwrite.mode(mode).parquet(path)
    370 
    371     @since(1.4)

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o361.parquet.
: org.apache.spark.SparkException: Job aborted.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 158 in stage 35.0 failed 4 times, most recent failure: Lost task 158.3 in 
stage 35.0 (TID 2736, 10.2.160.14): java.lang.RuntimeException: Unsupported 
datatype DecimalType()
        at scala.sys.package$.error(package.scala:27)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:374)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:318)
        at scala.Option.getOrElse(Option.scala:120)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:317)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:398)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:397)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:396)
        at 
org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:150)
        at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
        at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at 
org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
        at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
        at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
        at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


I also tried to set the precision < 18

schema = StructType([
    StructField('id', LongType()),
    StructField('value', DecimalType(16,2))])


which raises a different exception

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-23-bba70b7c0805> in <module>()
----> 1 df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

/home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
parquet(self, path, mode)
    367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
(default: error)
    368         """
--> 369         return self._jwrite.mode(mode).parquet(path)
    370 
    371     @since(1.4)

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o417.parquet.
: org.apache.spark.SparkException: Job aborted.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
        at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
        at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 159 in stage 41.0 failed 4 times, most recent failure: Lost task 159.3 in 
stage 41.0 (TID 3211, 10.2.160.14): org.apache.spark.SparkException: Task 
failed while writing rows.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to 
org.apache.spark.sql.types.Decimal
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:365)
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:335)
        at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:321)
        at 
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
        at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
        at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
        at 
org.apache.spark.sql.parquet.ParquetOutputWriter.write(newParquet.scala:114)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:154)
        ... 8 more

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The corresponding Scala Version works

{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{ Row, SQLContext }
import org.apache.spark.sql.types.{ DecimalType, IntegerType, StructType, 
StructField }
 
object ParquetDecimal {
  def main(args: Array[String]) {
    // Connect to Spark
    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)
 
    val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("value", DecimalType(16, 2))))
    val rows = sc.parallelize(Seq(Row(1, BigDecimal("0.9")), Row(2, 
BigDecimal("2.9"))))
    val df = sqlContext.createDataFrame(rows, schema)
    df.write.parquet("test.parquet")
  }
}
{code}


> PySpark write.parquet raises Unsupported datatype DecimalType()
> ---------------------------------------------------------------
>
>                 Key: SPARK-8450
>                 URL: https://issues.apache.org/jira/browse/SPARK-8450
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>         Environment: Spark 1.4.0 on Debian
>            Reporter: Peter Hoffmann
>
> I'm getting an exception when I try to save a DataFrame with a DecimalType as 
> a Parquet file
> Minimal Example:
> {code}
> from decimal import Decimal
> from pyspark.sql import SQLContext
> from pyspark.sql.types import *
> sqlContext = SQLContext(sc)
> schema = StructType([
>     StructField('id', LongType()),
>     StructField('value', DecimalType())])
> rdd = sc.parallelize([[1, Decimal("0.5")],[2, Decimal("2.9")]])
> df = sqlContext.createDataFrame(rdd, schema)
> df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
> {code}
> Stack Trace
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-19-a77dac8de5f3> in <module>()
> ----> 1 sr.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 
> 'overwrite')
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
> parquet(self, path, mode)
>     367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
> (default: error)
>     368         """
> --> 369         return self._jwrite.mode(mode).parquet(path)
>     370 
>     371     @since(1.4)
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539 
>     540         for temp_arg in temp_args:
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o361.parquet.
> : org.apache.spark.SparkException: Job aborted.
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>       at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>       at 
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>       at py4j.Gateway.invoke(Gateway.java:259)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:207)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 158 in stage 35.0 failed 4 times, most recent failure: Lost task 158.3 
> in stage 35.0 (TID 2736, 10.2.160.14): java.lang.RuntimeException: 
> Unsupported datatype DecimalType()
>       at scala.sys.package$.error(package.scala:27)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:374)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:318)
>       at scala.Option.getOrElse(Option.scala:120)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:317)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:398)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:397)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:396)
>       at 
> org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:150)
>       at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>       at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>       at 
> org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>       at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>       at 
> org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
>       at 
> org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> I also tried to set the precision < 18
> {code}
> schema = StructType([
>     StructField('id', LongType()),
>     StructField('value', DecimalType(16,2))])
> {code}
> which raises a different exception
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-23-bba70b7c0805> in <module>()
> ----> 1 df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 
> 'overwrite')
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in 
> parquet(self, path, mode)
>     367         :param mode: one of `append`, `overwrite`, `error`, `ignore` 
> (default: error)
>     368         """
> --> 369         return self._jwrite.mode(mode).parquet(path)
>     370 
>     371     @since(1.4)
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539 
>     540         for temp_arg in temp_args:
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o417.parquet.
> : org.apache.spark.SparkException: Job aborted.
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>       at 
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>       at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>       at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>       at 
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>       at py4j.Gateway.invoke(Gateway.java:259)
>       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>       at py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at py4j.GatewayConnection.run(GatewayConnection.java:207)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 159 in stage 41.0 failed 4 times, most recent failure: Lost task 159.3 
> in stage 41.0 (TID 3211, 10.2.160.14): org.apache.spark.SparkException: Task 
> failed while writing rows.
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast 
> to org.apache.spark.sql.types.Decimal
>       at 
> org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:365)
>       at 
> org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:335)
>       at 
> org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:321)
>       at 
> parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>       at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>       at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>       at 
> org.apache.spark.sql.parquet.ParquetOutputWriter.write(newParquet.scala:114)
>       at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:154)
>       ... 8 more
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The corresponding Scala Version works
> {code}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.{ Row, SQLContext }
> import org.apache.spark.sql.types.{ DecimalType, IntegerType, StructType, 
> StructField }
>  
> object ParquetDecimal {
>   def main(args: Array[String]) {
>     // Connect to Spark
>     val sc = new SparkContext()
>     val sqlContext = new SQLContext(sc)
>  
>     val schema = StructType(Seq(StructField("id", IntegerType), 
> StructField("value", DecimalType(16, 2))))
>     val rows = sc.parallelize(Seq(Row(1, BigDecimal("0.9")), Row(2, 
> BigDecimal("2.9"))))
>     val df = sqlContext.createDataFrame(rows, schema)
>     df.write.parquet("test.parquet")
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to