First of all, I guess you're asking for both "arbitrary stateful operation" and "native support of windowing". (Even if you don't deal with state directly, whenever you use stateful operations like streaming aggregation or stream-stream join, you use state.)
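As context for what "arbitrary" (i.e. low-level) stateful operation means in practice: when there is no native window support, window logic such as sessionization is ordinary code you write yourself against the per-key state. Here is a rough plain-Scala sketch of gap-based session assignment (no Spark dependency; all names here are invented for illustration, not taken from the Spark API):

```scala
object SessionWindows {
  // Group event timestamps (in millis) into sessions: a new session
  // starts whenever the gap to the previous event exceeds gapMs.
  // Each session is reported as (sessionStart, sessionEnd), where
  // sessionEnd is the last event time plus the gap (the window stays
  // "open" for gapMs after the last event).
  def sessionize(timestamps: Seq[Long], gapMs: Long): Seq[(Long, Long)] = {
    if (timestamps.isEmpty) return Seq.empty
    val sorted = timestamps.sorted
    val sessions = scala.collection.mutable.ArrayBuffer[(Long, Long)]()
    var start = sorted.head
    var last = sorted.head
    for (t <- sorted.tail) {
      if (t - last > gapMs) {
        // Gap exceeded: close the current session, open a new one.
        sessions += ((start, last + gapMs))
        start = t
      }
      last = t
    }
    sessions += ((start, last + gapMs))
    sessions.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Events at 0s, 2s and 30s with a 10s gap fall into two sessions.
    val s = sessionize(Seq(0L, 2000L, 30000L), 10000L)
    println(s.mkString(","))  // prints (0,12000),(30000,40000)
  }
}
```

Inside flatMapGroupsWithState you would keep the open session in the GroupState and close it on timeout, but the windowing decision itself is exactly this kind of hand-written logic.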
In short, there's no native support for windowing when you use flatMapGroupsWithState - here "arbitrary" really means "low level" - you may need to handle the window logic yourself. Please refer to the example below to see how you can deal with windows in flatMapGroupsWithState (processing-time session window):

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

There are two types of APIs in Spark Dataset - "typed" and "untyped". Most features are available in the untyped API (which you've referred to), but not all of them; (flat)MapGroupsWithState is one that leverages the typed API. The "window" function is not supported in the typed API, so the two cannot be used together. I guess the restriction was introduced for simplicity and performance, but I'm not 100% sure.

You can still leverage the "window" function to populate rows with window bounds, map them via the typed API, and apply groupByKey. Please take a look at the query below: you'll get more rows than the number of input rows here, since it applies a sliding window.

val inputData = MemoryStream[(Long, Long)]

inputData.toDF()
  .selectExpr("_1", "CAST(_2 / 1000 AS TIMESTAMP) AS timestamp")
  .select(col("*"), window(col("timestamp"), "10 seconds", "5 seconds").as("window"))
  .select(col("_1"), col("window.start").as("window_start"), col("window.end").as("window_end"))
  .as[(Long, Timestamp, Timestamp)]

Hope it helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Oct 16, 2019 at 6:36 AM Richard Reitmeyer <richard_reitme...@symantec.com.invalid> wrote:

> What's the right way to use Structured Streaming with both state and windows?
>
> Looking at the slides from
> https://www.slideshare.net/databricks/arbitrary-stateful-aggregations-using-structured-streaming-in-apache-spark
> slides 26 and 31, it looks like stateful processing of events for every
> device every minute should be
>
> events
>   .withWatermark("event_time", "2 minutes")
>   .groupBy("device_id", window("event_time", "1 minute"))
>   .flatMapWithState(…)(…)
>   …
>
> But with Apache Spark 2.4.4 this won't work, and it looks like
> https://issues.apache.org/jira/browse/SPARK-21641 is to blame.
>
> What's the recommended way to handle this?