Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items

2015-07-16 Thread Cheng Lian

Hi Nikos,

How many columns and distinct values of some_column are there in the 
DataFrame? The Parquet writer is known to be very memory-consuming for wide 
tables, and a large number of distinct partition column values results in many 
concurrent Parquet writers. One possible workaround is to first repartition 
the data by the partition columns.
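
A minimal sketch of that workaround (the paths and column name below are
placeholders, not the original job, and repartitioning a DataFrame by columns
assumes a Spark release newer than the 1.4.x in this thread, i.e. 1.6+ where
DataFrame.repartition accepts column expressions):

import org.apache.spark.sql.SQLContext

def writePartitioned(sqlContext: SQLContext, inputPath: String, outputPath: String): Unit = {
  import sqlContext.implicits._
  val df = sqlContext.read.parquet(inputPath)
  // One shuffle up front groups all rows sharing a some_column value into the
  // same task, so each task keeps only a few Parquet writers open at once
  // instead of one per distinct partition value it happens to see.
  df.repartition($"some_column")
    .write
    .partitionBy("some_column")
    .parquet(outputPath)
}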


Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:

Hi,

I am trying to test partitioning for DataFrames written as Parquet, so 
I attempted df.write().partitionBy(some_column).parquet(path) 
on a small dataset of 20,000 records, which takes 4 MB of disk space 
when saved locally as gzip-compressed Parquet.
However, on my dev machine with 
-Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always 
fails with an OutOfMemoryError.

Does anyone have any ideas?


Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items

2015-07-16 Thread Nikos Viorres
Hi Lian,

Thank you for the tip. Indeed, there were a lot of distinct values in my
result set (approximately 3,000). As you suggested, I decided to first
partition the data on a column with much smaller cardinality.
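
Roughly along these lines (coarse_column is a hypothetical stand-in for the
lower-cardinality column, not the real schema):

import org.apache.spark.sql.DataFrame

def writeByCoarseColumn(df: DataFrame, outputPath: String): Unit = {
  // coarse_column is a placeholder for a column with only a handful of
  // distinct values, so the dynamic-partition write keeps only a few
  // concurrent Parquet writers open instead of one per ~3,000 values.
  df.write
    .partitionBy("coarse_column")
    .parquet(outputPath)
}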
Thanks

n

On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian <lian.cs@gmail.com> wrote:

  Hi Nikos,

 How many columns and distinct values of some_column are there in the
 DataFrame? The Parquet writer is known to be very memory-consuming for wide
 tables, and a large number of distinct partition column values results in
 many concurrent Parquet writers. One possible workaround is to first
 repartition the data by the partition columns.

 Cheng


 On 7/15/15 7:05 PM, Nikos Viorres wrote:

 Hi,

  I am trying to test partitioning for DataFrames written as Parquet, so I
 attempted df.write().partitionBy(some_column).parquet(path) on a small
 dataset of 20,000 records, which takes 4 MB of disk space when saved locally
 as gzip-compressed Parquet.
 However, on my dev machine with
 -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always
 fails with an OutOfMemoryError.
 Does anyone have any ideas?


DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items

2015-07-15 Thread Nikos Viorres
Hi,

I am trying to test partitioning for DataFrames written as Parquet, so I
attempted df.write().partitionBy(some_column).parquet(path) on a small
dataset of 20,000 records, which takes 4 MB of disk space when saved locally
as gzip-compressed Parquet.
However, on my dev machine with
-Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always
fails with an OutOfMemoryError.
Does anyone have any ideas?
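
For reference, a minimal sketch of the setup (paths and the column name are
placeholders rather than the actual job; the JVM/Spark flags are the ones
listed above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PartitionByRepro {
  def main(args: Array[String]): Unit = {
    // launched with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g
    val sc = new SparkContext(new SparkConf().setAppName("partitionBy-repro"))
    val sqlContext = new SQLContext(sc)
    // ~20,000 records, about 4 MB when written as gzip-compressed Parquet
    val df = sqlContext.read.parquet("/tmp/small-input")
    // fails with java.lang.OutOfMemoryError during the dynamic-partition write
    df.write.partitionBy("some_column").parquet("/tmp/partitioned-output")
    sc.stop()
  }
}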

stack trace:
[Stage 2: (0 + 4) / 8] 2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread
Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at