Hi Anil,

I was trying to work things out for a while yesterday, but may need your kind help.
Can you please share the code for the following steps?

- Create DF from hive (from step #3)
- Deduplicate spark DF by primary key
- Write DF to s3 in parquet format
- Write metadata to s3

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:

> 2nd attempt..
>
> Any suggestions to troubleshoot and fix the problem? Thanks in advance.
>
> Regards,
>
> Anil
>
> *From:* Anil Dasari <adas...@guidewire.com>
> *Date:* Wednesday, March 2, 2022 at 7:00 AM
> *To:* Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang,
>
> Thanks for the response. Please find the answers below.
>
> 1. What is the version of SPARK you are using?
>
> [AD]: Spark 2.4.7 (EMR 5.33.1)
>
> 2. Are you doing a lot of in-memory transformations like adding columns, or running joins, or UDFs, thus increasing the size of the data before writing out?
>
> [AD]: No. Only one new column is added. Our flow is:
>
> 1. Read avro data from kafka
> 2. Avro deserialization and add a new column to the RDD
> 3. Create spark dataframe (DF) against the latest schema (avro evolved schema) and persist to hive (checkpointing)
> 4. Create DF from hive (from step #3)
> 5. Deduplicate spark DF by primary key
> 6. Write DF to s3 in parquet format
> 7. Write metadata to s3
>
> The failure is from the spark batch job.
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes going to vary, or particularly increase, over time?
>
> [AD]: Data volume is fixed as it is a batch job.
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD]: We are running one core node and 50 task nodes, i.e. 51 nodes in total; each node can create 2 executors (2-core CPU and 8 GB memory).
>
> 5. Can you show the list of transformations that you are running?
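The four requested steps might be sketched in Spark 2.4 Scala roughly as below. This is only an illustrative sketch: the table name (`checkpoint_db.events`), primary-key column (`pk`), ordering column (`updated_at`), and s3 paths are hypothetical placeholders, not taken from the thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.expressions.Window
import java.nio.charset.StandardCharsets

val spark = SparkSession.builder()
  .appName("dedup-and-write")
  .enableHiveSupport()
  .getOrCreate()

// Step 4: create DF from the hive checkpoint table (table name is hypothetical)
val df = spark.table("checkpoint_db.events")

// Step 5: deduplicate by primary key, keeping the latest record per key.
// df.dropDuplicates("pk") also works, but keeps an arbitrary row per key.
val w = Window.partitionBy(col("pk")).orderBy(col("updated_at").desc)
val deduped = df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

// Step 6: write DF to s3 in parquet format (bucket/path are placeholders)
deduped.write.mode("overwrite").parquet("s3://my-bucket/output/events/")

// Step 7: write a small metadata object next to the data; what "metadata"
// means here is an assumption -- a row count is used as an example.
val meta = s"""{"rowCount": ${deduped.count()}}"""
val metaPath = new org.apache.hadoop.fs.Path("s3://my-bucket/output/_metadata/meta.json")
val fs = metaPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val out = fs.create(metaPath, true)
out.write(meta.getBytes(StandardCharsets.UTF_8))
out.close()
```

The window-based dedup is used instead of a plain `dropDuplicates` so that which duplicate survives is deterministic.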
> [AD]: No explicit transformations other than the basic map transformations required to create the dataframe from the avro record RDD.
>
> Please let me know if you have any questions.
>
> Regards,
>
> Anil
>
> *From:* Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date:* Wednesday, March 2, 2022 at 1:07 AM
> *To:* Yang,Jie(INF) <yangji...@baidu.com>
> *Cc:* Anil Dasari <adas...@guidewire.com>, user@spark.apache.org <user@spark.apache.org>
> *Subject:* {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
> Before jumping to the quick symptomatic fix, can we try to understand the issues?
>
> 1. What is the version of SPARK you are using?
> 2. Are you doing a lot of in-memory transformations like adding columns, or running joins, or UDFs, thus increasing the size of the data before writing out?
> 3. Is your pipeline going to change or evolve soon, or the data volumes going to vary, or particularly increase, over time?
> 4. What is the memory that you are having in your executors, and drivers?
> 5. Can you show the list of transformations that you are running?
>
> Regards,
> Gourav Sengupta
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is a Java option.
>
> However, we'd better check the length of memory to be allocated, because `-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.
>
> *From:* Anil Dasari <adas...@guidewire.com>
> *Date:* Wednesday, March 2, 2022, 09:45
> *To:* "user@spark.apache.org" <user@spark.apache.org>
> *Subject:* Spark Parquet write OOM
>
> Hello everyone,
>
> We are writing a Spark DataFrame to s3 in parquet format and it is failing with the exception below.
>
> I wanted to try the following to avoid the OOM:
>
> 1. Increase the default sql shuffle partitions to reduce the load on parquet writer tasks, and
> 2. Increase user memory (reduce the memory fraction) to have more memory for other data structures, assuming the parquet writer uses user memory.
>
> I am not sure whether these fix the OOM issue, so I wanted to reach out to the community for any suggestions. Please let me know.
>
> Exception:
>
> org.apache.spark.SparkException: Task failed while writing rows.
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.OutOfMemoryError
>     at sun.misc.Unsafe.allocateMemory(Native Method)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>     at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>     at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>     at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>     at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>     at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>     at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>     at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>     at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>     at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>     at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>     at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>     at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>     at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>     ... 10 more
>     Suppressed: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK
>         at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
>         at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
>         at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
> Regards,
>
> Anil
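As a footnote, the tuning discussed in this thread (Anil's idea #1 of raising the sql shuffle partitions, and Yang's `-XX:MaxDirectMemorySize` suggestion) could be passed via spark-submit roughly as in the config sketch below. The partition count, memory size, class name, and jar name are illustrative placeholders only, not values from the thread.

```
spark-submit \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g \
  --conf spark.driver.extraJavaOptions=-XX:MaxDirectMemorySize=2g \
  --class com.example.DedupJob dedup-job.jar
```

Per Yang's note, if `-XX:MaxDirectMemorySize` is not set explicitly, the direct-memory limit defaults to roughly the `-Xmx` heap size, so setting it separately is what allows the Snappy/Parquet direct buffers to grow without raising the heap.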