Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Chris Teoh
Yes. Disk spill can be a huge performance hit; with smaller partitions you
may avoid it and possibly complete your job faster. I hope you don't get an
OOM.
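As a rough sketch of the partition-sizing arithmetic behind this advice: a hypothetical back-of-envelope calculation deriving a partition count from total data size. The 128 MB target per partition is an assumed rule of thumb, not a figure from this thread.

```java
// Hypothetical sketch: derive a repartition count from total input size
// and a target partition size. The 128 MB target is an assumption.
public class PartitionSizing {
    // Ceiling division: total bytes over target bytes per partition.
    public static long targetPartitions(long totalBytes, long targetPartitionBytes) {
        return (totalBytes + targetPartitionBytes - 1) / targetPartitionBytes;
    }

    public static void main(String[] args) {
        long totalBytes = 1_000_000_000_000L; // ~1 TB compressed, as in the thread
        long target = 128L * 1024 * 1024;     // assumed 128 MB per partition
        System.out.println(targetPartitions(totalBytes, target));
    }
}
```

Note this is sized against the compressed footprint; the much higher counts discussed below (15000+) reflect that the uncompressed, in-memory size of each partition is far larger than its share of the 1 TB on disk.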

On Sat, 18 Jan 2020 at 10:06, Arwin Tio  wrote:

> Okay! I didn't realize you can pump those partition numbers up that high.
> 15000 partitions still failed. I am trying 3 partitions now. There is
> still some disk spill but it is not that high.
>
> Thanks,
>
> Arwin
>
> --
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 7:32 PM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill which is a performance hit.
>
> Try multiplying the number of partitions by about 20x - 40x and see if you
> can eliminate shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio,  wrote:
>
> Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
>
> (If you're wondering, this is EMR).
>
> --
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 10:30 AM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try and repartition to 14496
> partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio,  wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read(...);
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID
> 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13):
> org.apache.spark.SparkException: Task failed while writing rows.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
> at sun.misc.Unsafe.allocateMemory(Native Method)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
> at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
> at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
> at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
> at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
> at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWri
> ```
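The stack trace above points at `ByteBuffer.allocateDirect` inside Parquet's `SnappyCompressor`, i.e. the executor is exhausting off-heap (direct) memory rather than heap. Alongside raising the partition count, one avenue is giving the JVM more room for direct allocations. A config fragment sketching this; the sizes are placeholder assumptions, not tested values from this thread:

```shell
# Config fragment (illustrative sizes only): the OOM occurs in
# ByteBuffer.allocateDirect, so heap settings alone may not help.
spark-submit \
  --conf spark.executor.memoryOverhead=4g \
  --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
  --conf spark.sql.shuffle.partitions=15000 \
  ...
```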
