Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi Nikos,

How many columns does the DataFrame have, and how many distinct values does some_column have? The Parquet writer is known to be very memory-consuming for wide tables, and many distinct partition column values result in many concurrent Parquet writers. One possible workaround is to repartition the data by the partition columns first.

Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:
Hi, I am trying to test partitioning for DataFrames with Parquet, so I attempted df.write().partitionBy(some_column).parquet(path) on a small dataset of 20,000 records which, when saved as Parquet locally with gzip, takes 4 MB of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g, this always fails with an OutOfMemoryError. Does anyone have any ideas?
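The point about many distinct partition values leading to many concurrent writers, and why repartitioning by the partition column helps, can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions, not Spark's code: it assumes each write task keeps one open writer per distinct partition value it has seen (loosely mirroring the writer map filled by DynamicPartitionWriterContainer.outputWriterForRow in the trace), it models unclustered input as rows dealt round-robin to tasks, and it uses Integer.hashCode as a stand-in for Spark's hash partitioner. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical model, not Spark code: a task holds one open writer per
// distinct partition value it sees, and keeps it open until the task ends.
public class WriterCountSketch {

    // Largest number of writers any single task would hold open at once.
    static int maxOpenWritersPerTask(List<List<Integer>> tasks) {
        int max = 0;
        for (List<Integer> task : tasks) {
            Set<Integer> writers = new HashSet<>(task); // one writer per distinct key
            max = Math.max(max, writers.size());
        }
        return max;
    }

    // Assumed stand-in for unclustered input: rows dealt round-robin, ignoring the key.
    static List<List<Integer>> roundRobin(int[] keys, int numTasks) {
        List<List<Integer>> tasks = newTasks(numTasks);
        for (int i = 0; i < keys.length; i++) {
            tasks.get(i % numTasks).add(keys[i]);
        }
        return tasks;
    }

    // Stand-in for repartitioning by the partition column: every row with
    // the same key is routed to the same task.
    static List<List<Integer>> byKey(int[] keys, int numTasks) {
        List<List<Integer>> tasks = newTasks(numTasks);
        for (int key : keys) {
            tasks.get(Math.floorMod(Integer.hashCode(key), numTasks)).add(key);
        }
        return tasks;
    }

    private static List<List<Integer>> newTasks(int n) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new ArrayList<>());
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Assumed shape: 20,000 rows, ~3,000 distinct values, 8 tasks.
        Random rnd = new Random(42);
        int[] keys = new int[20_000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = rnd.nextInt(3_000);
        }
        System.out.println("round-robin: " + maxOpenWritersPerTask(roundRobin(keys, 8)));
        System.out.println("by key:      " + maxOpenWritersPerTask(byKey(keys, 8)));
    }
}
```

In this model the by-key count cannot exceed 375, because each of the 8 residue classes of the keys 0..2999 holds exactly 375 values, while round-robin placement leaves every task with a much larger share of the keys. In Spark itself, assuming version 1.6 or later where DataFrame.repartition accepts column expressions, the corresponding step would be something like df.repartition(df.col("some_column")) before write().partitionBy(...).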
Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi Lian,

Thank you for the tip. Indeed, there were a lot of distinct values in my result set (approximately 3,000). As you suggested, I decided to partition the data first on a column with much smaller cardinality.

Thanks,
n

On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian <lian.cs@gmail.com> wrote:
Hi Nikos, How many columns does the DataFrame have, and how many distinct values does some_column have? The Parquet writer is known to be very memory-consuming for wide tables, and many distinct partition column values result in many concurrent Parquet writers. One possible workaround is to repartition the data by the partition columns first. Cheng
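Before settling on a partition column, it can help to compare the cardinality of the candidates. Below is a minimal sketch in plain Java over in-memory rows, not Spark code; the class name, the Map-per-row representation, and the maxDistinct budget are all hypothetical. On a real DataFrame the same number for one column would come from something like df.select("some_column").distinct().count().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a Spark API: counts distinct values per column so
// candidate partition columns can be compared before calling partitionBy.
public class PartitionColumnChooser {

    // Distinct-value count for every column that appears in the rows.
    static Map<String, Integer> distinctCounts(List<Map<String, String>> rows) {
        Map<String, Set<String>> seen = new HashMap<>();
        for (Map<String, String> row : rows) {
            for (Map.Entry<String, String> e : row.entrySet()) {
                seen.computeIfAbsent(e.getKey(), k -> new HashSet<>()).add(e.getValue());
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        seen.forEach((col, values) -> counts.put(col, values.size()));
        return counts;
    }

    // Columns whose cardinality stays at or below an assumed budget, sorted by name.
    static List<String> candidates(List<Map<String, String>> rows, int maxDistinct) {
        List<String> result = new ArrayList<>();
        distinctCounts(rows).forEach((col, n) -> {
            if (n <= maxDistinct) {
                result.add(col);
            }
        });
        result.sort(null); // natural (alphabetical) order
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical rows: "day" has 2 distinct values, "user" has 3.
        List<Map<String, String>> rows = List.of(
            Map.of("day", "2015-07-15", "user", "a"),
            Map.of("day", "2015-07-15", "user", "b"),
            Map.of("day", "2015-07-16", "user", "c"));
        System.out.println(distinctCounts(rows));
        System.out.println(candidates(rows, 2)); // [day]
    }
}
```

The budget is a judgment call; the point is only to see, before writing, how many directories (and, per task, how many open writers) a given partition column would produce.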
DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi,

I am trying to test partitioning for DataFrames with Parquet, so I attempted df.write().partitionBy(some_column).parquet(path) on a small dataset of 20,000 records which, when saved as Parquet locally with gzip, takes 4 MB of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g, this always fails with an OutOfMemoryError. Does anyone have any ideas?

Stack trace:

[Stage 2: (0 + 4) / 8]
2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
    at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
    at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
    at
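A back-of-envelope way to see how 20,000 rows can exhaust the heap: according to the trace, each new Parquet writer allocates buffers for its columns while it is being constructed (CapacityByteArrayOutputStream.initSlabs, reached from InternalParquetRecordWriter.initStore), before much data has been written. With local[4], the four concurrent tasks run inside a single JVM and share its heap. The sketch below just multiplies these factors; the column count and the per-column buffer size are hypothetical placeholders, not measured values for this Parquet version, and the writers-per-task figure is an upper bound taken from the roughly 3,000 distinct values reported in the follow-up.

```java
// Back-of-envelope model, not Parquet or Spark code. All inputs are supplied
// by the caller; the values in main() are hypothetical placeholders.
public class WriterMemoryEstimate {

    // Up-front buffer bytes if every open writer holds one initial buffer per column.
    static long estimateBytes(int concurrentTasks, int openWritersPerTask,
                              int columns, long initialBufferBytesPerColumn) {
        // Widen to long before multiplying to avoid int overflow.
        return (long) concurrentTasks * openWritersPerTask * columns * initialBufferBytesPerColumn;
    }

    public static void main(String[] args) {
        int concurrentTasks = 4;         // local[4]: four tasks share one JVM heap
        int openWritersPerTask = 3_000;  // upper bound: one writer per distinct value
        int columns = 20;                // hypothetical; the column count was never given
        long bufferBytes = 64 * 1024;    // hypothetical initial buffer per column
        long bytes = estimateBytes(concurrentTasks, openWritersPerTask, columns, bufferBytes);
        System.out.printf("~%.1f GiB of up-front buffers%n", bytes / (1024.0 * 1024 * 1024));
    }
}
```

Whatever the real buffer size is, the estimate grows linearly with the number of writers each task holds open, which is why reducing the distinct values per task (by repartitioning on the partition column, or by partitioning on a lower-cardinality column) addresses the problem directly, independent of the 4 MB on-disk size.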