Re: unsubscribe

2022-03-02 Thread Ghousia
unsubscribe

On Thu, Mar 3, 2022 at 5:44 AM Basavaraj  wrote:

> unsubscribe


unsubscribe

2022-03-02 Thread Basavaraj
unsubscribe


Re: {EXT} Re: Spark Parquet write OOM

2022-03-02 Thread Anil Dasari
2nd attempt..

Any suggestions to troubleshoot and fix the problem? Thanks in advance.

Regards,
Anil

From: Anil Dasari 
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta , Yang,Jie(INF) 

Cc: user@spark.apache.org 
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD]: Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read Avro data from Kafka
  2.  Avro deserialization and add a new column to the RDD
  3.  Create Spark dataframe (DF) against the latest schema (Avro evolved 
schema) and persist to Hive (checkpointing)
  4.  Create DF from Hive (from step 3)
  5.  Deduplicate the Spark DF by primary key
  6.  Write the DF to S3 in Parquet format
  7.  Write metadata to S3

The failure is from the Spark batch job; a minimal sketch of this flow is below.
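A minimal Scala sketch of that flow, for context only. The table name, S3 path, and primary-key column are placeholders I am assuming for illustration, not the actual job code:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

object BatchFlowSketch {
  def run(spark: SparkSession, latestSchema: StructType): Unit = {
    // Steps 1-2 (Kafka read and Avro deserialization) are elided; assume they
    // produce an RDD[Row] that already carries the one extra column mentioned above.
    val recordRdd = spark.sparkContext.emptyRDD[Row]

    // Step 3: DataFrame against the latest (evolved) schema, persisted to Hive as a checkpoint.
    val df = spark.createDataFrame(recordRdd, latestSchema)
    df.write.mode("overwrite").saveAsTable("checkpoint_tbl") // placeholder table name

    // Steps 4-5: re-read from Hive and deduplicate by primary key.
    val deduped = spark.table("checkpoint_tbl").dropDuplicates("primary_key") // placeholder key

    // Step 6: Parquet write to S3, the step that fails with the OOM below.
    deduped.write.parquet("s3://bucket/output/") // placeholder path

    // Step 7 (metadata write to S3) is elided.
  }
}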

3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
[AD]: Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total. Each 
node can create 2 executors (2-core CPU and 8 GB memory).
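For reference, a hypothetical session configuration consistent with that sizing might look like the sketch below. I am assuming the 8 GB figure is per executor and that roughly two executors run per task node; the actual EMR settings are not shown in this thread:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-write-job")              // hypothetical application name
  .config("spark.executor.cores", "2")
  .config("spark.executor.memory", "8g")     // assuming 8 GB is per executor, not per node
  .config("spark.executor.instances", "100") // roughly 2 executors per task node x 50 task nodes
  .getOrCreate()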

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations 
required to create a dataframe from the Avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta 
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) 
Cc: Anil Dasari , user@spark.apache.org 

Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the 
DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is a 
Java option.

However, we'd better check how much memory needs to be allocated, because 
`-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by default.
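A minimal sketch of passing that JVM option to the executors through Spark's extraJavaOptions, with an arbitrary 2g value for illustration only (not a recommendation):

import org.apache.spark.sql.SparkSession

// Raise the direct-buffer limit on executors; the value must be tuned for the workload.
val spark = SparkSession.builder()
  .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
  .getOrCreate()

The same value can equivalently be passed with --conf on spark-submit or set in spark-defaults.conf.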


From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark dataframe to S3 in Parquet format and it is failing with 
the below exception.

I wanted to try the following to avoid the OOM:


  1.  Increase the default SQL shuffle partitions to reduce the load on Parquet 
writer tasks and avoid OOM, and
  2.  Increase user memory (reduce the memory fraction) to have more memory for 
other data structures, assuming the Parquet writer uses user memory.

I am not sure if these will fix the OOM issue, so I wanted to reach out to the 
community for any suggestions; a sketch of both settings as Spark configs is 
below. Please let me know.
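A sketch of the two settings above as Spark configuration, with purely illustrative values that I am assuming, not tuned recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // 1. More shuffle partitions => smaller partitions per Parquet writer task.
  .config("spark.sql.shuffle.partitions", "800")
  // 2. Lower unified memory fraction (default 0.6) => more "user" memory left for
  //    other data structures, if the Parquet writer really allocates from user memory.
  .config("spark.memory.fraction", "0.5")
  .getOrCreate()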

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
 at sun.misc.Unsafe.allocateMemory(Native Method)
 at 
java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at 

Re: Spark Parquet write OOM

2022-03-02 Thread Gourav Sengupta
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or
running joins, or UDFs thus increasing the size of the data before writing
out?
3. Is your pipeline going to change or evolve soon, or the data volumes
going to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:

> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java option.
>
>
>
> However, we'd better check how much memory needs to be allocated, because
> `-XX:MaxDirectMemorySize` and `-Xmx` have the same capacity by
> default.
>
>
>
>
>
> From: Anil Dasari 
> Date: Wednesday, March 2, 2022, 09:45
> To: "user@spark.apache.org" 
> Subject: Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing a Spark dataframe to S3 in Parquet format and it is failing
> with the below exception.
>
>
>
> I wanted to try the following to avoid the OOM:
>
>
>
>    1. Increase the default SQL shuffle partitions to reduce the load on
>    Parquet writer tasks and avoid OOM, and
>    2. Increase user memory (reduce the memory fraction) to have more memory
>    for other data structures, assuming the Parquet writer uses user memory.
>
>
>
> I am not sure if these will fix the OOM issue, so I wanted to reach out to
> the community for any suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>  at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>  at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>  at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>  at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>  at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>  at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>  at sun.misc.Unsafe.allocateMemory(Native Method)
>
>  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>
>  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>  at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>  at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>  at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>  at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>  at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>  at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>  at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>  at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>  at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>  at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>  at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>  at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>  at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>
>  at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>
>  at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>
>  at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>
>  at
>