[ 
https://issues.apache.org/jira/browse/SPARK-53972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anish Shrigondekar resolved SPARK-53972.
----------------------------------------
    Fix Version/s: 4.1.0
       Resolution: Fixed

Issue resolved by pull request 52688
[https://github.com/apache/spark/pull/52688]

> Streaming Query RecentProgress performance regression in Classic Pyspark
> ------------------------------------------------------------------------
>
>                 Key: SPARK-53972
>                 URL: https://issues.apache.org/jira/browse/SPARK-53972
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 4.0.1
>            Reporter: Zifei Feng
>            Assignee: Zifei Feng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.1.0
>
>         Attachments: Screenshot 2025-10-21 at 9.26.23 AM.png, Screenshot 
> 2025-10-21 at 9.26.29 AM.png
>
>
> We have identified a significant performance regression in the streaming 
> recentProgress method in Apache Spark, seen in Python notebooks starting 
> with version 4.0.0. The time required to fetch recentProgress increases 
> substantially as the number of progress records grows, scaling linearly or 
> worse. We observe this behavior only in classic PySpark.
> The following code charts the time it takes to get recentProgress before 
> and after the changes in [this 
> commit|https://github.com/apache/spark/commit/22eb6c4b0a82b9fcf84fc9952b1f6c41dde9bd8d#diff-4d4ed29d139877b160de444add7ee63cfa7a7577d849ab2686f1aa2d5b4aae64]
> ```
> %python
> from datetime import datetime
> import time
>
> import pandas as pd
>
> # Rate source feeding a no-op sink, so the query only accumulates progress records.
> df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
> q = df.writeStream.format("noop").start()
> print("begin waiting for progress")
> progress_list = []
> time_diff_list = []
> numProgress = len(q.recentProgress)
> while numProgress < 70 and q.exception() is None:
>     time.sleep(1)
>     beforeTime = datetime.now()
>     print(beforeTime.strftime("%Y-%m-%d %H:%M:%S") + ": before we got those progress: " + str(numProgress))
>     # Time a single fetch of recentProgress.
>     rep = q.recentProgress
>     numProgress = len(rep)
>     afterTime = datetime.now()
>     print(afterTime.strftime("%Y-%m-%d %H:%M:%S") + ": after we got those progress: " + str(numProgress))
>     time_diff = (afterTime - beforeTime).total_seconds()
>     print("Total Time: " + str(time_diff) + " seconds")
>     progress_list.append(numProgress)
>     time_diff_list.append(time_diff)
> q.stop()
> q.awaitTermination()
> assert q.exception() is None
> # Plot fetch time against the number of progress records.
> plot_df = pd.DataFrame({'numProgress': progress_list, 'time_diff': time_diff_list})
> display(spark.createDataFrame(plot_df).orderBy("numProgress").toPandas().plot.line(x="numProgress", y="time_diff"))
> ```
> See the attachments for the generated graphs. Attachment 1 shows the 
> regression in the current version. Attachment 2 shows the same measurement 
> in the previous version.
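
The "linear or worse" scaling described above could be quantified from the two lists the reproduction script collects (progress_list and time_diff_list). The following is only a rough sketch, not part of the issue or the fix: fit_line is a hypothetical helper, and the sample values are made-up placeholders, not measurements from the attachments. A clearly positive slope would indicate that each additional progress record adds fetch time.

```python
def fit_line(xs, ys):
    """Ordinary least-squares fit of ys = slope * xs + intercept."""
    n = len(xs)
    if n < 2 or n != len(ys):
        raise ValueError("need two equally sized lists with at least 2 points")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of squared deviations of x; zero means all x values are identical.
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("all x values are identical; slope is undefined")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Placeholder data (NOT measured): stand-ins for progress_list and
# time_diff_list from the reproduction script.
sample_counts = [10, 20, 30, 40]
sample_seconds = [0.1, 0.2, 0.3, 0.4]

slope, intercept = fit_line(sample_counts, sample_seconds)
print(f"approx. {slope * 1000:.1f} ms per additional progress record")
```

In a notebook, one would pass progress_list and time_diff_list in place of the placeholder lists. A straight-line fit only captures linear growth, so worse-than-linear scaling would show up as points curving away from the fitted line.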



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
