Hi,

I’m trying to use flatMapGroupsWithState to handle some arbitrary
aggregations and am noticing a few things:

   - *ProcessingTimeTimeout* + *setTimeoutDuration*: the timeout is not
   honored.
   - *EventTimeTimeout* + watermark value: the timeout is not honored.
   - *EventTimeTimeout* + *setTimeoutTimestamp*: the timeout is not honored.

I’ve come to this conclusion because the conditional check on the
*hasTimedOut* property (which logs when it is taken) is never hit. I tested
each of these scenarios in isolation, and all three behaved the same way:
no timeout event was ever reached, and Spark left a very long gap between
batches.
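
For reference, this is the firing condition I am expecting, based on my reading of the GroupState documentation. It is a minimal plain-Scala model with no Spark dependency, and every name in it (TimeoutModel, eligible, watermark) is hypothetical and only there to illustrate my assumption; it is not how Spark implements it.

```scala
// Hypothetical model, not Spark code: every name here is illustrative.
object TimeoutModel {
  sealed trait Timeout
  // Processing time: the deadline is the wall-clock time at which the
  // timeout was set, plus the configured duration.
  final case class ProcessingTime(setAtMs: Long, durationMs: Long) extends Timeout
  // Event time: the deadline is an event-time timestamp.
  final case class EventTime(timeoutTimestampMs: Long) extends Timeout

  // Whether a group is eligible to time out when a batch runs. As I
  // understand it, nothing is evaluated between batches.
  def eligible(t: Timeout, batchWallClockMs: Long, watermarkMs: Long): Boolean =
    t match {
      case ProcessingTime(setAtMs, durationMs) => batchWallClockMs > setAtMs + durationMs
      case EventTime(timeoutTimestampMs)       => watermarkMs > timeoutTimestampMs
    }

  // Watermark modeled as "max event time seen so far minus the allowed delay".
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long = maxEventTimeMs - delayMs
}
```

Under this model, with a 10 second watermark delay and a timeout timestamp of now + 1 minute, a group only becomes eligible once an event arrives whose event time is more than 70 seconds past the moment the timeout was set.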

The test read 2000 messages from a Kafka topic containing two distinct
groups (1000 messages per group).

To give an idea of what I’m attempting to do: aggregate all events for a
group into a single bucket that expires after some timeout.

Also, note that in this example I’m attempting to get the *final* value of
the GroupState object as it times out. This is why I attempt a second pass
on the timeout, but that doesn’t really matter since I’m not even getting
the timeout event.
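
Stripped of Spark, the behavior I’m after is roughly the following. This is only a sketch: the field types on SessionInfo and Result are assumptions made for illustration, Bucket.step is a hypothetical stand-in for one call of the state function, and unlike my real code it emits nothing until the timeout.

```scala
// Hypothetical shapes: the field types are assumptions, not the real classes.
final case class SessionInfo(when: Long, name: String, value: Long, finalized: Boolean = false) {
  def add(v: Long): SessionInfo = copy(value = value + v)
}
object SessionInfo {
  def zero(when: Long, name: String): SessionInfo = SessionInfo(when, name, 0L)
}
final case class Result(when: Long, name: String, value: Long)

object Bucket {
  // One invocation of the update function: either new events arrive for the
  // group, or the group has timed out (in which case there are no events).
  // Returns the next state and whatever should be emitted downstream.
  def step(
      key: (String, Long),
      events: List[Long],
      state: Option[SessionInfo],
      timedOut: Boolean): (Option[SessionInfo], List[Result]) = {
    val (name, when) = key
    if (timedOut) {
      // Emit the final value once and drop the state.
      (None, state.map(s => Result(s.when, s.name, s.value)).toList)
    } else {
      // Fold this batch into the bucket and emit nothing yet.
      val start = state.getOrElse(SessionInfo.zero(when, name))
      val updated = events.foldLeft(start)((s, v) => s.add(v))
      (Some(updated), Nil)
    }
  }
}
```

So a group that receives values 1 and 2 and then times out should produce exactly one Result carrying the summed value, after which its state is gone.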

My code is here:

    val stream = reader
      .load()
      .selectExpr(
        "CAST(key AS STRING)",
        "topic",
        "CAST(value AS BINARY)",
        "timestamp"
      )
      .as[KafkaLoadType]
      .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (f.name, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](
        OutputMode.Append,
        GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {

        // Timeout is set from the wall clock (now + 1 minute) on every call.
        state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)

        info("Starting flatMapGroupsWithState func")

        val asList = events.toList

        info(s"${name} iterator size: ${asList.size}")

        if (state.exists) {
          info(s"State exists: ${state.get}")
        }

        var session = state.getOption.getOrElse(SessionInfo.zero(when, name))

        asList.foreach(e => {
          session = session.add(e.value)
        })

        info(s"Updating value to ${session}")

        state.update(session)

        val result = if (state.hasTimedOut && !state.get.finalized) {
          info("State has timedout ... finalizing")

          state.update(state.get.copy(finalized = true))

          Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
        } else if (state.hasTimedOut && state.get.finalized) {
          info("State has timedout AND is finalized")

          val r = state.get

          state.remove()

          Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
        } else {
          val result = state.get

          info(s"Returning ${result}")

          //          state.remove()

          Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
        }

        info("Exiting flatMapGroupsWithState func")

        result
      }
    }.writeStream.trigger(Trigger.ProcessingTime(500))
      .format("console").option("truncate", false)
      .outputMode(OutputMode.Append)
      .start()

Thanks for any help.

dan
