The fact that you have 60 partitions or brokers in Kafka is not directly
correlated to the number of Spark Structured Streaming (SSS) executors by
itself. See below.

Spark starts with 200 shuffle partitions (spark.sql.shuffle.partitions).
However, by default, Spark/PySpark creates partitions equal to the number
of CPU cores in the node, the so-called vcores, so it also depends on the
number of nodes you are using in your Spark cluster.
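
For illustration only (assuming the standard config keys), you can check
what your session is actually using before tuning anything:

```
# assumption: a SparkSession called spark already exists
print(spark.conf.get("spark.sql.shuffle.partitions"))  # defaults to 200
print(spark.sparkContext.defaultParallelism)           # follows the total cores available
```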

Without doing a PoC, you do not need to worry about repartition(10) in
your writeStream. I suggest that for now you remove that parameter and
observe the Spark processing through the Spark GUI (default port 4040), in
particular the "Structured Streaming" page. Your sink is Delta Lake, which
in this respect is no different from any other data warehouse such as
Google BigQuery.
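
For illustration, a sketch of the write with the repartition removed,
reusing the names from your own snippet:

```
query = (
    table.writeStream  # no explicit repartition; let Spark decide the parallelism
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```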

My general advice: the usual thing to watch from the Spark GUI is

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch
Duration)

If your sink (Delta Lake) has an issue absorbing data in a timely manner
as per the above formula, you will see the effect on the Processing Rate.

The Batch Interval is the trigger interval, i.e. how often a micro-batch is
started to process the messages the upstream source sends through Kafka. We
can start by assuming that a rise in the number of messages processed (and
hence in the processing time) will require additional reserved capacity. As
a heuristic, allow for a 70% (~1 SD) increase in the processing time; in
theory you should then be able to handle all this work within the batch
interval.
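
Purely as an illustration with made-up numbers (none of these are
measurements from your pipeline):

```
processing_time   = 14.0                    # seconds per micro-batch, observed
reserved_capacity = 0.70 * processing_time  # the ~70% (~1 SD) headroom heuristic
batch_interval    = 30.0                    # trigger interval in seconds

# the pipeline keeps up as long as this stays True
print(processing_time + reserved_capacity < batch_interval)  # 14 + 9.8 = 23.8 < 30
```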

The parameter which I think many deploy is
spark.streaming.backpressure.enabled
(spark.conf.set("spark.streaming.backpressure.enabled", "true")). The
central idea is that if a component is struggling to keep up, it should
communicate to upstream components and get them to reduce the load. In the
context of Spark Streaming, the receiver is the upstream component which
gets notified if the executors cannot keep up. There are a number of
occasions on which this will happen (not just a spike in the incoming
messages). For example:

   - Streaming Source: Unexpected short burst of incoming messages in
   source system
   - YARN: Lost Spark executors due to node(s) failure
   - External Sink System: High load on external systems such as Delta
   Lake, BigQuery etc

Without backpressure, microbatches queue up over time and the scheduling
delay increases (check Operation Duration from GUI).
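
Besides the GUI, you can also watch this programmatically from the query
handle returned by start(); a minimal sketch:

```
p = query.lastProgress            # progress of the most recent micro-batch, or None
if p is not None:
    print(p["durationMs"])              # breakdown of where the batch time went
    print(p["inputRowsPerSecond"])      # arrival rate from Kafka
    print(p["processedRowsPerSecond"])  # rate the sink actually absorbed
```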

The next parameter I think of is spark.streaming.backpressure.pid.minRate.
It is the minimum total rate, in records per second, that the backpressure
mechanism will throttle down to. It works together with
spark.streaming.kafka.maxRatePerPartition (not set in your case), which is
the maximum rate (number of records per second) at which messages will be
read from each Kafka partition.

So spark.streaming.backpressure.pid.minRate starts with

n (total number of Kafka partitions)
* spark.streaming.kafka.maxRatePerPartition * Batch Interval
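
As a sketch, assuming the standard config keys and purely illustrative
values (typically these are set at submit time rather than mid-session):

```
spark.conf.set("spark.streaming.backpressure.enabled", "true")
spark.conf.set("spark.streaming.backpressure.pid.minRate", "100")    # floor, records/sec
spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")  # per Kafka partition
```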

spark.streaming.kafka.maxRatePerPartition is used to control the maximum
rate of data ingestion from Kafka per partition. Kafka topics can have
multiple partitions, and Spark Streaming processes data in parallel by
reading from these partitions.
If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark
Streaming will consume data from each Kafka partition at a rate of up to
1000 messages per second.

So in your case, if you set it to 1000, it goes something like

60 * 1000 * Batch Interval (in seconds)
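
As a back-of-the-envelope calculation (the batch interval here is an
assumption, not something from your post):

```
num_partitions         = 60    # Kafka partitions on the topic
max_rate_per_partition = 1000  # spark.streaming.kafka.maxRatePerPartition
batch_interval_seconds = 30    # assumed trigger interval

max_records_per_batch = num_partitions * max_rate_per_partition * batch_interval_seconds
print(max_records_per_batch)   # 1800000 records per micro-batch at most
```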

Of course, I stand to be corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong
<shaoyang.h...@ninjavan.co.invalid> wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
>     df = (
>         spark.readStream.format("kafka")
>         .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>         .option("subscribe", os.environ["KAFKA_TOPIC"])
>         .load()
>     )
>
>     table = (
>         df.select(
>             from_protobuf(
>                 "value", "table", "/opt/protobuf-desc/table.desc"
>             ).alias("msg")
>         )
>         .withColumn("uuid", col("msg.uuid"))
>         # etc other columns...
>
>         # generated column for daily partitioning in Delta Lake
>         .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>         .drop("msg")
>     )
>
>     query = (
>         table
>         .repartition(10).writeStream
>         .queryName(APP_NAME)
>         .outputMode("append")
>         .format("delta")
>         .partitionBy(CREATED_DATE)
>         .option("checkpointLocation", os.environ["CHECKPOINT"])
>         .start(os.environ["DELTA_PATH"])
>     )
>
>     query.awaitTermination()
>     spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
