Hi Anil,

superb. When I said "increase the number of partitions" I meant shuffle partitions, because you are doing de-duplication. By default that is around 200, which can create issues in case your data volume is large.
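As a minimal sketch of that suggestion (the app name, table, column, path, and the partition count of 1000 are illustrative assumptions, not values from this thread), raising shuffle partitions before the de-duplication could look like:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session, table, and column names -- illustrative only.
val spark = SparkSession.builder()
  .appName("dedup-sketch")
  // Raise the shuffle partition count from the default 200 so each
  // task in the de-duplication shuffle handles fewer records.
  .config("spark.sql.shuffle.partitions", "1000")
  .getOrCreate()

val df = spark.table("some_hive_table") // create DF from hive

df.dropDuplicates("primary_key_col")    // this shuffle now uses 1000 partitions
  .write
  .mode("overwrite")
  .parquet("s3://some-bucket/some-prefix/")
```

More, smaller shuffle partitions reduce per-task memory pressure, which is the point of the suggestion above.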
I always prefer SPARK SQL to SPARK dataframes. The records-per-file configuration is listed in the following link as spark.sql.files.maxRecordsPerFile:
https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration

Regards,
Gourav Sengupta

On Sat, Mar 5, 2022 at 5:09 PM Anil Dasari <adas...@guidewire.com> wrote:

> I am not sure how to set the records limit. Let me check. I couldn’t find
> a parquet row group size configuration in spark.
>
> For now, I increased the number of shuffle partitions to reduce the
> records processed per task to avoid OOM.
>
> Regards,
> Anil
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Saturday, March 5, 2022 at 1:59 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
> any chance you tried setting the limit on the number of records to be
> written out at a time?
>
> Regards,
> Gourav
>
> On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari <adas...@guidewire.com> wrote:
>
> Hi Gourav,
>
> Tried increasing the number of shuffle partitions and higher executor
> memory. Neither worked.
>
> Regards
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
> I do not think that you are doing anything particularly concerning here.
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time; you can try that.
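The per-write record limit referred to above is `spark.sql.files.maxRecordsPerFile`. A sketch of applying it (assuming an existing SparkSession `spark` and a DataFrame `df`; the limit and path are illustrative, not from the thread):

```scala
// Hypothetical DataFrame `df` and output path -- illustrative only.
// spark.sql.files.maxRecordsPerFile caps the number of records written
// to any single output file; a writer task rolls over to a new file
// once the cap is reached instead of growing one file indefinitely.
spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")

df.write
  .mode("overwrite")
  .parquet("s3://some-bucket/some-prefix/")
```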
The other thing that you can try is to ensure that the number of partitions
is higher (just like you suggested). Let me know how things are going on
your end.

Regards,
Gourav Sengupta

On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <adas...@guidewire.com> wrote:

> Answers in the context. Thanks.
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
> Can you please share the code for the following steps?
>
> · Create DF from hive (from step #c)
> [AD] sparkSession.table(<tablename>)
>
> · Deduplicate spark DF by primary key
> [AD] dataFrame.dropDuplicates(<primary key columns>)
>
> · Write DF to s3 in parquet format
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
> · Write metadata to s3
> [AD] metadata in json written to s3 using aws sdk
>
> Regards,
> Gourav Sengupta
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
>
> 2nd attempt..
>
> Any suggestions to troubleshoot and fix the problem? Thanks in advance.
>
> Regards,
> Anil
>
> *From: *Anil Dasari <adas...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
> *Cc: *user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang,
>
> Thanks for the response.
>
> Please find the answers below.
>
> 1. What is the version of SPARK you are using?
> [AD] Spark 2.4.7 (EMR 5.33.1)
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
> [AD] No. Only one new column is added. Our flow is:
>
> 1. Read avro data from kafka
> 2. Avro deserialization and add new column to RDD
> 3. Create spark dataframe (DF) against the latest schema (avro evolved
> schema) and persist to hive (checkpointing)
> 4. Create DF from hive (from step #c)
> 5. Deduplicate spark DF by primary key
> 6. Write DF to s3 in parquet format
> 7. Write metadata to s3
>
> The failure is from the spark batch job.
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
> [AD] Data volume is fixed as it is a batch job.
>
> 4. What is the memory that you are having in your executors, and drivers?
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in
> total; each node can create 2 executors (2 CPU cores and 8 GB memory).
>
> 5. Can you show the list of transformations that you are running?
> [AD] No explicit transformations other than the basic map transformations
> required to create the dataframe from the avro record RDD.
>
> Please let me know if you have any questions.
>
> Regards,
> Anil
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) <yangji...@baidu.com>
> *Cc: *Anil Dasari <adas...@guidewire.com>, user@spark.apache.org <user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
> 1. What is the version of SPARK you are using?
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
> 4. What is the memory that you are having in your executors, and drivers?
> 5. Can you show the list of transformations that you are running?
>
> Regards,
> Gourav Sengupta
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java opt.
>
> However, we'd better check the length of memory to be allocated, because
> `-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.
>
> *From: *Anil Dasari <adas...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 09:45
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Spark Parquet write OOM
>
> Hello everyone,
>
> We are writing a Spark Data frame to s3 in parquet and it is failing with
> the below exception.
>
> I wanted to try the following to avoid the OOM:
>
> 1. Increase the default sql shuffle partitions to reduce the load on
> parquet writer tasks and
> 2. Increase user memory (reduce memory fraction) to have more memory for
> other data structures, assuming the parquet writer uses user memory.
>
> I am not sure if these fix the OOM issue, so I wanted to reach out to the
> community for suggestions. Please let me know.
>
> Exception:
>
> org.apache.spark.SparkException: Task failed while writing rows.
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.OutOfMemoryError
> at sun.misc.Unsafe.allocateMemory(Native Method)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
> at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
> at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
> at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
> at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
> at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
> at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
> at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
> at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
> at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
> at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
> at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
> at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
> at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
> at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
> ... 10 more
> Suppressed: java.io.IOException: The file being written is in an invalid
> state. Probably caused by an error thrown previously. Current state: BLOCK
> at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
> at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
> at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
> Regards,
> Anil
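Since the OOM above is thrown while `ByteBuffer.allocateDirect()` grabs off-heap memory for the Snappy compressor, Yang's `-XX:MaxDirectMemorySize` suggestion can be sketched as follows. The sizes and app name are illustrative assumptions, not values from this thread; executor JVM options are usually passed via spark-submit `--conf`, shown here on the builder for brevity:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sizing, for illustration only. By default the direct
// memory cap follows -Xmx; here we set it explicitly so the Snappy
// compressor's direct buffers have their own headroom.
val spark = SparkSession.builder()
  .appName("parquet-write-oom-workaround")
  .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
  // Also reserve off-heap headroom with the resource manager (YARN/EMR),
  // so the container is not killed for exceeding its memory allotment.
  .config("spark.executor.memoryOverhead", "2g")
  .getOrCreate()
```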