I've set the timeout duration to "2 minutes" as follows: def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject], oldState: GroupState[MyState]): OutputRow = {
println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3) var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3) if (oldState.hasTimedOut) { println("@@@@@ oldState has timed out @@@@") // Logic to Write OutputRow OutputRow("some values here...") } else { for (input <- inputs) { state = updateWithEvent(state, input) oldState.update(state) *oldState.setTimeoutDuration("2 minutes")* } OutputRow(null, null, null) } } I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows... .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents) But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?