Hi all,

I've been looking heavily into Spark 2.2 to solve a problem I have,
specifically by using mapGroupsWithState.  What I've discovered is that a
*groupBy(window(..))* does not work when followed by a
*mapGroupsWithState*; it throws this AnalysisException:

*"mapGroupsWithState is not supported with aggregation on a streaming
DataFrame/Dataset;;"*

I have HTTP logs that have been rolled up by a previous job's window
function, in the form of:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}

In this data the *when* sub-object covers one-minute blocks.  I'd like to
use a *window* function to aggregate that to 10-minute windows and sum the
eventCount by grouping on account, verb, and statusCode.  From there I'd
like to *mapGroupsWithState* for each *account* and *verb* to produce
buckets for some configurable window, say 10 minutes for example's sake, of
the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
"account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

*mapGroupsWithState* is perfect for this but, as stated, I've not found a
way to apply a window function *and* use mapGroupsWithState.
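
To make the target rollup concrete, here is a rough sketch of it on plain
Scala collections, with no Spark involved.  The names Rolled, Bucket,
RollupSketch, bucketStart, and rollUp are hypothetical and are not the
Log/SessionInfo types from my job.  Counting a 5xx statusCode as an error is
an assumption, as is a bucket size that divides 60 evenly.

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical shapes for illustration only; not the types used in the real job.
case class Rolled(from: LocalDateTime, account: String, verb: String,
                  statusCode: Int, eventCount: Long)
case class Bucket(from: LocalDateTime, to: LocalDateTime, account: String,
                  verb: String, totalRequests: Long, totalErrors: Long)

object RollupSketch {
  // Floor a timestamp to the start of its bucket.
  // Assumption: bucketMinutes divides 60 evenly.
  def bucketStart(t: LocalDateTime, bucketMinutes: Int): LocalDateTime = {
    val floored = t.truncatedTo(ChronoUnit.MINUTES)
    floored.minusMinutes(floored.getMinute % bucketMinutes)
  }

  // Group by (bucket, account, verb) and sum the counts per group.
  def rollUp(events: Seq[Rolled], bucketMinutes: Int): Seq[Bucket] =
    events
      .groupBy(e => (bucketStart(e.from, bucketMinutes), e.account, e.verb))
      .map { case ((start, account, verb), es) =>
        Bucket(start, start.plusMinutes(bucketMinutes), account, verb,
          totalRequests = es.map(_.eventCount).sum,
          // Assumption: a 5xx status counts as an error.
          totalErrors = es.filter(_.statusCode >= 500).map(_.eventCount).sum)
      }
      .toSeq
}
```

Under those assumptions, the two sample rows above would fall into a single
bucket with totalRequests 99 and totalErrors 10.  The open question is how to
get the same result incrementally on a streaming Dataset.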

Example of the failing query:

ds.withColumn("bucket", $"when.from")
  .withWatermark("bucket", "1 minutes")
  // buckets and sums smaller windowed events into a rolled up larger
  // window event with summed eventCount
  .groupBy(window($"bucket", "10 minutes"),
    $"account",
    $"verb",
    $"statusCode")
  .agg(
    sum($"eventCount")
  )
  .map(r => Log(....))
  .groupByKey(l => (l.when, l.account, l.verb)) // maps
  // will calculate totalErrors / totalRequests per bucket
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout()) {
    case ((when: Window, account: String, verb: String),
          events: Iterator[Log],
          state: GroupState[SessionInfo]) => {
      ..........
    }
  }


Any suggestions would be greatly appreciated.

Another strategy I've tried is *groupByKey().reduceGroups()*, but that does
not work with *mapGroupsWithState* either.

Thanks.

dan
