Hi all, I've been looking heavily into Spark 2.2 to solve a problem I have, specifically by using mapGroupsWithState. What I've discovered is that a *groupBy(window(..))* does not work when it is followed by a *mapGroupsWithState*; it produces an AnalysisException:
*"mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;;"*

I have HTTP logs that have been rolled up via a previous job's window function, in the form of:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET", "statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"}, "account": "A", "verb": "GET", "statusCode": 200, "eventCount": 89}

In this data, each *when* sub-object covers a one-minute block. I'd like to use a *window* function to aggregate that into 10-minute windows and sum the eventCount, grouping on account, verb, and statusCode. From there I'd like to *mapGroupsWithState* for each *account* and *verb* to produce buckets for some configurable window, say 10 minutes for example's sake, of the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"}, "account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

*mapGroupsWithState* is perfect for this but, as stated, I've not found a way to apply a window function *and* use mapGroupsWithState.

Example:

    ds.withColumn("bucket", $"when.from")
      .withWatermark("bucket", "1 minutes")
      // buckets and sums smaller windowed events into a rolled-up larger window event with summed eventCount
      .groupBy(window($"bucket", "10 minutes"), $"account", $"verb", $"statusCode")
      .agg(sum($"eventCount"))
      .map(r => Log(....))
      // maps
      .groupByKey(l => (l.when, l.account, l.verb))
      // will calculate totalErrors / totalRequests per bucket
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
        case ((when: Window, account: String, verb: String), events: Iterator[Log], state: GroupState[SessionInfo]) => {
          ..........
        }
      }

Any suggestions would be greatly appreciated. I've also noticed that *groupByKey().reduceGroups()* does not work with *mapGroupsWithState*, which is another strategy that I've tried.

Thanks.
dan
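
For reference, the rollup described above can be sketched on plain Scala collections, without Spark or streaming, just to pin down the intended output. This is only an illustration under stated assumptions: the case classes and the `RollupSketch`, `bucketOf`, and `rollup` names are hypothetical, windows are aligned to midnight (which only makes sense when the window size divides a day evenly), and a status code of 500 or above is assumed to count as an error. It does not address the streaming restriction itself.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical types for illustration; field names follow the JSON in the post.
final case class Window(from: LocalDateTime, to: LocalDateTime)
final case class Log(when: Window, account: String, verb: String, statusCode: Int, eventCount: Long)
final case class Rollup(when: Window, account: String, verb: String, totalRequests: Long, totalErrors: Long)

object RollupSketch {
  // Floor a timestamp to the start of its enclosing fixed-size window,
  // with windows aligned to midnight of the same day (an assumption).
  def bucketOf(ts: LocalDateTime, size: Duration): Window = {
    val midnight = ts.toLocalDate.atStartOfDay
    val offsetMs = Duration.between(midnight, ts).toMillis
    val start = midnight.plus(Duration.ofMillis(offsetMs - offsetMs % size.toMillis))
    Window(start, start.plus(size))
  }

  // Group the small events by (larger window, account, verb) and sum their counts.
  def rollup(logs: Seq[Log], size: Duration): Seq[Rollup] =
    logs
      .groupBy(l => (bucketOf(l.when.from, size), l.account, l.verb))
      .map { case ((w, account, verb), group) =>
        Rollup(
          w, account, verb,
          totalRequests = group.map(_.eventCount).sum,
          // Assumption: statusCode >= 500 counts as an error.
          totalErrors = group.filter(_.statusCode >= 500).map(_.eventCount).sum
        )
      }
      .toSeq
}
```

Applied to the two sample records above with a 10-minute window, this yields a single bucket for account "A" and verb "GET" with totalRequests 99 and totalErrors 10.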