Yes. Disk spill can be a huge performance hit; with smaller (and therefore more
numerous) partitions you may avoid it and possibly complete your job faster. I
hope you don't hit OOM.
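
For reference, here is a minimal sketch of the job quoted below with a higher
partition count (a sketch only: the paths are placeholders and 30000 is just
the value being tried in this thread, not a recommendation):

```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvToParquet {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate();

    // Same shape as the job further down the thread; input/output paths are illustrative.
    Dataset<Row> df = spark.read().format("csv").load("s3://input-path/");

    // More, smaller partitions mean each write task holds less data,
    // so it should buffer and spill less while writing Parquet.
    df.repartition(30000)
      .write()
      .parquet("s3://output-path/");

    spark.stop();
  }
}
```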

On Sat, 18 Jan 2020 at 10:06, Arwin Tio <arwin....@hotmail.com> wrote:

> Okay! I didn't realize you could pump the partition count up that high.
> 15000 partitions still failed; I am trying 30000 partitions now. There is
> still some disk spill, but it is not that high.
>
> Thanks,
>
> Arwin
>
> ------------------------------
> *From:* Chris Teoh <chris.t...@gmail.com>
> *Sent:* January 17, 2020 7:32 PM
> *To:* Arwin Tio <arwin....@hotmail.com>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill which is a performance hit.
>
> Try increasing the number of partitions by about 20-40x and see if you can
> eliminate the shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio, <arwin....@hotmail.com> wrote:
>
> Yes, though it is mostly memory spill (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
>
> (If you're wondering, this is EMR).
>
> ------------------------------
> *From:* Chris Teoh <chris.t...@gmail.com>
> *Sent:* January 17, 2020 10:30 AM
> *To:* Arwin Tio <arwin....@hotmail.com>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try repartitioning to 14496
> partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio, <arwin....@hotmail.com> wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read(...)
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
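>
> (Back-of-envelope from those figures: roughly 1 TB compressed / 5000
> partitions comes to about 200 MB of compressed input per write task, and each
> open Parquet writer buffers pages for all ~2000 columns until a row group is
> flushed.)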
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID
> 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13):
> org.apache.spark.SparkException: Task failed while writing rows.
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
>         at sun.misc.Unsafe.allocateMemory(Native Method)
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>         at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>         at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>         at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>         at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>         at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>         at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>         at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>         at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>         at
> org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
>         at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
>         at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>         at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>         at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>         at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>         at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>         at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>         at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>         ... 10 more
> ```
>
> Can anybody give me some pointers on how to tune Spark to avoid this? I am
> using giant machines (r5d.12xlarge) with 384GB of memory. My executors have
> about 350GB of memory (-Xmx350000m). Could it be that my heap is too large?
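>
> For reference, the memory settings boil down to roughly the following (a
> sketch, not the exact EMR configuration; 350g is just the figure above, and
> spark.executor.memory is what ends up as the executor's -Xmx):
>
> ```
> spark-submit \
>   --conf spark.executor.memory=350g \
>   --conf spark.executor.memoryOverhead=... \
>   ...
> ```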
>
> Another issue I found was
> https://issues.apache.org/jira/browse/PARQUET-222, which seems to suggest
> that too many columns could be a problem. I don't want to throw away any
> columns, though.
>
> One more question: why does saving as Parquet create 4 different stages in
> Spark? You can see in the picture that there are 4 different stages, all at
> "save at LoglineParquetGenerator.java:241". I am not sure how to interpret
> these stages:
>
>
> Thanks,
>
> Arwin
>
>

-- 
Chris
