Hi, I’m attempting to leverage flatMapGroupsWithState to handle some arbitrary aggregations and am noticing a couple of things:
- *ProcessingTimeTimeout* + *setTimeoutDuration*: timeout not being honored
- *EventTimeTimeout* + watermark value: timeout not being honored
- *EventTimeTimeout* + *setTimeoutTimestamp*: timeout not being honored

I’ve come to this conclusion because I never hit a conditional check (with log output) on the *hasTimedOut* property. I tested each of these scenarios in isolation, and all three showed the same behavior: the timeout event was never reached, and the time between batches grew very large. The test read 2000 messages from a Kafka topic containing two distinct groups (1000 messages per group).

To give an idea of what I’m attempting to do: aggregate all events into a single bucket until some timeout expires. Note also that in this example I’m trying to get the *final* value of the GroupState object as it times out. That’s why I do a second pass on the timeout, but that doesn’t really matter, since I’m not getting the timeout event at all.

My code is here:

    val stream = reader
      .load()
      .selectExpr(
        "CAST(key AS STRING)",
        "topic",
        "CAST(value AS BINARY)",
        "timestamp"
      )
      .as[KafkaLoadType].map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (f.name, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
        case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {
          state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)
          info("Starting flatMapGroupsWithState func")
          val asList = events.toList
          info(s"${name} iterator size: ${asList.size}")
          if (state.exists) {
            info(s"State exists: ${state.get}")
          }
          var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
          asList.foreach(e => {
            session = session.add(e.value)
          })
          info(s"Updating value to ${session}")
          state.update(session)
          val result = if (state.hasTimedOut && !state.get.finalized) {
            info("State has timedout ... finalizing")
            state.update(state.get.copy(finalized = true))
            Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
          } else if (state.hasTimedOut && state.get.finalized) {
            info("State has timedout AND is finalized")
            val r = state.get
            state.remove()
            Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
          } else {
            val result = state.get
            info(s"Returning ${result}")
            // state.remove()
            Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
          }
          info("Exiting flatMapGroupsWithState func")
          result
        }
      }.writeStream.trigger(Trigger.ProcessingTime(500))
      .format("console").option("truncate", false)
      .outputMode(OutputMode.Append)
      .start()

Thanks for any help.
dan
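For comparison, here is a minimal sketch of how an event-time timeout is commonly structured with flatMapGroupsWithState. It rests on several assumptions. `Data`, `SessionInfo`, and `Result` are hypothetical stand-ins with guessed fields (`value` is assumed to be a `Long`). The key is `name` alone rather than `(name, when)`, because keying on the timestamp as well would likely give every distinct `when` its own group and its own state. The one-minute gap is arbitrary.

The sketch differs from the code above in three ways:

- It checks *hasTimedOut* first, since on a timeout call the values iterator is empty.
- It only merges, updates, and re-arms the timeout on calls that carry data.
- It computes the timeout timestamp from event time instead of `DateTime.now`.

The *GroupState* scaladoc says an event-time timeout fires only once the watermark passes the timestamp set with *setTimeoutTimestamp*. The watermark is derived from the `when` column, so if event times lag the wall clock, a wall-clock deadline may not be crossed for a long time. The 2.x scaladoc also notes that timeouts are only evaluated when a trigger actually runs. After the test messages are consumed, there may therefore be no further batch in which the timeout can fire.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical stand-ins for the Data / SessionInfo / Result types in the post.
case class Data(name: String, when: Timestamp, value: Long)
case class SessionInfo(name: String, value: Long, lastEventMs: Long)
case class Result(name: String, value: Long)

object EventTimeSessions {
  // Assumed event-time gap after a key's latest event before the key expires.
  val GapMs: Long = 60 * 1000L

  // Fold one call's events into the running session, tracking the latest event time seen.
  def merge(session: SessionInfo, events: Iterator[Data]): SessionInfo =
    events.foldLeft(session) { (s, e) =>
      s.copy(value = s.value + e.value, lastEventMs = math.max(s.lastEventMs, e.when.getTime))
    }

  // Event-time deadline, kept above the current watermark because
  // setTimeoutTimestamp rejects timestamps older than the watermark.
  def timeoutAt(lastEventMs: Long, watermarkMs: Long, gapMs: Long): Long =
    math.max(lastEventMs + gapMs, watermarkMs + 1)

  def updateSession(
      name: String,
      events: Iterator[Data],
      state: GroupState[SessionInfo]): Iterator[Result] = {
    if (state.hasTimedOut) {
      // Timeout call: `events` is empty, so emit the final value and drop the state.
      val last = state.get
      state.remove()
      Iterator(Result(last.name, last.value))
    } else {
      val session = merge(state.getOption.getOrElse(SessionInfo(name, 0L, 0L)), events)
      state.update(session)
      // Re-arm on every data call, measured in event time rather than wall-clock time.
      state.setTimeoutTimestamp(timeoutAt(session.lastEventMs, state.getCurrentWatermarkMs(), GapMs))
      Iterator.empty
    }
  }

  def sessions(spark: SparkSession, events: Dataset[Data]): Dataset[Result] = {
    import spark.implicits._
    events
      .withWatermark("when", "10 seconds")
      .groupByKey(_.name)
      .flatMapGroupsWithState[SessionInfo, Result](
        OutputMode.Append(), GroupStateTimeout.EventTimeTimeout())(updateSession)
  }
}
```

The result of `sessions` would be written out with the same `writeStream ... .start()` chain as in the post. Each key then emits exactly one row, when it expires, which suits Append mode.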
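The processing-time scenario (*ProcessingTimeTimeout* + *setTimeoutDuration*) can be sketched in the same shape. `Event` and `Total` are hypothetical types, and the one-minute duration is arbitrary. With *ProcessingTimeTimeout* the duration is measured against processing time, so no watermark is involved. The caveat about timeouts only being checked when a batch runs applies here too.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical input and output types for illustration only.
case class Event(name: String, value: Long)
case class Total(name: String, value: Long)

object ProcessingTimeTotals {
  // Add one call's events to the running total for a key.
  def add(total: Total, events: Iterator[Event]): Total =
    events.foldLeft(total)((t, e) => t.copy(value = t.value + e.value))

  def update(name: String, events: Iterator[Event], state: GroupState[Total]): Iterator[Total] =
    if (state.hasTimedOut) {
      // Timeout call: no events; emit the last total and clear the state.
      val last = state.get
      state.remove()
      Iterator(last)
    } else {
      state.update(add(state.getOption.getOrElse(Total(name, 0L)), events))
      // Re-arm on each data call: expire if this key gets no data for one minute of processing time.
      state.setTimeoutDuration("1 minute")
      Iterator.empty
    }

  def totals(spark: SparkSession, events: Dataset[Event]): Dataset[Total] = {
    import spark.implicits._
    events
      .groupByKey(_.name)
      .flatMapGroupsWithState[Total, Total](
        OutputMode.Append(), GroupStateTimeout.ProcessingTimeTimeout())(update)
  }
}
```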