Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Gourav Sengupta
Hi Anil,

superb, when I said increase the number of partitions, I was implying
shuffle partitions, because you are doing deduplication. By default I think
that should be around 200, which can create issues in case your data volume
is large.

I always prefer SPARK SQL to SPARK dataframes. The number-of-records-per-file
configuration is listed in the following link as
spark.sql.files.maxRecordsPerFile:
https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration
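
A minimal sketch of setting it, assuming a Spark 2.2+ session; the 1,000,000
value is only a placeholder to tune against your row width and executor
memory, and the two forms below are equivalent:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Cap how many records Spark writes into any single output file.
    spark.sql("SET spark.sql.files.maxRecordsPerFile=1000000")
    // equivalently:
    spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")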



Regards,
Gourav Sengupta

On Sat, Mar 5, 2022 at 5:09 PM Anil Dasari  wrote:

> I am not sure how to set the records limit. Let me check. I couldn’t find
> parquet row group size configuration in spark.
>
> For now, I increased the number of shuffle partitions to reduce the
> records processed per task to avoid OOM.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Saturday, March 5, 2022 at 1:59 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> any chance you tried setting the limit on the number of records to be
> written out at a time?
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari  wrote:
>
> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything very particularly concerning
> here.
>
>
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time; you can try that. The other thing that you can try is
> to ensure that the number of partitions is higher (just like you suggested).
> Let me know how things are going on your end.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari  wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step #c)
>
> [AD] sparkSession.table()
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates()
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>1. Read avro data from kafka
>    2. Avro deserialization and add a new column to the RDD
>    3. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>4. Create DF from hive (from step #c)
>5. Deduplicate spark DF by primary key
>6. Write DF to s3 in parquet format
>7. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD] : Data volume is fixed as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total;
> each node can create 2 executors (2-core CPU and 8 GB memory)
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create the dataframe from the avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) 
> *Cc: *Anil Dasari , user@spark.apache.org <
> us

Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Anil Dasari
I am not sure how to set the records limit. Let me check. I couldn’t find 
parquet row group size configuration in spark.
For now, I increased the number of shuffle partitions to reduce the records 
processed per task to avoid OOM.
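
In case it helps, a small sketch of both knobs; the numbers are placeholders,
and, if I remember correctly, the Parquet row group size maps to the
Hadoop-level property parquet.block.size rather than a Spark SQL conf, which
is why it does not appear on the Spark configuration page:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // More shuffle partitions => fewer records per task during dropDuplicates.
    spark.conf.set("spark.sql.shuffle.partitions", "2000")

    // Parquet row group size, set on the Hadoop configuration (64 MB here).
    spark.sparkContext.hadoopConfiguration
      .setInt("parquet.block.size", 64 * 1024 * 1024)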

Regards,
Anil

From: Gourav Sengupta 
Date: Saturday, March 5, 2022 at 1:59 AM
To: Anil Dasari 
Cc: Yang,Jie(INF) , user@spark.apache.org 

Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

any chance you tried setting the limit on the number of records to be written 
out at a time?

Regards,
Gourav

On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari <adas...@guidewire.com> wrote:
Hi Gourav,
Tried increasing shuffle partitions number and higher executor memory. Both 
didn’t work.

Regards

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Thursday, March 3, 2022 at 2:24 AM
To: Anil Dasari <adas...@guidewire.com>
Cc: Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi,

I do not think that you are doing anything very particularly concerning here.

There is a setting in SPARK which limits the number of records that we can 
write out at a time; you can try that. The other thing that you can try is to 
ensure that the number of partitions is higher (just like you suggested). Let 
me know how things are going on your end.


Regards,
Gourav Sengupta


On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <adas...@guidewire.com> wrote:
Answers in the context. Thanks.

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Thursday, March 3, 2022 at 12:13 AM
To: Anil Dasari <adas...@guidewire.com>
Cc: Yang,Jie(INF) <yangji...@baidu.com>, user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

I was trying to work out things for a while yesterday, but may need your kind 
help.

Can you please share the code for the following steps?
*  Create DF from hive (from step #c)
[AD] sparkSession.table()

*  Deduplicate spark DF by primary key
[AD] dataFrame.dropDuplicates()

*  Write DF to s3 in parquet format
[AD] dataFrame.write.mode(saveMode).parquet(path)

*  Write metadata to s3
[AD] metadata in json written to s3 using aws sdk
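
Putting those answers together, a self-contained sketch of steps 4-6; the
table name, key column, save mode and S3 path below are placeholders rather
than the actual job:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object DedupToParquet {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("dedup-to-parquet")      // placeholder app name
          .enableHiveSupport()
          .getOrCreate()

        // Step 4: create DF from the checkpointed hive table.
        val df = spark.table("checkpoint_db.events")

        // Step 5: deduplicate the DF by primary key.
        val deduped = df.dropDuplicates("primary_key")

        // Step 6: write the DF to s3 in parquet format.
        deduped.write
          .mode(SaveMode.Overwrite)
          .parquet("s3://my-bucket/output/")
      }
    }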

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <adas...@guidewire.com> wrote:
2nd attempt..

Any suggestions to troubleshoot and fix the problem ? thanks in advance.

Regards,
Anil

From: Anil Dasari <adas...@guidewire.com>
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta <gourav.sengu...@gmail.com>, Yang,Jie(INF) <yangji...@baidu.com>
Cc: user@spark.apache.org
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is

  1.  Read avro data from kafka
  2.  Avro deserialization and add a new column to the RDD
  3.  Create spark dataframe (DF) against the latest schema (avro evolved 
schema) and persist to hive (checkpointing)
  4.  Create DF from hive (from step #c)
  5.  Deduplicate spark DF by primary key
  6.  Write DF to s3 in parquet format
  7.  Write metadata to s3

The failure is from spark batch job

3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase, over time?
[AD] : Data volume is fixed as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total;
each node can create 2 executors (2-core CPU and 8 GB memory)
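
As an aside, if memory pressure persists it may help to size the executors
explicitly instead of relying on defaults. A hypothetical spark-defaults.conf
(or spark-submit --conf) sketch; the property names are standard Spark
settings, but the values are placeholders that must fit inside the per-node
container size:

    spark.executor.instances        100
    spark.executor.cores            2
    spark.executor.memory           6g
    spark.executor.memoryOverhead   2g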

5. Can you show the list of transformations that you are running ?
[AD] No explicit transformations other than the basic map transformations required 
to create the dataframe from the avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) <yangji...@baidu.com>
Cc: Anil Dasari <adas...@guidewire.com>, user@spark.apache.org
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

before jumping to the quick symptomatic fix, can we try to understand the 
issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or 
running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going 
to vary, or particularly increase

Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark SQL

2022-03-05 Thread Gourav Sengupta
Hi,

I completely agree with Saurabh: the use of BQ with SPARK does not make
sense at all if you are trying to cut down your costs. I think that costs
do matter to a few people at the end of the day.

Saurabh, is there any chance you can see what actual queries are hitting
the thrift server? Using the hive metastore is something that I have been doing
in AWS EMR for the last 5 years, and for sure it does not cause a full table
scan.

Hi Sean,
for some reason, I am not able to receive any emails from the spark user
group. My account should be a very old one, is there any chance you can
kindly have a look into it and kindly let me know if there is something
blocking me? I will be sincerely obliged.

Regards,
Gourav Sengupta


On Tue, Feb 22, 2022 at 3:58 PM Saurabh Gulati
 wrote:

> Hey Mich,
> We use spark 3.2 now. We are using BQ but migrating away because:
>
>    - It's not reflective of our current lake structure with all the
>    deltas/history tables/models outputs etc.
>    - It's pretty expensive to load everything in BQ and essentially it
>    will be a copy of all data in gcs. External tables in BQ didn't work for us.
>    Currently we store only the latest snapshots in BQ. This breaks idempotency of
>    models which need to time travel and run in the past.
>    - We might move to a different cloud provider in the future, so we want to
>    be cloud agnostic.
>
> So we need to have an execution engine which has the same overview of data
> as we have in gcs.
> We tried presto but performance was similar and presto didn't support auto
> scaling.
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:49
> *To:* Kidong Lee ; Saurabh Gulati <
> saurabh.gul...@fedex.com>
> *Cc:* user@spark.apache.org 
> *Subject:* Re: [EXTERNAL] Re: Need to make WHERE clause compulsory in
> Spark SQL
>
> Ok interesting.
>
> I am surprised that you are not using BigQuery and are using Hive. My
> assumption is that your Spark is version 3.1.1 with standard GKE on
> auto-scaler. What benefits are you getting from using Hive here? As you
> have your hive tables on gs buckets, you can easily load your hive
> tables into BigQuery and run spark on BigQuery?
>
> HTH
>
> On Tue, 22 Feb 2022 at 15:34, Saurabh Gulati 
> wrote:
>
> Thanks Sean for your response.
>
> @Mich Talebzadeh  We run all workloads on GKE
> as docker containers. So to answer your questions, Hive is running in a
> container as K8S service and spark thrift-server in another container as a
> service and Superset in a third container.
>
> We use Spark on GKE setup to run thrift-server which spawns workers
> depending on the load. For buckets we use gcs.
>
>
> TIA
> Saurabh
> --
> *From:* Mich Talebzadeh 
> *Sent:* 22 February 2022 16:05
> *To:* Saurabh Gulati 
> *Cc:* user@spark.apache.org 
> *Subject:* [EXTERNAL] Re: Need to make WHERE clause compulsory in Spark
> SQL
>
> *Caution! This email originated outside of FedEx. Please do not open
> attachments or click links from an unknown or suspicious origin*.
> Is your hive on prem with external tables in cloud storage?
>
> Where is your spark running from and what cloud buckets are you using?
>
> HTH
>
> On Tue, 22 Feb 2022 at 12:36, Saurabh Gulati
>  wrote:
>
> Hello,
> We are trying to setup Spark as the execution engine for exposing our data
> stored in lake. We have hive metastore running along with Spark thrift
> server and are using Superset as the UI.
>
> We save all tables as External tables in hive metastore with storge being
> on Cloud.
>
> We see that right now when users run a query in Superset SQL Lab it scans
> the whole table. What we want is to limit the data scan by setting
> something like hive.mapred.mode=strict in spark, so that users get an
> exception if they don't specify a partition column.
>
> We tried setting spark.hadoop.hive.mapred.mode=strict in
> spark-defaults.conf in the thrift server, but it still scans the whole table.
> We also tried setting hive.mapred.mode=strict in hive-defaults.conf for the
> metastore container.
>
> We use Spark 3.2 with hive-metastore version 3.1.2
>
> Is there a way in spark settings to make it happen.
>
>
> TIA
> Saurabh
>
> --
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> --
>
>
>
>view

Re: {EXT} Re: Spark Parquet write OOM

2022-03-05 Thread Gourav Sengupta
Hi Anil,

any chance you tried setting the limit on the number of records to be
written out at a time?

Regards,
Gourav

On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari  wrote:

> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything very particularly concerning
> here.
>
>
>
> There is a setting in SPARK which limits the number of records that we can
> write out at a time; you can try that. The other thing that you can try is
> to ensure that the number of partitions is higher (just like you suggested).
> Let me know how things are going on your end.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari  wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari 
> *Cc: *Yang,Jie(INF) , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step #c)
>
> [AD] sparkSession.table()
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates()
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari  wrote:
>
> 2nd attempt..
>
>
>
> Any suggestions to troubleshoot and fix the problem ? thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari 
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta , Yang,Jie(INF) <
> yangji...@baidu.com>
> *Cc: *user@spark.apache.org 
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is
>
>1. Read avro data from kafka
>    2. Avro deserialization and add a new column to the RDD
>    3. Create spark dataframe (DF) against the latest schema (avro evolved
>    schema) and persist to hive (checkpointing)
>4. Create DF from hive (from step #c)
>5. Deduplicate spark DF by primary key
>6. Write DF to s3 in parquet format
>7. Write metadata to s3
>
>
>
> The failure is from spark batch job
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD] : Data volume is fixed as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total;
> each node can create 2 executors (2-core CPU and 8 GB memory)
>
>
>
> 5. Can you show the list of transformations that you are running ?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create the dataframe from the avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta 
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) 
> *Cc: *Anil Dasari , user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running ?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF)  wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the capacity of the DirectByteBuffer by configuring
> `-XX:MaxDirectMemorySize`, which is a Java option.
>
>
>
> However, we'd better check the amount of memory to be allocated, because
> `-XX:MaxDirectMemorySize` and `-Xmx` should have the same capacity
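
For anyone reaching this thread from the archives, a hypothetical way to wire
that in via spark-defaults.conf (or spark-submit --conf); the property names
are standard Spark settings, but the sizes are placeholders to be tuned so
that heap, overhead and direct memory stay within the container size:

    spark.executor.extraJavaOptions   -XX:MaxDirectMemorySize=2g
    spark.executor.memoryOverhead     3g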