Re: Spark Executor OOMs when writing Parquet
Yes. Disk spill can be a huge performance hit; with smaller partitions you may avoid it and possibly complete your job faster. I hope you don't get an OOM.

On Sat, 18 Jan 2020 at 10:06, Arwin Tio wrote:

> Okay! I didn't realize you could pump those partition numbers up that
> high. 15000 partitions still failed. I am trying 3 partitions now. There
> is still some disk spill, but it is not that high.
>
> Thanks,
>
> Arwin
>
> From: Chris Teoh
> Sent: January 17, 2020 7:32 PM
> To: Arwin Tio
> Cc: user @spark
> Subject: Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill, which is a performance hit.
>
> Try multiplying the number of partitions by about 20x-40x and see if you
> can eliminate shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio wrote:
>
> Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
> (If you're wondering, this is EMR.)
>
> From: Chris Teoh
> Sent: January 17, 2020 10:30 AM
> To: Arwin Tio
> Cc: user @spark
> Subject: Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try and repartition to
> 14496 partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);

df
    .repartition(5000)
    .write()
    .format("parquet")
    .parquet("s3://mypath/...");
```

> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:

```
20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
    at sun.misc.Unsafe.allocateMemory(Native Method)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
    at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
    at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
    at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
    at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
    at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWri
```
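For a rough sense of why the 20x-40x partition advice in the thread helps, here is a back-of-envelope sketch of how per-task data volume shrinks as partition count grows. The 5x decompression factor is an assumption for illustration, not a figure from the thread:

```python
# Back-of-envelope partition sizing for the job described above:
# ~1 TB compressed input, repartitioned for the Parquet write.
# ASSUMPTION: a 5x expansion from compressed CSV to in-memory rows.

def mib_per_partition(total_bytes: int, partitions: int) -> float:
    """Average number of bytes each write task must handle, in MiB."""
    return total_bytes / partitions / 2**20

uncompressed_bytes = 1 * 2**40 * 5  # 1 TB compressed, assumed 5x expansion

for n in (5000, 15000, 100000, 300000):
    print(f"{n:>7} partitions -> {mib_per_partition(uncompressed_bytes, n):8.1f} MiB/task")
```

With 2000 columns, the Parquet writer also buffers a page per column per open file, so shrinking the data handled by each task reduces both the shuffle spill discussed above and the writer's buffering pressure.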
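One detail worth noting in the `Caused by` frames: the failing allocation is `ByteBuffer.allocateDirect` inside the Snappy compressor, i.e. off-heap direct memory, not executor heap. Alongside repartitioning, giving the executor more off-heap headroom is a common mitigation. A sketch of the relevant settings (the values are illustrative, not tuned for this job):

```
# spark-defaults.conf fragment -- illustrative values, not tuned.
# The OOM above comes from ByteBuffer.allocateDirect (off-heap direct
# memory), so overhead/direct-memory headroom matters, not just heap size.
spark.executor.memory              8g
spark.executor.memoryOverhead      4g
spark.executor.extraJavaOptions    -XX:MaxDirectMemorySize=4g
```

On YARN/EMR, `spark.executor.memoryOverhead` is what sizes the container allowance for off-heap allocations like these direct buffers; the JVM's `-XX:MaxDirectMemorySize` caps them explicitly.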