[ https://issues.apache.org/jira/browse/SPARK-40821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim reassigned SPARK-40821:
------------------------------------

    Assignee: Alex Balikov

> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
>                 Key: SPARK-40821
>                 URL: https://issues.apache.org/jira/browse/SPARK-40821
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: Alex Balikov
>            Assignee: Alex Balikov
>            Priority: Major
>
> Currently, chaining of stateful operators in Spark Structured Streaming is
> not supported for various reasons and is blocked by the unsupported
> operations check (spark.sql.streaming.unsupportedOperationCheck flag). We
> propose to fix this, as chaining of stateful operators is a common
> streaming scenario, e.g.
>   stream-stream join -> windowed aggregation
>   window aggregation -> window aggregation
>   etc.
>
> What is broken:
> 1. Every stateful operator performs late record filtering against the
>    global watermark. When chaining stateful operators (e.g. window
>    aggregations), the output produced by the first stateful operator is
>    effectively late against the watermark and is thus filtered out by the
>    late record filtering of the next operator (technically, the next
>    operator should not do late record filtering, but the filter can be
>    changed to an assertion for correctness detection, etc.).
> 2. When chaining window aggregations, the first window aggregating
>    operator produces records with schema
>    { window: { start: Timestamp, end: Timestamp }, agg: Long }.
>    There is no explicit event time in the schema to be used by the next
>    stateful operator (the correct event time should be window.end - 1).
> 3. A stream-stream time-interval join can produce late records by
>    semantics, e.g. if the join condition is:
>      left.eventTime BETWEEN right.eventTime - INTERVAL 1 HOUR
>                         AND right.eventTime + INTERVAL 1 HOUR
>    the produced records can be delayed by 1 hour relative to the
>    watermark.
>
> Proposed fixes:
> 1. Issue 1 can be fixed by performing late record filtering against the
>    previous microbatch watermark instead of the current microbatch
>    watermark.
> 2. Issue 2 can be fixed by allowing the window and session_window
>    functions to work on the window column directly and compute the correct
>    event time transparently to the user. Also, introduce a window_time SQL
>    function to compute the correct event time from the window column.
> 3. Issue 3 can be fixed by adding support for per-operator watermarks
>    instead of a single global watermark. In the example of a stream-stream
>    time-interval join followed by a stateful operator, the join operator
>    will 'delay' the downstream operator watermarks by the correct amount
>    to handle the delayed records. Only stream-stream time-interval joins
>    will delay the watermark; other operators will not delay downstream
>    watermarks.
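
To make issue 1 and its proposed fix concrete, here is a minimal, self-contained sketch in plain Python. It is not Spark code: the class WindowSum, the function run_chain, the integer timestamps, and the zero-delay watermark are all assumptions made for illustration. It chains two append-mode tumbling-window sums, assigns each closed window the event time window.end - 1 as described in issue 2, and compares late record filtering against the current microbatch watermark with filtering against the previous one.

```python
from collections import defaultdict


class WindowSum:
    """Toy append-mode tumbling-window sum (hypothetical, not a Spark class)."""

    def __init__(self, size, filter_with_previous_watermark):
        self.size = size
        self.use_prev = filter_with_previous_watermark
        self.state = defaultdict(int)  # window start -> running sum

    def process(self, records, prev_wm, curr_wm):
        # Late record filtering: the choice of watermark here is what the
        # ticket proposes to change.
        late_bound = prev_wm if self.use_prev else curr_wm
        for event_time, value in records:
            if event_time < late_bound:
                continue  # dropped as a late record
            self.state[event_time - event_time % self.size] += value
        # Emission and state eviction always use the current watermark.
        closed = sorted(s for s in self.state if s + self.size <= curr_wm)
        return [(s, s + self.size, self.state.pop(s)) for s in closed]


def run_chain(batches, filter_with_previous_watermark):
    """Run source -> 10-unit window sum -> 30-unit window sum over microbatches."""
    first = WindowSum(10, filter_with_previous_watermark)
    second = WindowSum(30, filter_with_previous_watermark)
    prev_wm = curr_wm = max_seen = 0
    results = []
    for batch in batches:
        # Assumed watermark rule: max event time seen in earlier batches,
        # with zero allowed delay.
        prev_wm, curr_wm = curr_wm, max(curr_wm, max_seen)
        closed = first.process([(t, 1) for t in batch], prev_wm, curr_wm)
        # Event time of a closed window is window.end - 1.
        downstream = [(end - 1, total) for (_start, end, total) in closed]
        results.extend(second.process(downstream, prev_wm, curr_wm))
        max_seen = max([max_seen] + batch)
    return results


batches = [[1, 2, 3], [11, 12], [21], [31], [41]]
print(run_chain(batches, filter_with_previous_watermark=False))
print(run_chain(batches, filter_with_previous_watermark=True))
```

In this toy model, every window emitted by the first operator has end <= current watermark, so its event time (end - 1) is below the current watermark and the second operator drops it. The same window has end > previous watermark (otherwise it would have been emitted in an earlier microbatch), so filtering against the previous watermark lets it through.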