[ 
https://issues.apache.org/jira/browse/SPARK-40925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-40925:
------------------------------------

    Assignee: Alex Balikov

> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
>                 Key: SPARK-40925
>                 URL: https://issues.apache.org/jira/browse/SPARK-40925
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Alex Balikov
>            Assignee: Alex Balikov
>            Priority: Major
>
> This is a follow-up ticket to https://issues.apache.org/jira/browse/SPARK-40821.
> Here we propose fixing the late record filtering in stateful operators to 
> allow chaining of stateful operators, as long as the upstream operators do 
> not produce delayed records (as time-interval joins and, potentially, 
> flatMapGroupsWithState do) - e.g. a time-equality streaming join followed by 
> an aggregation, or chained window aggregations.
>  
> There are 2 issues which need to be addressed:
>  # Stateful operators filter late input records based on the current 
> watermark. When chaining window aggregations, for example, the records 
> produced by the first window aggregation are behind the current watermark by 
> semantics (the watermark closes all past windows and emits the corresponding 
> aggregates), and therefore these records will by definition appear late 
> relative to the current watermark in the second stateful operator. The 
> proposed fix for this issue is to use the previous batch watermark for late 
> record filtering and the current batch watermark for state eviction - 
> effectively, each stateful operator should be initialized with 2 watermark 
> values instead of 1.
>  # The second issue with chaining window aggregations is that the records 
> produced by the first aggregation do not have an explicit event time column 
> and thus cannot be fed directly into a subsequent stateful operator which 
> needs that column. This is partially handled by 
> [https://github.com/apache/spark/pull/38288], which lets the user explicitly 
> introduce a new event time column by extracting the event time from the 
> window column, but this is slightly cumbersome. We propose changing the 
> window function to handle the window column transparently - e.g.
> input
>   .withWatermark("eventTime", "1 seconds")
>   .groupBy(window($"eventTime", "5 seconds") as 'window)
>   .agg(count("*") as 'count)
>   .groupBy(window($"window", "10 seconds"))  // window() applied directly to the window column
>   .agg(count("*") as 'count, sum("count") as 'sum)
>   .select($"count".as[Long], $"sum".as[Long])
>  
>  
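> The two-watermark idea in issue 1 can be sketched as follows (an 
> illustrative Scala sketch only, not the actual Spark internals - the type 
> and method names here are hypothetical):
> // Each stateful operator is initialized with 2 watermark values instead of 1.
> case class OperatorWatermarks(prevBatchMs: Long, currBatchMs: Long)
> // Late record filtering uses the *previous* batch watermark, so records
> // emitted by an upstream stateful operator when it advanced to the current
> // watermark are still accepted downstream.
> def isLate(eventTimeMs: Long, wm: OperatorWatermarks): Boolean =
>   eventTimeMs < wm.prevBatchMs
> // State eviction (e.g. closing windows and emitting their aggregates) still
> // uses the *current* batch watermark.
> def shouldEvict(windowEndMs: Long, wm: OperatorWatermarks): Boolean =
>   windowEndMs <= wm.currBatchMs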



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
