Hello everyone, We are writing Spark Data frame to s3 in parquet and it is failing with below exception.
I wanted to try following to avoid OOM 1. increase the default sql shuffle partitions to reduce load on parquet writer tasks to avoid OOM and 2. Increase user memory (reduce memory fraction) to have more memory for other data structures assuming parquet writer uses user memory. I am not sure if these fixes the OOM issue. So wanted to reach out community for any suggestions. Please let me know. Exception: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.OutOfMemoryError at sun.misc.Unsafe.allocateMemory(Native Method) at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97) at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48) at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227) at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247) at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405) at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296) at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95) at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148) at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130) at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182) at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) ... 10 more Suppressed: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168) at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160) at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291) Regards, Anil