Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-05 Thread Something Something
Yes, that was it! Timeouts fire only while input data keeps flowing. I had
stopped the input job because I had enough data, but timeouts are processed
only when new data keeps triggering batches. Not sure why it's designed that
way; it makes unit/integration tests a bit harder to write, BUT I am sure
there's a reason. Thanks.
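
For anyone else writing tests around this: the workaround is to keep pushing
data so micro-batches keep triggering. Below is a minimal sketch of such a
test, assuming a local SparkSession. The Event class, updateFn, the memory
sink, and the short 1-second timeout are illustrative stand-ins, not the
code from this thread.

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(key: String, value: String)

// Hypothetical state function with the same shape as updateAcrossEvents:
// counts events per key and reports when the key's state times out.
def updateFn(key: String, inputs: Iterator[Event],
             state: GroupState[Int]): String = {
  if (state.hasTimedOut) {
    state.remove()                        // clean up the expired state
    s"$key timed out"
  } else {
    state.update(state.getOption.getOrElse(0) + inputs.size)
    state.setTimeoutDuration("1 second")  // short timeout keeps the test fast
    s"$key updated"
  }
}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("timeout-test")
  .config("spark.sql.shuffle.partitions", "1")
  .getOrCreate()
import spark.implicits._
implicit val sqlCtx: SQLContext = spark.sqlContext

// MemoryStream lets the test push micro-batches on demand.
val input = MemoryStream[Event]
val query = input.toDS()
  .groupByKey(_.key)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateFn)
  .writeStream
  .outputMode("update")
  .format("memory")
  .queryName("out")
  .start()

input.addData(Event("a", "v1"))     // starts the timeout clock for key "a"
query.processAllAvailable()

Thread.sleep(2000)                  // let the 1-second timeout expire

// Any new data, even for an unrelated key, triggers another batch; only
// then is the timed-out state for "a" handed back to updateFn.
input.addData(Event("heartbeat", ""))
query.processAllAvailable()

spark.sql("select * from out").show()
query.stop()

The heartbeat record is the crux: without it no batch runs, so hasTimedOut
is never evaluated, which is exactly the behavior described below.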

On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das wrote:

> Make sure that you are continuously feeding data into the query to trigger
> the batches. Only then are timeouts processed.
> See the timeout behavior details here -
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState
>
> On Wed, Mar 4, 2020 at 2:51 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I've set the timeout duration to "2 minutes" as follows:
>>
>> def updateAcrossEvents(tuple3: Tuple3[String, String, String],
>>                        inputs: Iterator[R00tJsonObject],
>>                        oldState: GroupState[MyState]): OutputRow = {
>>
>>   println("Inside updateAcrossEvents with: " + tuple3._1 + ", " +
>>     tuple3._2 + ", " + tuple3._3)
>>
>>   var state: MyState =
>>     if (oldState.exists) oldState.get
>>     else MyState(tuple3._1, tuple3._2, tuple3._3)
>>
>>   if (oldState.hasTimedOut) {
>>     println("@ oldState has timed out")
>>     // Logic to write OutputRow for the expired group
>>     OutputRow("some values here...")
>>   } else {
>>     for (input <- inputs) {
>>       state = updateWithEvent(state, input)
>>       oldState.update(state)
>>       // Re-arms the 2-minute processing-time timeout on every event
>>       oldState.setTimeoutDuration("2 minutes")
>>     }
>>     OutputRow(null, null, null)
>>   }
>> }
>>
>> I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as 
>> follows...
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
>>
>> But 'hasTimedOut' is never true, so I don't get any output! What am I doing
>> wrong?

