Hi Anil,

I was trying to work things out for a while yesterday, but I may need your
kind help.

Can you please share the code for the following steps (a rough sketch of what
I mean is below the list)?
- Create DF from hive (from step #3)
- Deduplicate spark DF by primary key
- Write DF to s3 in parquet format
- Write metadata to s3
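
Just so that we are talking about the same thing, here is a bare-bones sketch
of those steps (the table, key column and s3 paths are only placeholders, not
your actual ones):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// step 4: read the checkpointed data back from hive (placeholder table name)
val df = spark.table("db.checkpoint_table")

// step 5: deduplicate by the primary key (placeholder key column)
val deduped = df.dropDuplicates("id")

// step 6: write to s3 in parquet format (placeholder path)
deduped.write.mode("overwrite").parquet("s3://my-bucket/output/")

// step 7: write some metadata alongside it (here just a row count, as an example)
Seq(("row_count", deduped.count().toString))
  .toDF("key", "value")
  .write.mode("overwrite")
  .json("s3://my-bucket/output_metadata/")

It is mainly your actual version of the deduplication and parquet write steps
that I would like to look at.
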
Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:

> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem? Thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari <adas...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org <user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is (a rough sketch follows
> the list):
>
>    1. Read avro data from kafka
>    2. Avro deserialization and add a new column to the RDD
>    3. Create spark dataframe (DF) against the latest schema (evolved avro
>    schema) and persist to hive (checkpointing)
>    4. Create DF from hive (from step #3)
>    5. Deduplicate spark DF by primary key
>    6. Write DF to s3 in parquet format
>    7. Write metadata to s3
>
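> In rough Scala terms, steps 1 to 3 look something like the sketch below
> (heavily simplified; the broker, topic, schema and table names are
> placeholders, and the real job does the Avro handling with custom
> deserialization at the RDD level rather than with spark-avro):
>
> // assumes an existing SparkSession `spark` with hive support
>
> // step 1: batch-read the raw Avro payloads from Kafka
> // (uses the spark-sql-kafka-0-10 data source)
> val raw = spark.read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092") // placeholder brokers
>   .option("subscribe", "events_topic")              // placeholder topic
>   .load()
>
> // step 2: deserialize Avro against the latest (evolved) schema and add one column
> import org.apache.spark.sql.avro._        // external spark-avro module
> import org.apache.spark.sql.functions._
> val avroSchemaJson: String = "..."        // latest Avro schema as a JSON string (placeholder)
> val parsed = raw
>   .select(from_avro(col("value"), avroSchemaJson).as("record"))
>   .select("record.*")
>   .withColumn("ingest_ts", current_timestamp()) // the one added column
>
> // step 3: persist to hive as the checkpoint (placeholder table name)
> parsed.write.mode("overwrite").saveAsTable("db.checkpoint_table")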
>
>
> The failure is from the Spark batch job.
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD] : Data volume is fixed, as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total.
> Each node can create 2 executors (2-core CPU and 8 GB memory each).
>
>
>
> 5. Can you show the list of transformations that you are running?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create the dataframe from the Avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta <gourav.sengu...@gmail.com>
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) <yangji...@baidu.com>
> *Cc: *Anil Dasari <adas...@guidewire.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> Before jumping to a quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <yangji...@baidu.com> wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the
> DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is
> a Java option.
>
>
>
> However, we had better check how much memory actually needs to be allocated,
> because `-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by
> default.
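>
> For example, via spark-submit (the 2g value below is only illustrative and
> should be sized together with the executor heap):
>
> spark-submit \
>   --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
>   ...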
>
>
>
>
>
> *From: *Anil Dasari <adas...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 09:45
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing a Spark DataFrame to S3 in Parquet format and it is failing
> with the exception below.
>
>
>
> I wanted to try the following to avoid the OOM:
>

>
>    1. Increase the default SQL shuffle partitions to reduce the load on the
>    Parquet writer tasks and avoid the OOM, and
>    2. Increase user memory (reduce the memory fraction) to leave more memory
>    for other data structures, assuming the Parquet writer uses user memory
>    (example settings below).
>
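> For example (the values below are only what I was planning to try, not tuned;
> the defaults are 200 and 0.6 respectively):
>
> spark-submit \
>   --conf spark.sql.shuffle.partitions=400 \
>   --conf spark.memory.fraction=0.4 \
>   ...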
>
>
> I am not sure whether these will fix the OOM issue, so I wanted to reach out
> to the community for suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>          at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>          at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>          at sun.misc.Unsafe.allocateMemory(Native Method)
>
>          at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>
>          at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>          at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>          at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>          at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>          at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>          at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>          at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>          at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>
>          at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>
>          at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>
>          ... 10 more
>
>          Suppressed: java.io.IOException: The file being written is in an
> invalid state. Probably caused by an error thrown previously. Current
> state: BLOCK
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
>
>
> Regards,
>
> Anil
>
>
