I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents (tuple3: Tuple3[String, String, String],
inputs: Iterator[R00tJsonObject],
                          oldState: GroupState[MyState]): OutputRow = {

    println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ",
" + tuple3._2 + ", " + tuple3._3)
    var state: MyState = if (oldState.exists) oldState.get else
MyState(tuple3._1, tuple3._2, tuple3._3)

    if (oldState.hasTimedOut) {
      println("@@@@@ oldState has timed out @@@@")
      // Logic to Write OutputRow
      OutputRow("some values here...")
    } else {
      for (input <- inputs) {
        state = updateWithEvent(state, input)
        oldState.update(state)
        *oldState.setTimeoutDuration("2 minutes")*
      }
      OutputRow(null, null, null)
    }

  }

I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as
follows...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I
doing wrong?

Reply via email to