This is a DirectByteBuffer OOM, so option 2 may not help: the buffer is allocated off-heap, outside the on-heap user memory. You can increase the direct memory capacity by configuring `-XX:MaxDirectMemorySize`, which is a JVM option.

However, it is worth checking how much memory the writer is actually trying to allocate, because by default the direct memory limit is roughly the same as the heap size set by `-Xmx`.
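
For example, a rough sketch assuming you launch with spark-submit (the 2g value is only a placeholder; size it to your workload):

    spark-submit \
      --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
      ... rest of your submit command ...

The exception is thrown inside an executor task, so the executor JVM options are the ones that matter here.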


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022 09:45
To: "user@spark.apache.org" <user@spark.apache.org>
主题: Spark Parquet write OOM

Hello everyone,

We are writing a Spark DataFrame to S3 in Parquet format, and it is failing with 
the exception below.

I wanted to try the following to avoid the OOM:


  1.  Increase the default SQL shuffle partitions to reduce the load on Parquet 
writer tasks and avoid the OOM, and
  2.  Increase user memory (by reducing the memory fraction) to leave more memory 
for other data structures, assuming the Parquet writer uses user memory.

I am not sure if these will fix the OOM issue, so I wanted to reach out to the 
community for suggestions. Please let me know.
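
In configuration terms, the two changes would look roughly like this (the values 
are placeholders, not recommendations):

    spark-submit \
      --conf spark.sql.shuffle.partitions=2000 \
      --conf spark.memory.fraction=0.5 \
      ... rest of the submit command ...

(spark.memory.fraction defaults to 0.6; lowering it leaves more room for user memory.)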

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
         at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
         at sun.misc.Unsafe.allocateMemory(Native Method)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
         at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
         at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
         at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
         at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
         at 
org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
         at 
org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
         at 
org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
         at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
         at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
         at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
         at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
         at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
         at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
         at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
         at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
         at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
         at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
         at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
         ... 10 more
         Suppressed: java.io.IOException: The file being written is in an 
invalid state. Probably caused by an error thrown previously. Current state: 
BLOCK
                 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
                 at 
org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
                 at 
org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)

Regards,
Anil
