Make sure that you are continuously feeding data into the query so that batches keep getting triggered; timeouts are only processed when a batch runs. See the timeout behavior details here - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
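
In case a concrete example helps, below is a minimal sketch of the pattern, with made-up Event/MyState/OutputRow types standing in for your R00tJsonObject and real state (treat the names as placeholders, not your code). Two things worth noting: the timed-out branch should call oldState.remove() so the group doesn't keep timing out, and setTimeoutDuration only needs to be re-armed once per batch, after the input loop:

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    // Placeholder types for illustration only.
    case class Event(key: String, value: Long)
    case class MyState(key: String, count: Long)
    case class OutputRow(key: String, count: Long, timedOut: Boolean)

    def updateAcrossEvents(key: String,
                           inputs: Iterator[Event],
                           oldState: GroupState[MyState]): OutputRow = {
      if (oldState.hasTimedOut) {
        // Timed-out invocation: 'inputs' is always empty here.
        // Emit the final row and clear the state so the group
        // doesn't keep timing out on every subsequent batch.
        val s = oldState.get
        oldState.remove()
        OutputRow(s.key, s.count, timedOut = true)
      } else {
        var state = oldState.getOption.getOrElse(MyState(key, 0L))
        for (input <- inputs) {
          state = state.copy(count = state.count + 1)
        }
        oldState.update(state)
        // Re-arm the timeout once per micro-batch, after the updates.
        oldState.setTimeoutDuration("2 minutes")
        OutputRow(state.key, state.count, timedOut = false)
      }
    }

And to keep batches firing while you test (which is what actually makes Spark evaluate the timeouts), the built-in rate source works well; something like:

    import spark.implicits._

    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .selectExpr("CAST(value % 10 AS STRING) AS key", "value")
      .as[Event]

    events
      .groupByKey(_.key)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
      .writeStream
      .outputMode("update")   // mapGroupsWithState requires update mode
      .format("console")
      .start()
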
On Wed, Mar 4, 2020 at 2:51 PM Something Something <mailinglist...@gmail.com> wrote:

> I've set the timeout duration to "2 minutes" as follows:
>
> def updateAcrossEvents(tuple3: Tuple3[String, String, String],
>                        inputs: Iterator[R00tJsonObject],
>                        oldState: GroupState[MyState]): OutputRow = {
>
>   println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " +
>     tuple3._2 + ", " + tuple3._3)
>   var state: MyState = if (oldState.exists) oldState.get else
>     MyState(tuple3._1, tuple3._2, tuple3._3)
>
>   if (oldState.hasTimedOut) {
>     println("@@@@@ oldState has timed out @@@@")
>     // Logic to Write OutputRow
>     OutputRow("some values here...")
>   } else {
>     for (input <- inputs) {
>       state = updateWithEvent(state, input)
>       oldState.update(state)
>       oldState.setTimeoutDuration("2 minutes")
>     }
>     OutputRow(null, null, null)
>   }
> }
>
> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as
> follows...
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>
> But 'hasTimedOut' is never true so I don't get any output! What am I
> doing wrong?