[ https://issues.apache.org/jira/browse/SPARK-40925 ]
Apache Spark reassigned SPARK-40925:
------------------------------------

    Assignee: Apache Spark

Fix late record filtering to support chaining of stateful operators
-------------------------------------------------------------------

                Key: SPARK-40925
                URL: https://issues.apache.org/jira/browse/SPARK-40925
            Project: Spark
         Issue Type: Improvement
         Components: Structured Streaming
   Affects Versions: 3.4.0
           Reporter: Alex Balikov
           Assignee: Apache Spark
           Priority: Major
            Fix For: 3.4.0

This is a follow-up ticket to https://issues.apache.org/jira/browse/SPARK-40821. Here we propose fixing the late record filtering in stateful operators to allow chaining of stateful operators which do not produce delayed records (unlike a time-interval join or, potentially, flatMapGroupsWithState) - e.g. a time-equality streaming join followed by aggregations, or chaining of window aggregations.

There are 2 issues which need to be addressed:

1. Stateful operators filter input late records based on the current watermark. If e.g. chaining window aggregations, the records produced by the first window aggregation will be behind the current watermark by semantics (the watermark closes all past windows and emits the corresponding aggregates), and therefore these records will by definition appear late relative to the current watermark in the second stateful operator. The proposed fix for this issue is to use the previous batch watermark for late record filtering and the current batch watermark for state eviction - effectively, each stateful operator should be initialized with 2 watermark values instead of 1 (see the first sketch below).

2. The second issue with chaining window aggregators is that the records produced by the first aggregator do not have an explicit event time column and thus cannot be directly fed into a subsequent stateful operator which needs that column. This is partially handled by https://github.com/apache/spark/pull/38288, so the user can explicitly introduce a new event time column by extracting the event time from the window column. This is slightly cumbersome (see the second sketch below). We propose changing the window function to handle the window column transparently - e.g.:

input
  .withWatermark("eventTime", "1 seconds")
  .groupBy(window($"eventTime", "5 seconds") as 'window)
  .agg(count("*") as 'count)
  .groupBy(window($"window", "10 seconds"))
  .agg(count("*") as 'count, sum("count") as 'sum)
  .select($"count".as[Long], $"sum".as[Long])
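First sketch: a minimal, hypothetical illustration of the two-watermark idea from item 1. This is not Spark's actual operator code; TwoWatermarkSketch, WatermarkPair, filterLate, and evictState are illustrative names, and the state is reduced to a plain map keyed by event time.

  // Hypothetical sketch: a stateful operator initialized with two watermark values.
  object TwoWatermarkSketch {
    // previousBatchMs gates late-record filtering; currentBatchMs gates state eviction.
    final case class WatermarkPair(previousBatchMs: Long, currentBatchMs: Long)

    // Filter input rows against the *previous* batch watermark, so rows that an
    // upstream stateful operator emitted behind the current watermark are kept.
    def filterLate(eventTimesMs: Seq[Long], wm: WatermarkPair): Seq[Long] =
      eventTimesMs.filter(_ >= wm.previousBatchMs)

    // Evict state against the *current* batch watermark, as today.
    def evictState(stateByTimeMs: Map[Long, String], wm: WatermarkPair): Map[Long, String] =
      stateByTimeMs.filter { case (ts, _) => ts >= wm.currentBatchMs }

    def main(args: Array[String]): Unit = {
      val wm = WatermarkPair(previousBatchMs = 1000L, currentBatchMs = 2000L)
      // A row at t=1500 emitted by an upstream window aggregation is behind the
      // current watermark (2000) but survives when checked against the previous
      // watermark (1000); filtering on the current watermark would drop it.
      println(filterLate(Seq(500L, 1500L, 2500L), wm)) // List(1500, 2500)
    }
  }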
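Second sketch: for comparison, the explicit form from item 2 that the proposal would make unnecessary, assuming the window_time function introduced by the linked PR to extract the event time from the window column. The rate source and column names here are illustrative, and the chain only builds the query plan - with the fixes proposed in this ticket the second aggregation is expected to work; without them its input would be filtered as late.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{count, sum, window, window_time}

  val spark = SparkSession.builder().appName("ChainedWindowAggs").getOrCreate()
  import spark.implicits._

  // Toy streaming source; the rate source emits (timestamp, value) rows.
  val input = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "10")
    .load()
    .withColumnRenamed("timestamp", "eventTime")

  val chained = input
    .withWatermark("eventTime", "1 seconds")
    .groupBy(window($"eventTime", "5 seconds") as "window")
    .agg(count("*") as "count")
    // The explicit step: derive an event time from the window column via
    // window_time before feeding the second window aggregation.
    .groupBy(window(window_time($"window"), "10 seconds"))
    .agg(count("*") as "count", sum("count") as "sum")
    .select($"count".as[Long], $"sum".as[Long])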