Yes, that was it! It only works if input data is continuously flowing. I had stopped the input job because I had enough data, but timeouts are only processed when new batches are triggered by incoming data. I'm not sure why it's designed that way, and it makes unit/integration tests a bit harder to write, but I'm sure there's a reason. Thanks.
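For anyone who hits the same thing: below is a rough sketch of how a test can keep pushing "heartbeat" records through a MemoryStream so that micro-batches keep firing and the ProcessingTimeTimeout actually gets evaluated. The Event/KeyedState/Output case classes, the field names, and the timings are made up for the sketch; they are not from my actual job.

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.{SQLContext, SparkSession}

// Placeholder types for the sketch.
case class Event(key: String, value: String)
case class KeyedState(key: String, count: Long)
case class Output(key: String, count: Long, timedOut: Boolean)

object TimeoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("timeout-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    val input = MemoryStream[Event]

    def update(key: String, events: Iterator[Event], state: GroupState[KeyedState]): Output = {
      if (state.hasTimedOut) {
        // No new data arrived for this key and the timeout expired; emit and clear the state.
        val s = state.get
        state.remove()
        Output(s.key, s.count, timedOut = true)
      } else {
        val s = state.getOption.getOrElse(KeyedState(key, 0L))
        val updated = s.copy(count = s.count + events.size)
        state.update(updated)
        state.setTimeoutDuration("10 seconds")
        Output(updated.key, updated.count, timedOut = false)
      }
    }

    val query = input.toDS()
      .groupByKey(_.key)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(update)
      .writeStream
      .outputMode(OutputMode.Update())            // mapGroupsWithState requires Update mode
      .format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    // Key "a" receives data once, then goes idle.
    input.addData(Event("a", "1"))

    // Keep feeding unrelated "heartbeat" records so new micro-batches keep running;
    // the timeout for the idle key "a" is only evaluated as part of a triggered batch.
    (1 to 6).foreach { i =>
      Thread.sleep(5000)
      input.addData(Event("heartbeat", i.toString))
    }

    query.awaitTermination(10000)
    spark.stop()
  }
}

With this running, the console sink eventually shows a row for key "a" with timedOut = true once the heartbeats have kept batches firing past the 10-second timeout; stop adding data and the timeout never fires, which matches what I was seeing.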
On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Make sure that you are continuously feeding data into the query to trigger
> the batches. Only then timeouts are processed.
> See the timeout behavior details here -
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as follows:
>>
>> def updateAcrossEvents(tuple3: Tuple3[String, String, String],
>>                        inputs: Iterator[R00tJsonObject],
>>                        oldState: GroupState[MyState]): OutputRow = {
>>
>>   println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
>>   var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)
>>
>>   if (oldState.hasTimedOut) {
>>     println("@@@@@ oldState has timed out @@@@")
>>     // Logic to Write OutputRow
>>     OutputRow("some values here...")
>>   } else {
>>     for (input <- inputs) {
>>       state = updateWithEvent(state, input)
>>       oldState.update(state)
>>       *oldState.setTimeoutDuration("2 minutes")*
>>     }
>>     OutputRow(null, null, null)
>>   }
>> }
>>
>> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>>
>> But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?
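For reference, here is roughly how a function like updateAcrossEvents gets wired into a streaming query. Only the names R00tJsonObject, MyState, OutputRow, and updateAcrossEvents come from the snippet above; the source dataset, the key-extraction fields (fieldA/fieldB/fieldC), the console sink, and the trigger interval are placeholders, not from the thread.

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumes R00tJsonObject, MyState, OutputRow and updateAcrossEvents are defined
// as in the snippet above, and that `events` is a streaming Dataset[R00tJsonObject].
// fieldA/fieldB/fieldC are placeholder field names for the three-part key.
def startQuery(spark: SparkSession, events: Dataset[R00tJsonObject]) = {
  import spark.implicits._

  events
    .groupByKey(e => (e.fieldA, e.fieldB, e.fieldC))
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
    .writeStream
    .outputMode(OutputMode.Update())              // mapGroupsWithState only supports Update mode
    .format("console")                            // placeholder sink
    .trigger(Trigger.ProcessingTime("1 minute"))  // a batch must actually run for timeouts to fire
    .start()
}

Even with this wiring, the point from the thread stands: the 2-minute timeout is only checked when a micro-batch runs, and batches only run when new data arrives at the source.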