Re: OOM concern

2024-05-27 Thread Perez
Thank you, everyone, for your responses.

I am not getting any errors as of now. I am just trying to choose the right
tool for my task, which is loading data from an external source into S3 via
Glue/EMR.

I think a Glue job would be the best fit for me because I can calculate the
DPUs needed (maybe keeping some extra buffer), so I just wanted to check
whether there are any edge cases I need to consider.
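
As a minimal illustration of pinning that capacity explicitly, a hypothetical
boto3 call (the job name and worker count are placeholders; each G.1X worker
maps to 1 DPU):

```
import boto3

glue = boto3.client("glue")

# Start a run with explicit capacity instead of the job default
# (job name and sizing are placeholders for your own setup).
glue.start_job_run(
    JobName="db-to-s3-load",
    WorkerType="G.1X",      # 1 DPU per worker
    NumberOfWorkers=10,
)
```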


On Tue, May 28, 2024 at 5:39 AM Russell Jurney wrote:

> If you’re using EMR and Spark, you need to choose nodes with enough RAM to
> accommodate any given partition in your data or you can get an OOM error.
> Not sure if this job involves a reduce, but I would choose a single 128GB+
> memory-optimized instance and then adjust parallelism per the Spark docs,
> using pyspark.sql.DataFrame.repartition(n) at the start of your job.
>
> Thanks,
> Russell Jurney @rjurney
> russell.jur...@gmail.com | datasyndrome.com
>
>
> On Mon, May 27, 2024 at 9:15 AM Perez  wrote:
>
>> Hi Team,
>>
>> I want to extract the data from a DB and just dump it into S3. I
>> don't have to perform any transformations on the data yet. My data size
>> would be ~100 GB (historical load).
>>
>> Choosing the right number of DPUs (Glue jobs) should solve this problem,
>> right? Or should I move to EMR?
>>
>> I don't feel the need to move to EMR but wanted expert suggestions.
>>
>> TIA.
>>
>


Re: Spark Protobuf Deserialization

2024-05-27 Thread Sandish Kumar HN
Did you try using to_protobuf and from_protobuf?

https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html
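
As a quick illustration, a minimal from_protobuf sketch in PySpark (Spark
3.4+); the brokers, topic, and descriptor path are hypothetical, and the
descriptor file comes from protoc --descriptor_set_out:

```
from pyspark.sql import SparkSession
from pyspark.sql.protobuf.functions import from_protobuf

spark = SparkSession.builder.appName("proto-decode").getOrCreate()

# Batch-read raw Kafka bytes; brokers and topic are placeholders.
raw = (spark.read.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "requests")
       .load())

# Decode the binary `value` column against a compiled descriptor file
# (path and message name are placeholders).
decoded = raw.select(
    from_protobuf("value", "Request", descFilePath="/path/to/request.desc")
    .alias("request"))
```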


On Mon, May 27, 2024 at 15:45 Satyam Raj  wrote:

> Hello guys,
> We're using Spark 3.5.0 for processing a Kafka source that contains
> protobuf-serialized data. The format is as follows:
>
> message Request {
>   int64 sent_ts = 1;
>   repeated Event event = 2;
> }
>
> message Event {
>   string event_name = 1;
>   bytes event_bytes = 2;
> }
>
> The event_bytes field contains the payload for the event_name, and
> event_name is the class name of the Protobuf class.
> Currently, we parse the protobuf message from the Kafka topic and, for
> every event in the array, push the event_bytes to the `event_name` topic,
> over which Spark jobs run and use the same event_name protobuf class to
> deserialize the data.
>
> Is there a better way to do all this in a single job?
>


Re: OOM concern

2024-05-27 Thread Russell Jurney
If you’re using EMR and Spark, you need to choose nodes with enough RAM to
accommodate any given partition in your data or you can get an OOM error.
Not sure if this job involves a reduce, but I would choose a single 128GB+
memory-optimized instance and then adjust parallelism per the Spark docs,
using pyspark.sql.DataFrame.repartition(n) at the start of your job.
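
As a rough sketch of that repartition step in PySpark, with a hypothetical
JDBC source and a partition count that would need tuning for a real ~100 GB
load:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("db-to-s3").getOrCreate()

# Hypothetical JDBC source; connection details are placeholders.
df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")
      .option("dbtable", "public.events")
      .option("user", "user")
      .option("password", "password")
      .load())

# Repartition up front so no single partition exceeds executor memory;
# ~100 GB / 800 partitions keeps each partition around 128 MB.
df.repartition(800).write.mode("overwrite").parquet("s3://bucket/events/")
```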

Thanks,
Russell Jurney @rjurney
russell.jur...@gmail.com | datasyndrome.com


On Mon, May 27, 2024 at 9:15 AM Perez  wrote:

> Hi Team,
>
> I want to extract the data from a DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right number of DPUs (Glue jobs) should solve this problem,
> right? Or should I move to EMR?
>
> I don't feel the need to move to EMR but wanted expert suggestions.
>
> TIA.
>


Re: OOM concern

2024-05-27 Thread Meena Rajani
What exactly is the error? Is it erroring out while reading the data from
the DB? How are you partitioning the data?

How much memory do you currently have? What is the network timeout?

Regards,
Meena


On Mon, May 27, 2024 at 4:22 PM Perez  wrote:

> Hi Team,
>
> I want to extract the data from a DB and just dump it into S3. I
> don't have to perform any transformations on the data yet. My data size
> would be ~100 GB (historical load).
>
> Choosing the right number of DPUs (Glue jobs) should solve this problem,
> right? Or should I move to EMR?
>
> I don't feel the need to move to EMR but wanted expert suggestions.
>
> TIA.
>


Re: [Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Mich Talebzadeh
When you use applyInPandasWithState, Spark processes each input row as it
arrives, regardless of whether certain columns, such as the timestamp
column, contain NULL values. This behavior is useful when you want to
handle incomplete or missing data gracefully within your stateful
processing logic. By allowing NULL timestamps to trigger calls to the
stateful function, you can implement custom handling strategies, such as
skipping incomplete records, inside that function.


However, it is important to understand that this behavior also *means that
the watermark is not advanced for NULL timestamps*. The watermark is used
for event-time processing in Spark Structured Streaming to track the
progress of event time in your data stream, and it is typically based on the
timestamp column. Since NULL timestamps do not contribute to watermark
advancement, such records never move event time forward, so watermark-based
state cleanup and event-time timeouts can be delayed accordingly.

Regarding whether you can rely on this behavior for your production code,
it largely depends on your requirements and use case. If your application
logic is designed to handle NULL timestamps appropriately and you have
tested it to ensure it behaves as expected, then you can generally rely on
this behavior. FYI, I have not tested it myself, so I cannot provide a
definitive answer.
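
As a rough sketch of this kind of NULL handling, assuming a PySpark 3.5 job
where the group key, schemas, and counting logic are all hypothetical:

```
import pandas as pd
from typing import Iterator, Tuple
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def count_events(
    key: Tuple[str],
    pdfs: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    # Hypothetical state: a running count of rows with usable timestamps.
    count = state.get[0] if state.exists else 0
    for pdf in pdfs:
        # Rows with NULL timestamps still arrive here; skip them explicitly.
        count += int(pdf["event_ts"].notna().sum())
    state.update((count,))
    yield pd.DataFrame({"id": [key[0]], "count": [count]})

out = (events  # a streaming DataFrame with columns id, event_ts, ...
       .groupBy("id")
       .applyInPandasWithState(
           count_events,
           outputStructType="id string, count long",
           stateStructType="count long",
           outputMode="update",
           timeoutConf=GroupStateTimeout.NoTimeout))
```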

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD, Imperial College London
London, United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun).


On Mon, 27 May 2024 at 22:04, Juan Casse  wrote:

> I am using applyInPandasWithState in PySpark 3.5.0.
>
> I noticed that records with timestamp==NULL are processed (i.e., they
> trigger a call to the stateful function) and, as you would expect, do not
> advance the watermark.
>
> I am taking advantage of this in my application.
>
> My question: Is this a supported feature of Spark? Can I rely on this
> behavior for my production code?
>
> Thanks,
> Juan
>


Spark Protobuf Deserialization

2024-05-27 Thread Satyam Raj
Hello guys,
We're using Spark 3.5.0 for processing a Kafka source that contains
protobuf-serialized data. The format is as follows:

message Request {
  int64 sent_ts = 1;
  repeated Event event = 2;
}

message Event {
  string event_name = 1;
  bytes event_bytes = 2;
}

The event_bytes field contains the payload for the event_name, and
event_name is the class name of the Protobuf class.
Currently, we parse the protobuf message from the Kafka topic and, for
every event in the array, push the event_bytes to the `event_name` topic,
over which Spark jobs run and use the same event_name protobuf class to
deserialize the data.

Is there a better way to do all this in a single job?
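
Building on the from_protobuf suggestion earlier in this thread, one way this
could collapse into a single job, sketched under the assumption that the set
of event types is known up front (the descriptor path and type names are
hypothetical, and `raw` is the Kafka DataFrame from the earlier sketch):

```
from pyspark.sql import functions as F
from pyspark.sql.protobuf.functions import from_protobuf

DESC = "/path/to/events.desc"  # compiled protoc descriptor (placeholder)

# Decode the Request envelope and flatten the repeated events field.
events = (raw  # Kafka source DataFrame with a binary `value` column
          .select(from_protobuf("value", "Request", descFilePath=DESC)
                  .alias("req"))
          .select(F.explode("req.event").alias("e"))
          .select("e.event_name", "e.event_bytes"))

# Decode each known event type in the same job, instead of republishing
# event_bytes to per-type topics for separate jobs to consume.
for name in ["ClickEvent", "PurchaseEvent"]:  # hypothetical type names
    typed = (events.filter(F.col("event_name") == name)
             .select(from_protobuf("event_bytes", name, descFilePath=DESC)
                     .alias("payload")))
    # ... write `typed` to its destination here
```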


[Spark SQL]: Does Spark support processing records with timestamp NULL in stateful streaming?

2024-05-27 Thread Juan Casse
I am using applyInPandasWithState in PySpark 3.5.0.

I noticed that records with timestamp==NULL are processed (i.e., they
trigger a call to the stateful function) and, as you would expect, do not
advance the watermark.

I am taking advantage of this in my application.

My question: Is this a supported feature of Spark? Can I rely on this
behavior for my production code?

Thanks,
Juan


OOM concern

2024-05-27 Thread Perez
Hi Team,

I want to extract the data from a DB and just dump it into S3. I
don't have to perform any transformations on the data yet. My data size
would be ~100 GB (historical load).

Choosing the right number of DPUs (Glue jobs) should solve this problem,
right? Or should I move to EMR?

I don't feel the need to move to EMR but wanted expert suggestions.

TIA.


Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Shay Elbaz
Seen this before; we had a very(!) complex plan behind a DataFrame, to the point 
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan: convert the DataFrame to an RDD 
and back to a DataFrame at certain points to make the plan shorter. This has 
obvious drawbacks and is not recommended in general, but at least we had 
something working. The real, long-term solution was to replace the many (> 200) 
withColumn() calls with only a few select() calls. You can easily find sources 
on the internet for why this is better. (It was on Spark 2.3, but I think the 
main principles remain.) TBH, it was easier than I expected, as it mainly 
involved moving pieces of code from one place to another, not a "real", 
meaningful refactoring.
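
A rough sketch of both ideas in PySpark, with hypothetical column expressions
standing in for the real logic (`df` is the wide DataFrame under construction
and `spark` is the active SparkSession):

```
from pyspark.sql import functions as F

# Instead of hundreds of chained withColumn calls, which grow the plan...
# df = df.withColumn("a", F.lit(1)).withColumn("b", F.lit(2))  # ... x200

# ...project everything in one select, keeping the plan shallow:
df2 = df.select(
    "*",
    F.lit(1).alias("a"),
    F.lit(2).alias("b"),
    # ... remaining derived columns in the same select
)

# The "ugly" plan-breaker: round-trip through an RDD to truncate lineage.
# This loses the optimized plan and has real costs; use as a last resort.
df3 = spark.createDataFrame(df2.rdd, schema=df2.schema)
```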



From: Mich Talebzadeh 
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org 
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing



A few ideas off the top of my head for how to go about solving the problem:


  1.  Try with subsets: reproduce the issue with smaller subsets of your 
data to pinpoint the specific operation causing the memory problems.
  2.  Explode or Flatten Nested Structures: If your DataFrame schema involves 
deep nesting, consider using techniques like explode or flattening to transform 
it into a less nested structure. This can reduce memory usage during operations 
like withColumn.
  3.  Use select before withColumn: a narrow projection keeps the logical plan 
and the carried columns smaller, which can improve memory usage compared to 
directly calling withColumn on the entire DataFrame.
  4.  spark.sql.shuffle.partitions: Setting this configuration to a value close 
to the number of executors can improve shuffle performance and potentially 
reduce memory usage.
  5.  Spark UI Monitoring: Utilize the Spark UI to monitor memory usage 
throughout your job execution and identify potential memory bottlenecks.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed. It is essential to note that, as with any 
advice, quote "one test result is worth one-thousand expert opinions" (Wernher 
von Braun).



On Mon, 27 May 2024 at 12:50, Gaurav Madan wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes 
the driver to choke and run out of memory. However, the same DataFrames are 
processed successfully in Spark 2.4.0.

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dump on out-of-memory 
errors, and the analysis revealed the following significant frames and local 
variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
 (DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark

Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Mich Talebzadeh
A few ideas off the top of my head for how to go about solving the problem:


   1. Try with subsets: reproduce the issue with smaller subsets of
   your data to pinpoint the specific operation causing the memory problems.
   2. Explode or Flatten Nested Structures: If your DataFrame schema
   involves deep nesting, consider using techniques like explode or flattening
   to transform it into a less nested structure. This can reduce memory usage
   during operations like withColumn.
   3. Use select before withColumn: a narrow projection keeps the logical
   plan and the carried columns smaller, which can improve memory usage
   compared to directly calling withColumn on the entire DataFrame.
   4. spark.sql.shuffle.partitions: Setting this configuration to a value
   close to the number of executors can improve shuffle performance and
   potentially reduce memory usage (a short sketch follows this list).
   5. Spark UI Monitoring: Utilize the Spark UI to monitor memory usage
   throughout your job execution and identify potential memory bottlenecks.
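
A minimal sketch of points 3 and 4 in PySpark; the partition count, source
path, and column expressions are hypothetical and would need tuning for a
real cluster:

```
from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .appName("nested-df-job")
         # Point 4: align shuffle partitions with executor parallelism;
         # 64 is a placeholder for your own core count.
         .config("spark.sql.shuffle.partitions", "64")
         .getOrCreate())

df = spark.read.parquet("s3://bucket/nested/")  # hypothetical source

# Point 3: derive all new columns in one select instead of many
# withColumn calls, each of which grows the logical plan.
out = df.select(
    "*",
    F.to_date(F.col("created_at")).alias("dt"),          # hypothetical
    F.coalesce("updated_at", "created_at").alias("ts"),  # hypothetical
)
```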

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions" (Wernher von Braun).



On Mon, 27 May 2024 at 12:50, Gaurav Madan wrote:

> Dear Community,
>
> I'm reaching out to seek your assistance with a memory issue we've been
> facing while processing certain large and nested DataFrames using Apache
> Spark. We have encountered a scenario where the driver runs out of memory
> when applying the `withColumn` method on specific DataFrames in Spark
> 3.4.1. However, the same DataFrames are processed successfully in Spark
> 2.4.0.
>
>
> *Problem Summary:* For certain DataFrames, applying the `withColumn`
> method in Spark 3.4.1 causes the driver to choke and run out of memory.
> However, the same DataFrames are processed successfully in Spark 2.4.0.
>
>
> *Heap Dump Analysis:* We performed a heap dump analysis after enabling
> heap dump on out-of-memory errors, and the analysis revealed the following
> significant frames and local variables:
>
> ```
>
> org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:4273)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:1622)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2820)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
> (Dataset.scala:2759)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
> (DataPersistenceUtil.scala:88)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (DataPersistenceUtil.scala:19)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
> (BronzeStep.scala:23)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.scala:78)
>
> org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
> bytes
>
> com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
> (MainJob.sc

Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame Processing

2024-05-27 Thread Gaurav Madan
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been
facing while processing certain large and nested DataFrames using Apache
Spark. We have encountered a scenario where the driver runs out of memory
when applying the `withColumn` method on specific DataFrames in Spark
3.4.1. However, the same DataFrames are processed successfully in Spark
2.4.0.


*Problem Summary:* For certain DataFrames, applying the `withColumn` method
in Spark 3.4.1 causes the driver to choke and run out of memory. However,
the same DataFrames are processed successfully in Spark 2.4.0.


*Heap Dump Analysis:* We performed a heap dump analysis after enabling heap
dump on out-of-memory errors, and the analysis revealed the following
significant frames and local variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
(Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
(DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
(DataPersistenceUtil.scala:19)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
(BronzeStep.scala:23)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
(MainJob.scala:78)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
(MainJob.scala:66)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%)
bytes

```


*Driver Configuration:*
1. Driver instance: c6g.xlarge with 4 vCPUs and 8 GB RAM.
2. `spark.driver.memory` and `spark.driver.memoryOverhead` are set to
default values.


*Observations:*
- The DataFrame schema is very nested and large, which might be contributing
to the memory issue.
- Despite similar configurations, Spark 2.4.0 processes the DataFrame
without issues, while Spark 3.4.1 does not.


*Tried Solutions:* We have tried several solutions, including disabling
Adaptive Query Execution, setting the driver max result size, increasing
driver cores, and enabling specific optimizer rules. However, the issue
persisted until we increased the driver memory to 48 GB and memory overhead
to 5 GB, which allowed the driver to schedule the tasks successfully.
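
For concreteness, a sketch of those driver settings; the 48 GB and 5 GB
values mirror the figures above, the app name is a placeholder, and note
that driver memory must be set before the driver JVM starts (e.g. via
spark-submit or spark-defaults.conf), shown here as builder config only
for illustration:

```
from pyspark.sql import SparkSession

# Driver sizing that resolved the OOM described above; in practice pass
# these via spark-submit:
#   --conf spark.driver.memory=48g --conf spark.driver.memoryOverhead=5g
spark = (SparkSession.builder
         .appName("nested-df-job")
         .config("spark.driver.memory", "48g")
         .config("spark.driver.memoryOverhead", "5g")
         .getOrCreate())
```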


*Request for Suggestions:*Are there any additional configurations or
optimizations that could help mitigate this memory issue without always
resorting to a larger machine? We would greatly appreciate any insights or
recommendations from the community on how to resolve this issue effectively.

I have attached the DataFrame schema and the complete stack trace from the
heap dump analysis for your reference.



Thank you in advance for your assistance.

Best regards,
Gaurav Madan
*Personal Mail:* gauravmadan...@gmail.com
*Work Mail:* gauravma...@urbancompany.com