[ 
https://issues.apache.org/jira/browse/SPARK-40821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-40821:
------------------------------------

    Assignee: Alex Balikov

> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
>                 Key: SPARK-40821
>                 URL: https://issues.apache.org/jira/browse/SPARK-40821
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: Alex Balikov
>            Assignee: Alex Balikov
>            Priority: Major
>
> Currently, chaining of stateful operators in Spark Structured Streaming is not 
> supported for various reasons and is blocked by the unsupported operations 
> check (spark.sql.streaming.unsupportedOperationCheck flag). We propose to fix 
> this as chaining of stateful operators is a common streaming scenario - e.g.
> stream-stream join -> windowed aggregation
> window aggregation -> window aggregation
> etc
> What is broken:
>  # every stateful operator performs late record filtering against the global 
> watermark. When stateful operators are chained (e.g. window aggregations), the 
> output produced by the first stateful operator is effectively late against 
> the watermark and is thus filtered out by the next operator's late record 
> filtering (technically, the next operator should not perform late record 
> filtering at all, but the filter can be turned into an assertion for 
> correctness detection, etc.)
>  # when chaining window aggregations, the first window aggregating operator 
> produces records with schema { window: { start: Timestamp, end: Timestamp }, 
> agg: Long } - there is no explicit event time in the schema to be used by 
> the next stateful operator (the correct event time should be window.end - 1)
>  # stream-stream time-interval join can produce late records by semantics, 
> e.g. if the join condition is:
> left.eventTime BETWEEN right.eventTime - INTERVAL 1 HOUR AND right.eventTime + 
> INTERVAL 1 HOUR
> the produced records can be delayed by 1 hour relative to the 
> watermark.
> Proposed fixes:
>  1. Issue 1 can be fixed by performing late record filtering against the 
> previous microbatch watermark instead of the current microbatch watermark.
>  2. Issue 2 can be fixed by allowing the window and session_window functions 
> to work on the window column directly and compute the correct event time 
> transparently to the user. Also, introduce a window_time SQL function to 
> compute the correct event time from the window column.
>  3. Issue 3 can be fixed by adding support for per-operator watermarks instead 
> of a single global watermark. In the example of a stream-stream time-interval 
> join followed by a stateful operator, the join operator will 'delay' the 
> downstream operator watermarks by a correct value to handle the delayed 
> records. Only stream-stream time-interval joins will delay the 
> watermark; other operators will not delay downstream watermarks.
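
To illustrate the first proposed fix, here is a minimal plain-Python sketch (not Spark code) of why filtering against the previous microbatch watermark keeps the output of an upstream window aggregation. The `WindowRow` class, the helper names, the integer timestamps, and the strict `<` lateness rule are all assumptions made for illustration; Spark's actual predicates may differ in detail.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowRow:
    """Hypothetical output row of the first window aggregation."""
    start: int
    end: int
    count: int

    @property
    def event_time(self) -> int:
        # Event time of a closed window, per the issue text: window.end - 1.
        return self.end - 1


def closed_windows(state, prev_wm, cur_wm):
    """Windows the first operator emits in this microbatch (append mode):
    those whose end was passed by the watermark during this batch."""
    return [w for w in state if prev_wm < w.end <= cur_wm]


def filter_late(rows, watermark):
    """Simplified late record filter: drop rows behind the given watermark."""
    return [r for r in rows if r.event_time >= watermark]


state = [WindowRow(0, 10, 3), WindowRow(10, 20, 5)]
prev_wm, cur_wm = 7, 12

emitted = closed_windows(state, prev_wm, cur_wm)

# Filtering against the current watermark drops the freshly emitted window.
kept_current = filter_late(emitted, cur_wm)

# Filtering against the previous watermark keeps it.
kept_previous = filter_late(emitted, prev_wm)
```

In this model every emitted window has `end > prev_wm`, so with integer timestamps its event time `end - 1` can never fall behind the previous watermark, whereas it is always behind the current one.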
>  
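
For the second proposed fix, the sketch below shows in plain Python why the event time of a window row is taken as `window.end - 1` rather than `window.end`. It assumes the unit of "1" is one microsecond and uses two hypothetical helpers, `window_event_time` and `tumbling_window`; neither is a Spark API, they only mimic the behaviour the issue describes for the proposed window_time function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_event_time(window):
    """Event time of a window row: end minus one microsecond (assumed unit)."""
    return window["end"] - timedelta(microseconds=1)


def tumbling_window(ts, size):
    """Assign a timestamp to the tumbling window [start, start + size)."""
    offset = (ts - EPOCH) % size
    start = ts - offset
    return {"start": start, "end": start + size}


# A 5-minute window produced by the first aggregation.
five_min = {
    "start": datetime(2022, 10, 17, 12, 55),
    "end": datetime(2022, 10, 17, 13, 0),
}

# Re-aggregating into hourly windows with end - 1us lands in [12:00, 13:00).
hourly = tumbling_window(window_event_time(five_min), timedelta(hours=1))

# Using the exclusive end directly would land in the next hour, [13:00, 14:00).
hourly_from_end = tumbling_window(five_min["end"], timedelta(hours=1))
```

Because the window end is exclusive, subtracting the smallest time unit yields the last instant that still belongs to the window, which is what a downstream window aggregation needs.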
>  
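
The third proposed fix can be sketched the same way. This is a plain-Python model, assuming a 1-hour join interval as in the example condition and the same simplified strict `<` lateness rule; the function names and the sample timestamps are hypothetical and do not correspond to Spark internals.

```python
from datetime import datetime, timedelta

JOIN_INTERVAL = timedelta(hours=1)  # from the example join condition


def downstream_watermark(upstream_watermark, is_time_interval_join):
    """Watermark seen by the operator after this one. Only a stream-stream
    time-interval join delays it; other operators pass it through."""
    delay = JOIN_INTERVAL if is_time_interval_join else timedelta(0)
    return upstream_watermark - delay


def is_late(event_time, watermark):
    """Simplified lateness check used by the downstream stateful operator."""
    return event_time < watermark


global_wm = datetime(2022, 10, 17, 12, 0)

# A joined row whose event time trails the watermark by less than an hour.
joined_event_time = datetime(2022, 10, 17, 11, 10)

# With a single global watermark the downstream operator drops the row.
dropped_with_global = is_late(joined_event_time, global_wm)

# With a per-operator watermark delayed by the join, the row is kept.
after_join_wm = downstream_watermark(global_wm, is_time_interval_join=True)
dropped_with_delayed = is_late(joined_event_time, after_join_wm)

# A non-join operator leaves the downstream watermark untouched.
after_agg_wm = downstream_watermark(global_wm, is_time_interval_join=False)
```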



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

