Hi Anil,

superb, when I said increase the number of partitions, I was implying
shuffle partitions because you are doing de duplicates by default I think
that should be around 200, which can create issues in case your data volume
is large.

I always prefer to SPARK SQL instead of SPARK dataframes. And the number of
records per file configuration should be mentioned in the following link as
maxrecordsperfile or something like that :
https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration
.



Regards,
Gourav Sengupta

On Sat, Mar 5, 2022 at 5:09 PM Anil Dasari <adas...@guidewire.com> wrote:

> I am not sure how to set the records limit. Let me check. I couldn’t find
> parquet row group size configuration in spark.
>
> For now, I increased the number if shuffle partitions to reduce the
> records processed by task to avoid OOM.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Saturday, March 5, 2022 at 1:59 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> any chance you tried setting the limit on the number of records to be
> written out at a time?
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari <adas...@guidewire.com> wrote:
>
> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything very particularly concerning
> here.
>
>
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time you can try that. The other thing that you can try is
> to ensure that the number of partitions are more (just like you suggested)
> let me know how things are giong on your end
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <adas...@guidewire.com> wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari <adas...@guidewire.com>
> *Cc: *Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·
> Create DF from hive (from step #c)
>
> [AD] sparkSession.table(<tablename>)
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates(<primary key columns>)
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari <adas...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>    1. Read avro data from kafka
>    2. Avro deserialization and add new colum to RDD
>    3. Create spark dataframe (DF) against to latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>    4. Create DF from hive (from step #c)
>    5. Deduplicate spark DF by primary key
>    6. Write DF to s3 in parquet format
>    7. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD] : Data volume Is fixed as it is batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We running one core node and 50 task nodes .. i.e total 51 nodes
> ..each node can create 2 executors (2 core cpu and 8 gb memory)
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than basic map transformations
> required to create dataframe from avor record rdd.
>
>
>
> Please let me know if yo have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) <yangji...@baidu.com>
> *Cc: *Anil Dasari <adas...@guidewire.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running ?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
>
> This is a DirectByteBuffer OOM,so plan 2 may not work, we can increase
> the capacity of DirectByteBuffer size by configuring
>  `-XX:MaxDirectMemorySize` and this is a Java opts.
>
>
>
> However, we'd better check the length of memory to be allocated,  because
>  `-XX:MaxDirectMemorySize` and `-Xmx` should have the same capacity by
> default.
>
>
>
>
>
> *发件人**: *Anil Dasari <adas...@guidewire.com>
> *日期**: *2022年3月2日 星期三 09:45
> *收件人**: *"user@spark.apache.org" <user@spark.apache.org>
> *主题**: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing Spark Data frame to s3 in parquet and it is failing with
> below exception.
>
>
>
> I wanted to try following to avoid OOM
>
>
>
>    1. increase the default sql shuffle partitions to reduce load on
>    parquet writer tasks to avoid OOM and
>    2. Increase user memory (reduce memory fraction) to have more memory
>    for other data structures assuming parquet writer uses user memory.
>
>
>
> I am not sure if these fixes the OOM issue. So wanted to reach out
> community for any suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>          at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>          at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>          at sun.misc.Unsafe.allocateMemory(Native Method)
>
>          at java.nio.DirectByteBuffer.<init>(
> http://DirectByteBuffer.java:127)
>
>          at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>          at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>          at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>          at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>          at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>          at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>          at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>          at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>
>          at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>
>          at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>
>          ... 10 more
>
>          Suppressed: java.io.IOException: The file being written is in an
> invalid state. Probably caused by an error thrown previously. Current
> state: BLOCK
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
>
>
> Regards,
>
> Anil
>
>

Reply via email to