Hello everyone,

We are writing Spark Data frame to s3 in parquet and it is failing with below 
exception.

I wanted to try following to avoid OOM


  1.  increase the default sql shuffle partitions to reduce load on parquet 
writer tasks to avoid OOM and
  2.  Increase user memory (reduce memory fraction) to have more memory for 
other data structures assuming parquet writer uses user memory.

I am not sure if these fixes the OOM issue. So wanted to reach out community 
for any suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
         at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
         at sun.misc.Unsafe.allocateMemory(Native Method)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
         at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
         at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
         at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
         at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
         at 
org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
         at 
org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
         at 
org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
         at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
         at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
         at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
         at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
         at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
         at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
         at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
         at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
         at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
         ... 10 more
         Suppressed: java.io.IOException: The file being written is in an 
invalid state. Probably caused by an error thrown previously. Current state: 
BLOCK
                 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
                 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
                 at 
org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)

Regards,
Anil

Reply via email to