Stateful Structured Spark Streaming: Timeout is not getting triggered
I've set the timeout duration to "2 minutes" as follows: def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject], oldState: GroupState[MyState]): OutputRow = { println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3) var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3) if (oldState.hasTimedOut) { println("@ oldState has timed out ") // Logic to Write OutputRow OutputRow("some values here...") } else { for (input <- inputs) { state = updateWithEvent(state, input) oldState.update(state) *oldState.setTimeoutDuration("2 minutes")* } OutputRow(null, null, null) } } I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows... .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents) But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?
Re: Stateful Structured Spark Streaming: Timeout is not getting triggered
Make sure that you are continuously feeding data into the query to trigger the batches. only then timeouts are processed. See the timeout behavior details here - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState On Wed, Mar 4, 2020 at 2:51 PM Something Something wrote: > I've set the timeout duration to "2 minutes" as follows: > > def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: > Iterator[R00tJsonObject], > oldState: GroupState[MyState]): OutputRow = { > > println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + > tuple3._2 + ", " + tuple3._3) > var state: MyState = if (oldState.exists) oldState.get else > MyState(tuple3._1, tuple3._2, tuple3._3) > > if (oldState.hasTimedOut) { > println("@ oldState has timed out ") > // Logic to Write OutputRow > OutputRow("some values here...") > } else { > for (input <- inputs) { > state = updateWithEvent(state, input) > oldState.update(state) > *oldState.setTimeoutDuration("2 minutes")* > } > OutputRow(null, null, null) > } > > } > > I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as > follows... > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents) > > But 'hasTimedOut' is never true so I don't get any output! What am I doing > wrong? > > > >
Re: Stateful Structured Spark Streaming: Timeout is not getting triggered
Yes that was it! It seems it only works if input data is continuously flowing. I had stopped the input job because I had enough data but it seems timeouts work only if the data is continuously fed. Not sure why it's designed that way. Makes it a bit harder to write unit/integration tests BUT I am sure there's a reason why it's designed this way. Thanks. On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das wrote: > Make sure that you are continuously feeding data into the query to trigger > the batches. only then timeouts are processed. > See the timeout behavior details here - > https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState > > On Wed, Mar 4, 2020 at 2:51 PM Something Something < > mailinglist...@gmail.com> wrote: > >> I've set the timeout duration to "2 minutes" as follows: >> >> def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: >> Iterator[R00tJsonObject], >> oldState: GroupState[MyState]): OutputRow = { >> >> println(" Inside updateAcrossEvents with : " + tuple3._1 + ", " + >> tuple3._2 + ", " + tuple3._3) >> var state: MyState = if (oldState.exists) oldState.get else >> MyState(tuple3._1, tuple3._2, tuple3._3) >> >> if (oldState.hasTimedOut) { >> println("@ oldState has timed out ") >> // Logic to Write OutputRow >> OutputRow("some values here...") >> } else { >> for (input <- inputs) { >> state = updateWithEvent(state, input) >> oldState.update(state) >> *oldState.setTimeoutDuration("2 minutes")* >> } >> OutputRow(null, null, null) >> } >> >> } >> >> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as >> follows... >> >> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents) >> >> But 'hasTimedOut' is never true so I don't get any output! What am I doing >> wrong? >> >> >> >>