Yes. Disk spill can be a huge performance hit. With smaller partitions you may avoid it and possibly complete your job faster. I hope you don't get an OOM.
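To see roughly what "smaller partitions" means for this job, you can divide the input figures Arwin gives in the thread (about 1 TB compressed, about 5 billion rows) by each partition count that comes up. Below is a minimal back-of-the-envelope sketch. It assumes 1 TB = 10^12 bytes. The 64 MB target is a hypothetical value, not a recommendation from the thread, and the class and method names are illustrative. These are averages of the compressed input: the decoded 2000-column rows a task holds in memory will be larger.

```java
/**
 * Back-of-the-envelope partition sizing using figures quoted in this thread
 * (about 1 TB of compressed input, about 5 billion rows).
 * Assumption: 1 TB is taken as 10^12 bytes. These are averages of the
 * compressed input; the decoded rows a task holds in memory are larger.
 */
final class PartitionSizing {
    static final long INPUT_BYTES = 1_000_000_000_000L; // ~1 TB compressed (from the thread)
    static final long ROWS = 5_000_000_000L;            // ~5 billion rows (from the thread)

    /** Ceiling division for positive longs. */
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Average compressed input bytes per partition, rounded up. */
    static long bytesPerPartition(long partitions) {
        return ceilDiv(INPUT_BYTES, partitions);
    }

    /** Average rows per partition, rounded up. */
    static long rowsPerPartition(long partitions) {
        return ceilDiv(ROWS, partitions);
    }

    /** Smallest partition count whose average partition is at most targetBytes. */
    static long partitionsForTarget(long targetBytes) {
        return ceilDiv(INPUT_BYTES, targetBytes);
    }

    public static void main(String[] args) {
        // Partition counts that come up in the thread.
        long[] counts = {5_000L, 14_496L, 15_000L, 30_000L};
        for (long p : counts) {
            System.out.printf("%,7d partitions -> ~%,d MB and ~%,d rows per partition%n",
                    p, bytesPerPartition(p) / 1_000_000L, rowsPerPartition(p));
        }
        // Hypothetical target of 64 MB of compressed input per partition.
        long target = 64_000_000L;
        System.out.printf("%,d MB target -> %,d partitions%n",
                target / 1_000_000L, partitionsForTarget(target));
    }
}
```

Going from 5,000 to 30,000 partitions cuts the average compressed input per task from about 200 MB to about 33 MB (1,000,000 rows down to about 166,667). Assuming the data is spread evenly, the in-memory share per task shrinks by roughly the same factor.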
On Sat, 18 Jan 2020 at 10:06, Arwin Tio <arwin....@hotmail.com> wrote:

> Okay! I didn't realize you can pump those partition numbers up that high.
> 15000 partitions still failed. I am trying 30000 partitions now. There is
> still some disk spill but it is not that high.
>
> Thanks,
>
> Arwin
>
> ------------------------------
> *From:* Chris Teoh <chris.t...@gmail.com>
> *Sent:* January 17, 2020 7:32 PM
> *To:* Arwin Tio <arwin....@hotmail.com>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill which is a performance hit.
>
> Try multiplying the number of partitions by about 20x - 40x and see if you
> can eliminate shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio, <arwin....@hotmail.com> wrote:
>
> Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
> (If you're wondering, this is EMR).
>
> ------------------------------
> *From:* Chris Teoh <chris.t...@gmail.com>
> *Sent:* January 17, 2020 10:30 AM
> *To:* Arwin Tio <arwin....@hotmail.com>
> *Cc:* user @spark <user@spark.apache.org>
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try and repartition to 14496
> partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio, <arwin....@hotmail.com> wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read().csv(...);
>
> df
>     .repartition(5000)
>     .write()
>     .format("parquet")
>     .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
> 20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
>     at sun.misc.Unsafe.allocateMemory(Native Method)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>     at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>     at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>     at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>     at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>     at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>     at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>     at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>     at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>     at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
>     at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
>     at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:50)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>     at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>     ... 10 more
> ```
>
> Can anybody give me some pointers on how to tune Spark to avoid this? I am
> using giant machines (r5d.12xlarge) with 384GB of memory. My executors have
> about 350GB of memory (-Xmx350000m). Could it be that my heap is too large?
>
> Another issue I found was
> https://issues.apache.org/jira/browse/PARQUET-222, which seems to suggest
> that too many columns could be a problem. I don't want to throw away any
> columns, though.
>
> One more question: why does saving as Parquet create 4 different stages in
> Spark? You can see in the picture that there are 4 different stages, all at
> "save at LoglineParquetGenerator.java:241". I am not sure how to interpret
> these stages:
>
> Thanks,
>
> Arwin

--
Chris
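The stack trace in the original question ends in `ByteBuffer.allocateDirect`, called from Parquet's `SnappyCompressor.setInput`. The failing allocation is therefore off-heap (direct/native) memory rather than the Java heap. A bare `OutOfMemoryError` thrown from `Unsafe.allocateMemory` suggests the native allocation itself failed, so a larger `-Xmx` does not make room for it and, on a fixed-size node, leaves less.

Below is a rough node-level estimate for the numbers in the question. It is a sketch under assumptions the thread does not state:

- one executor per node
- the full 384 GiB (393,216 MiB) counted as usable RAM
- `spark.executor.memoryOverhead` left at Spark's default of 10% of executor memory, with a 384 MiB minimum

The class and method names are hypothetical.

```java
/**
 * Rough per-node memory budget for one executor, using figures from the question.
 * Assumptions (not from the thread): one executor per node, the node's 384 GiB is
 * all usable RAM, and spark.executor.memoryOverhead is left at its default of
 * max(384 MiB, 10% of executor memory).
 */
final class ExecutorMemoryBudget {
    static final long NODE_MIB = 384L * 1024;   // r5d.12xlarge: 384 GiB
    static final long MIN_OVERHEAD_MIB = 384;   // Spark's minimum default overhead
    static final double OVERHEAD_FACTOR = 0.10; // Spark's default overhead factor

    /** Default non-heap overhead Spark requests on top of the executor heap. */
    static long defaultOverheadMiB(long heapMiB) {
        return Math.max(MIN_OVERHEAD_MIB, (long) (heapMiB * OVERHEAD_FACTOR));
    }

    /** Heap plus default overhead: roughly the container size requested per executor. */
    static long containerMiB(long heapMiB) {
        return heapMiB + defaultOverheadMiB(heapMiB);
    }

    /** Physical memory left on the node for the OS and every other process. */
    static long leftoverMiB(long heapMiB) {
        return NODE_MIB - containerMiB(heapMiB);
    }

    public static void main(String[] args) {
        long heapMiB = 350_000; // -Xmx350000m from the question
        System.out.printf("heap=%,d MiB overhead=%,d MiB container=%,d MiB leftover=%,d MiB%n",
                heapMiB, defaultOverheadMiB(heapMiB), containerMiB(heapMiB), leftoverMiB(heapMiB));
    }
}
```

Under these assumptions the executor requests 385,000 MiB, which leaves roughly 8 GiB of the node for the operating system and everything else. That is consistent with the suspicion in the question that the heap may be too large. Changing `heapMiB` in `main` shows how the balance shifts with a smaller heap.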