[ https://issues.apache.org/jira/browse/SPARK-49473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Koray Beyaz updated SPARK-49473:
--------------------------------
    Issue Type: Improvement  (was: Bug)

> Performance improvement for window rangeBetween then Filter queries
> -------------------------------------------------------------------
>
>                 Key: SPARK-49473
>                 URL: https://issues.apache.org/jira/browse/SPARK-49473
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Koray Beyaz
>            Priority: Minor
>
> There is a performance improvement opportunity for a special class of queries 
> where a rangeBetween window function is followed by a filter expression.
>  
> Consider the following query:
>  
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{lit, mean}
>
> val df = spark.range(10).withColumnRenamed("id", "day_index").withColumn("sales", lit(10))
> df.write.parquet("sales")
>
> val w = Window.partitionBy().orderBy("day_index").rangeBetween(-1, 0)
> val res = spark.read.parquet("sales").withColumn("moving_average_sales", mean("sales").over(w)).filter("day_index = 9")
> res.explain
> {code}
>  
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (isnotnull(day_index#74L) AND (day_index#74L = 9))
>    +- Window [avg(sales#75) windowspecdefinition(day_index#74L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -1, currentrow$())) AS moving_average_sales#78], [day_index#74L ASC NULLS FIRST]
>       +- Sort [day_index#74L ASC NULLS FIRST], false, 0
>          +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=185]
>             +- FileScan parquet [day_index#74L,sales#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/koray/Desktop/libs/spark/sales], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<day_index:bigint,sales:int>{code}
>  
> In this case, we have filtered for day_index = 9. Since the window frame is 
> rangeBetween(-1, 0), only rows with day_index IN (8, 9) can contribute to the 
> surviving output row, so the input DataFrame could be pre-filtered to 
> day_index IN (8, 9). In general, the filter column must be the same as the 
> Window orderBy column for this to work.
>  
> This is especially useful when calculating lags or moving averages for 
> specific dates (today, for example).
>  
> Do you think it is possible/doable to inject the filter
> {code:java}
> day_index IN (8, 9) {code}
> so that the filter will be pushed down? Is this something we can do in 
> Catalyst with a reasonable effort?
>  
> [~gurwls223] [~cloud_fan] tagging to get your opinion on this.
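To make the proposed rewrite concrete: for an equality filter `orderCol = v` above a window with a bounded RangeFrame `[lower, upper]`, the only input rows that can contribute to the surviving output row are those whose order value lies in `[v + lower, v + upper]`. A minimal sketch in plain Scala, assuming a hypothetical `inputBounds` helper (not part of any Spark API) that a Catalyst rule could use to derive the pushed-down predicate:

```scala
// Hypothetical sketch: derive the input-side bounds for an equality filter on
// the window orderBy column above a bounded RangeFrame.
// A row with order value x falls inside the frame of the current row (order
// value v) iff v + lower <= x <= v + upper, so only rows in that interval
// can affect the output row that survives the filter `orderCol = v`.
// Note: this only works when both frame boundaries are finite; an
// unboundedPreceding/unboundedFollowing side yields no bound on that side.
def inputBounds(filterValue: Long, lower: Long, upper: Long): (Long, Long) =
  (filterValue + lower, filterValue + upper)

// e.g. filter day_index = 9 over rangeBetween(-1, 0) -> keep day_index in [8, 9]
val (lo, hi) = inputBounds(9L, -1L, 0L)
```

With those bounds, the query could be rewritten as if the user had written `spark.read.parquet("sales").filter("day_index >= 8 AND day_index <= 9")` before the Window, which would show up as PushedFilters on the FileScan instead of an empty list.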



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
