Having 60 partitions (or brokers) in Kafka does not, by itself, determine how many Spark Structured Streaming (SSS) executors you need. See below.
Spark starts with 200 shuffle partitions (the default for spark.sql.shuffle.partitions). However, by default, Spark/PySpark creates partitions equal to the number of CPU cores in the node, the so-called vcores. So it depends on the number of nodes you are using in your Spark cluster. Until you have done a PoC, you need not worry about repartition(10) in your writeStream. I suggest that for now you remove that parameter and observe the Spark processing through the Spark GUI (default port 4040), in particular the "Structured Streaming" page. Your sink is Delta Lake, which in this respect is no different from any other data warehouse such as Google BigQuery.

My general advice, the usual thing to watch from the Spark GUI:

Processing Time (Process Rate) + Reserved Capacity < Batch Interval (Batch Duration)

If your sink (Delta Lake) has an issue absorbing data in a timely manner as per the formula above, you will see it in the Process Rate falling behind the Input Rate, i.e. the rate at which the upstream source sends messages through Kafka. We can start by assuming that any increase in the number of messages processed (and hence in processing time) will require additional reserved capacity. We can anticipate a heuristic 70% (~1SD) increase in the processing time, so in theory you should be able to handle all this work below the batch interval.

The parameter which I think many deploy is spark.streaming.backpressure.enabled (spark.conf.set("spark.streaming.backpressure.enabled", "true")). The central idea is that if a component is struggling to keep up, it should communicate this to the upstream components and get them to reduce the load. In the context of Spark Streaming, the receiver is the upstream component which gets notified if the executors cannot keep up. There are a number of occasions on which this will happen, not necessarily just a spike in the incoming messages.
For example:

- Streaming Source: unexpected short burst of incoming messages in the source system
- YARN: lost Spark executors due to node(s) failure
- External Sink System: high load on external systems such as Delta Lake, BigQuery etc.

Without backpressure, micro-batches queue up over time and the scheduling delay increases (check Operation Duration in the GUI).

The next parameter I think of is spark.streaming.backpressure.pid.minRate. It is the total records per second. It relies on spark.streaming.kafka.maxRatePerPartition (not set), which controls the maximum rate (number of records per second) at which messages will be read from each Kafka partition. Kafka topics can have multiple partitions, and Spark Streaming processes data in parallel by reading from these partitions. If you set spark.streaming.kafka.maxRatePerPartition to 1000, Spark Streaming will consume data from each Kafka partition at a rate of up to 1000 messages per second. So the starting point is

n (total number of Kafka partitions) * spark.streaming.kafka.maxRatePerPartition * Batch Interval (in seconds)

which in your case, if you set it to 1000, goes something like

60 * 1000 * Batch Interval (in seconds)

Of course I stand corrected.

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
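P.S. A minimal sketch of the two rules of thumb above, in plain Python so it can be run without a cluster. The function names, the 10-second batch interval and the reading of "reserved capacity" as 70% of the processing time are my assumptions for illustration, not Spark APIs. Note also that the spark.streaming.* settings come from the older DStream-based Spark Streaming API; with the Structured Streaming Kafka source the comparable cap is the maxOffsetsPerTrigger option, so treat the numbers as a rough estimate only.

```python
# Back-of-the-envelope checks for a micro-batch pipeline (plain Python, no Spark needed).
# All names below are hypothetical helpers, not part of any Spark API.


def max_records_per_batch(kafka_partitions: int,
                          max_rate_per_partition: int,
                          batch_interval_s: float) -> int:
    """Upper bound on records per micro-batch:
    partitions * records/sec/partition * batch interval in seconds."""
    return int(kafka_partitions * max_rate_per_partition * batch_interval_s)


def fits_in_batch_interval(processing_time_s: float,
                           batch_interval_s: float,
                           reserve_fraction: float = 0.7) -> bool:
    """Processing Time + Reserved Capacity < Batch Interval.

    Assumption: reserved capacity = reserve_fraction * processing time,
    i.e. the heuristic 70% headroom mentioned above.
    """
    reserved_capacity_s = reserve_fraction * processing_time_s
    return processing_time_s + reserved_capacity_s < batch_interval_s


# 60 partitions and 1000 records/sec/partition come from this thread;
# the 10-second batch interval is a made-up value for illustration.
print(max_records_per_batch(60, 1000, 10))   # 600000
print(fits_in_batch_interval(5.0, 10.0))     # True  (5.0 + 3.5 = 8.5 < 10)
print(fits_in_batch_interval(6.0, 10.0))     # False (6.0 + 4.2 = 10.2 > 10)
```

In practice you would feed the second check with the batch duration you observe on the Structured Streaming page of the GUI rather than a guessed figure.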
On Thu, 5 Oct 2023 at 05:54, Shao Yang Hong <shaoyang.h...@ninjavan.co.invalid> wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
>     spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>     .load()
> )
>
> table = (
>     df.select(
>         from_protobuf(
>             "value", "table", "/opt/protobuf-desc/table.desc"
>         ).alias("msg")
>     )
>     .withColumn("uuid", col("msg.uuid"))
>     # etc other columns...
>
>     # generated column for daily partitioning in Delta Lake
>     .withColumn(CREATED_DATE,
>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>     .drop("msg")
> )
>
> query = (
>     table
>     .repartition(10).writeStream
>     .queryName(APP_NAME)
>     .outputMode("append")
>     .format("delta")
>     .partitionBy(CREATED_DATE)
>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>     .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org