Re: Autoscaling in Spark

2023-10-10 Thread Mich Talebzadeh
This has been brought up a few times. I will focus on Spark Structured
Streaming.

Autoscaling does not support Spark Structured Streaming (SSS), because
streaming jobs are typically long-running jobs that need to maintain state
across micro-batches. Autoscaling is designed to scale Spark clusters up and
down in response to workload changes; however, this causes problems for Spark
Structured Streaming jobs because it can make them lose their state. These
jobs continuously process incoming data and update their state incrementally
(see the checkpoint directory). Autoscaling, which can dynamically add or
remove worker nodes, would disrupt this stateful processing. Although Spark
itself supports dynamic allocation (i.e. it can add or remove executors based
on demand), that is not the same as cluster-level autoscaling in cloud
environments such as GCP, Kubernetes, or managed clusters. For now you need
to plan your SSS workload accordingly.
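
As an illustration, here is a minimal sketch of enabling Spark's own dynamic
allocation from PySpark. This only scales executors within an existing
cluster, not the cluster itself, and the min/max executor counts are
illustrative assumptions:

from pyspark.sql import SparkSession

# Sketch only: executor-level dynamic allocation, not cluster autoscaling.
# shuffleTracking is needed when no external shuffle service is available.
spark = (SparkSession.builder
         .appName("SSSDynamicAllocationSketch")  # hypothetical app name
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "2")   # assumed
         .config("spark.dynamicAllocation.maxExecutors", "10")  # assumed
         .getOrCreate())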

My general advice: the usual thing to watch from the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink has an issue absorbing data in a timely manner as per the above
formula, you will see the effect on the Processing Rate.

The Batch Interval is driven by the rate at which the upstream source sends
messages through Kafka or another source. We can start by assuming that an
increase in the number of messages processed (and hence in processing time)
will require *additional reserved capacity*. As a heuristic, anticipate a 70%
(~1 SD) increase in processing time; in theory you should then still be able
to handle all this work within the batch interval, as in the sketch below.
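
As a rough illustration of that headroom check (the numbers below are made
up; in practice read the processing time off the Spark UI):

# Sketch: processing time plus ~70% reserved capacity should fit the batch interval
batch_interval_ms = 60_000        # assumed trigger / batch interval
observed_processing_ms = 30_000   # assumed average processing time from the Spark UI

reserved_capacity_ms = 0.7 * observed_processing_ms  # ~1 SD heuristic from above
within_budget = observed_processing_ms + reserved_capacity_ms < batch_interval_ms
print(f"processing={observed_processing_ms} ms, "
      f"reserved={reserved_capacity_ms:.0f} ms, "
      f"fits batch interval: {within_budget}")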

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 10 Oct 2023 at 16:11, Kiran Biswal  wrote:

> Hello Experts
>
> Is there any true autoscaling option for Spark? Dynamic autoscaling
> works only for batch. Any guidelines on Spark Streaming autoscaling and
> how it would be tied to any cluster-level autoscaling solutions?
>
> Thanks
>


Autoscaling in Spark

2023-10-10 Thread Kiran Biswal
Hello Experts

Is there any true autoscaling option for Spark? Dynamic autoscaling
works only for batch. Any guidelines on Spark Streaming autoscaling and
how it would be tied to any cluster-level autoscaling solutions?

Thanks


Re: Updating delta file column data

2023-10-10 Thread Mich Talebzadeh
Hi,

Since you mentioned that there could be duplicate records with the same
unique key in the Delta table, you will need a way to handle them. One
approach I can suggest is to add a timestamp column at ingestion time, the
so-called op_time column (df = df.withColumn("op_time", current_timestamp())),
so that you can determine the latest or most relevant record among the
duplicates.

Here is a pseudo-code suggestion:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, conv, regexp_replace, row_number
from pyspark.sql.window import Window

appName = "DeltaHexToIntConversion"
spark = SparkSession.builder.appName(appName).getOrCreate()

delta_table_path = "path_to_your_delta_table"
df = spark.read.format("delta").load(delta_table_path)

# Convert the hexadecimal field to an integer. A plain cast("int") returns
# null for "0x..." strings, so strip the prefix and convert from base 16.
df = df.withColumn(
    "exploded_data",
    struct(
        conv(regexp_replace(col("data.field1"), "^0x", ""), 16, 10)
            .cast("long").alias("field1_int"),
        col("data.field2")
    )
)
df = df.select("other_columns", "exploded_data.field1_int",
               "exploded_data.field2")  # replace "other_columns" as needed

# Handling duplicates: keep the latest record per unique key, ordering by the
# op_time column added at ingestion time as mentioned above
# (note: PySpark's dropDuplicates has no keep="last" option)
w = Window.partitionBy("unique_key").orderBy(col("op_time").desc())
df = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

# Write the DataFrame back to the Delta table. df.write has no "merge" save
# mode, so overwrite here; see the merge API sketch below for a true upsert.
df.write.format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .save(delta_table_path)


HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 9 Oct 2023 at 17:12, Mich Talebzadeh 
wrote:

> In a nutshell, is this what you are trying to do?
>
>
>1. Read the Delta table into a Spark DataFrame.
>2. Explode the string column into a struct column.
>3. Convert the hexadecimal field to an integer.
>4. Write the DataFrame back to the Delta table in merge mode with a
>unique key.
>
> Is this a fair assessment?
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 9 Oct 2023 at 14:46, Karthick Nk  wrote:
>
>> Hi All,
>>
>> I have included sample data below, along with the operation I need to
>> perform on it.
>>
>> I have Delta tables in which one column holds data as a string (containing
>> struct/JSON data).
>>
>> So, I need to update one key's value in the struct data held in the string
>> column of the Delta table.
>>
>> Note: I am able to explode the string column into a struct and into the
>> individual fields by using the following operation in Spark,
>>
>> df_new = spark.read.json(df.rdd.map(lambda x: '{"data": ' + x.data + '}'))
>>
>> Could you suggest a possible way to perform the required action in an
>> optimal way?
>>
>> Note: Please feel free to ask, if you need further information.
>>
>> Thanks & regards,
>> Karthick
>>
>> On Mon, Oct 2, 2023 at 10:53 PM Karthick Nk 
>> wrote:
>>
>>> Hi community members,
>>>
>>> In Databricks ADLS2 Delta tables, I need to perform the operation below;
>>> could you help me with your thoughts.
>>>
>>> I have Delta tables with one column of data type string, which contains
>>> JSON data, and I need to do the following:
>>> 1. I have to update one particular field value in the JSON and write it
>>> back into the same column of the data.
>>>
>>> Example :
>>>
>>> In the string column, inside the JSON, I have one field whose value is in
>>> hexadecimal, like { "version": "0xabcd1234" }.
>>>
>>> I have to convert this field into the corresponding integer value and
>>> update it back into the same string column's JSON value.
>>> Note: I have to perform this operation within this column. This column
>>> is of data type string in the Delta table.
>>>
>>> Could you suggest a sample example?
>>>
>>> Thanks in advance.
>>>
>>