[ https://issues.apache.org/jira/browse/SPARK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-8450:
-----------------------------------

    Assignee:     (was: Apache Spark)

> PySpark write.parquet raises Unsupported datatype DecimalType()
> ----------------------------------------------------------------
>
>         Key: SPARK-8450
>         URL: https://issues.apache.org/jira/browse/SPARK-8450
>     Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
> Environment: Spark 1.4.0 on Debian
>    Reporter: Peter Hoffmann
>
> I'm getting an exception when I try to save a DataFrame with a DecimalType as a Parquet file.
>
> Minimal example:
>
> from decimal import Decimal
> from pyspark.sql import SQLContext
> from pyspark.sql.types import *
>
> sqlContext = SQLContext(sc)
>
> schema = StructType([
>     StructField('id', LongType()),
>     StructField('value', DecimalType())])
>
> rdd = sc.parallelize([[1, Decimal("0.5")], [2, Decimal("2.9")]])
> df = sqlContext.createDataFrame(rdd, schema)
> df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
>
> Stack trace:
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-19-a77dac8de5f3> in <module>()
> ----> 1 sr.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in parquet(self, path, mode)
>     367         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
>     368         """
> --> 369         return self._jwrite.mode(mode).parquet(path)
>     370
>     371     @since(1.4)
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o361.parquet.
> : org.apache.spark.SparkException: Job aborted.
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>     at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>     at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 35.0 failed 4 times, most recent failure: Lost task 158.3 in stage 35.0 (TID 2736, 10.2.160.14): java.lang.RuntimeException: Unsupported datatype DecimalType()
>     at scala.sys.package$.error(package.scala:27)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:374)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:318)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:317)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:398)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:397)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:396)
>     at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:150)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>     at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>     at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>     at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
>     at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
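Until DecimalType is supported by the Parquet write path, one possible interim workaround is to keep the decimal out of the written schema, e.g. by casting the column to a string before the write and casting back after the read. Below is a minimal, untested sketch against the schema above (the HDFS path is the one from the report; whether a string round-trip is acceptable depends on the downstream consumers, and a cast to 'double' would also do if the precision loss is tolerable):

from decimal import Decimal
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, LongType, DecimalType

sqlContext = SQLContext(sc)

schema = StructType([
    StructField('id', LongType()),
    StructField('value', DecimalType())])

rdd = sc.parallelize([[1, Decimal("0.5")], [2, Decimal("2.9")]])
df = sqlContext.createDataFrame(rdd, schema)

# Replace the decimal column with a lossless string representation,
# which the 1.4.0 Parquet writer does support.
df_str = df.withColumn('value_str', df.value.cast('string')).drop('value')
df_str.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')

# On read, restore a decimal column with explicit precision and scale.
df_back = sqlContext.read.parquet("hdfs://srv:9000/user/ph/decimal.parquet")
df_back = df_back.withColumn('value', df_back.value_str.cast(DecimalType(16, 2)))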
> I also tried setting the precision to less than 18:
>
> schema = StructType([
>     StructField('id', LongType()),
>     StructField('value', DecimalType(16, 2))])
>
> which raises a different exception:
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-23-bba70b7c0805> in <module>()
> ----> 1 df.write.parquet("hdfs://srv:9000/user/ph/decimal.parquet", 'overwrite')
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/readwriter.pyc in parquet(self, path, mode)
>     367         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
>     368         """
> --> 369         return self._jwrite.mode(mode).parquet(path)
>     370
>     371     @since(1.4)
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /home/spark/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o417.parquet.
> : org.apache.spark.SparkException: Job aborted.
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:138)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>     at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>     at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 159 in stage 41.0 failed 4 times, most recent failure: Lost task 159.3 in stage 41.0 (TID 3211, 10.2.160.14): org.apache.spark.SparkException: Task failed while writing rows.
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:365)
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:335)
>     at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:321)
>     at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
>     at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
>     at org.apache.spark.sql.parquet.ParquetOutputWriter.write(newParquet.scala:114)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:154)
>     ... 8 more
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
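Note that both runs die inside the Parquet write support, first in schema conversion and then in row conversion, so the failure looks specific to the decimal write path. A quick way to confirm the rest of the writer is healthy is to declare the column as DoubleType instead. A small sketch (the output path is made up for this test, and it assumes double precision is acceptable for the data):

from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

sqlContext = SQLContext(sc)

# Same rows as in the report, but with the value column declared as double.
schema_dbl = StructType([
    StructField('id', LongType()),
    StructField('value', DoubleType())])

rdd = sc.parallelize([[1, 0.5], [2, 2.9]])
df_dbl = sqlContext.createDataFrame(rdd, schema_dbl)

# DoubleType is a supported Parquet type, so this write should go through.
df_dbl.write.parquet("hdfs://srv:9000/user/ph/decimal_as_double.parquet", 'overwrite')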
> The corresponding Scala version works:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.{ Row, SQLContext }
> import org.apache.spark.sql.types.{ DecimalType, IntegerType, StructType, StructField }
>
> object ParquetDecimal {
>   def main(args: Array[String]) {
>     // Connect to Spark
>     val sc = new SparkContext()
>     val sqlContext = new SQLContext(sc)
>
>     val schema = StructType(Seq(StructField("id", IntegerType),
>       StructField("value", DecimalType(16, 2))))
>     val rows = sc.parallelize(Seq(Row(1, BigDecimal("0.9")), Row(2, BigDecimal("2.9"))))
>     val df = sqlContext.createDataFrame(rows, schema)
>
>     df.write.parquet("test.parquet")
>   }
> }
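Given that the Scala job writes the file successfully, reading it back from PySpark would narrow things down further: if the read side handles the decimal column, the bug is confined to PySpark's write-side row conversion. A sketch, assuming the Scala job above has written test.parquet to the default filesystem:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Read the file written by the Scala job and inspect the decimal column.
df = sqlContext.read.parquet("test.parquet")
df.printSchema()  # expect: value: decimal(16,2)
df.show()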
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)