Re: Are timers in ProcessFunction fault tolerant?

2017-05-26 Thread Kostas Kloudas
Yes, that is correct.

Kostas

> On May 26, 2017, at 11:05 AM, Moiz S Jinia wrote:
> 
> Thanks, Kostas. So even though timer state is managed separately from the 
> keyed state (obtained from the runtimeContext), I can safely assume that both 
> are fault tolerant and remain associated with the key of the stream?



Re: Are timers in ProcessFunction fault tolerant?

2017-05-26 Thread Moiz S Jinia
Thanks, Kostas. So even though timer state is managed separately from the
keyed state (obtained from the runtimeContext), I can safely assume that both
are fault tolerant and remain associated with the key of the stream?

On Fri, May 26, 2017 at 1:51 PM, Kostas Kloudas wrote:

> Hi Moiz,
>
> state.clear() only clears the state that you registered in your job via
> getState() on the runtimeContext. It does not cancel timers.
>
> Timers are managed by Flink’s timer service, and Flink itself cleans them
> up when the job terminates.
>
> Kostas


Re: Are timers in ProcessFunction fault tolerant?

2017-05-26 Thread Kostas Kloudas
Hi Moiz,

state.clear() only clears the state that you registered in your job via 
getState() on the runtimeContext. It does not cancel timers.

Timers are managed by Flink’s timer service, and Flink itself cleans them up 
when the job terminates.

Kostas
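
To make the separation described above concrete, here is a minimal plain-Java
sketch. It is a toy model and not Flink code: the class KeyScopeSketch and all
of its methods are hypothetical names chosen for illustration. It assumes only
what the reply states, namely that clearing the registered state leaves
timers untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Toy model of one key's scope. NOT Flink's implementation; all names are hypothetical. */
class KeyScopeSketch {
    // Stands in for state obtained via getState() on the runtimeContext.
    private final Map<String, Long> userState = new HashMap<>();
    // Stands in for the timers held separately by the timer service.
    private final TreeSet<Long> timers = new TreeSet<>();

    void updateState(String name, long value) { userState.put(name, value); }

    void registerTimer(long timestamp) { timers.add(timestamp); }

    /** Analogous to state.clear(): touches only the user state, never the timers. */
    void clearState() { userState.clear(); }

    int pendingTimers() { return timers.size(); }

    /** Called when a timer fires; tolerates state that was cleared in the meantime. */
    String onTimer(long timestamp) {
        timers.remove(timestamp);
        Long count = userState.get("count");
        return count == null
                ? "timer fired, state already cleared"
                : "timer fired, count=" + count;
    }

    public static void main(String[] args) {
        KeyScopeSketch k1 = new KeyScopeSketch();
        k1.updateState("count", 3L);
        k1.registerTimer(86_400_000L);
        k1.clearState();
        System.out.println(k1.pendingTimers());      // prints 1: the timer survived
        System.out.println(k1.onTimer(86_400_000L)); // prints "timer fired, state already cleared"
    }
}
```

Under this model, a timer callback can run after the state it expected has
been cleared, so a callback written this way should check for missing state
rather than assume it is present.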

> On May 26, 2017, at 6:41 AM, Moiz S Jinia wrote:
> 
> A follow-up question: since the registered timers are part of the managed 
> keyed state, do they get cancelled when I call state.clear()?
> 
> Moiz



Re: Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Moiz S Jinia
A follow-up question: since the registered timers are part of the managed
keyed state, do they get cancelled when I call state.clear()?

Moiz



Re: Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Moiz S Jinia
Awesome. Thanks.

On Thu, May 25, 2017 at 10:13 PM, Eron Wright wrote:

> Yes, registered timers are stored in managed keyed state and should be
> fault-tolerant.
>
> -Eron


Re: Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Tzu-Li (Gordon) Tai
Hi Moiz!

Adding a bit more detail here:
Yes, the timer will be restored on whichever new instance is responsible for 
that key.
There is one “gotcha” to look out for, though: the firing time of a timer is 
absolute. This means that if the checkpointed timer’s firing processing 
timestamp is t (basically the registration time plus the configured trigger 
delay), then it will also fire at processing timestamp t on the new instance. 
Therefore, you should be aware of out-of-sync clocks between the two instances.

Another thing to note is that if the job isn’t running at t (when the timer is 
supposed to fire), then on restore that timer fires immediately.

Cheers,
Gordon
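
The two points above (absolute firing times, and immediate firing when t has
already passed at restore) can be illustrated with a small plain-Java sketch.
This is a toy model under stated assumptions, not Flink's implementation: the
class TimerRestoreSketch and its methods register and dueOnRestore are
hypothetical, and timestamps are plain millisecond values rather than real
clock readings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of checkpointed processing-time timers. NOT Flink's implementation. */
class TimerRestoreSketch {

    /** Stores the timer as an absolute timestamp: registration time plus delay. */
    static long register(Map<String, Long> timers, String key, long nowMillis, long delayMillis) {
        long fireAt = nowMillis + delayMillis;
        timers.put(key, fireAt);
        return fireAt;
    }

    /** Returns the keys whose timers are already due according to the restoring instance's clock. */
    static List<String> dueOnRestore(Map<String, Long> checkpointedTimers, long restoreClockMillis) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> entry : checkpointedTimers.entrySet()) {
            if (entry.getValue() <= restoreClockMillis) {
                due.add(entry.getKey());
            }
        }
        return due;
    }

    public static void main(String[] args) {
        long oneDayMillis = 24L * 60 * 60 * 1000;
        Map<String, Long> timers = new TreeMap<>();
        long t = register(timers, "k1", 1_000L, oneDayMillis);

        // Restored shortly after the failure: k1 keeps waiting for absolute time t.
        System.out.println(dueOnRestore(timers, 5_000L)); // prints []

        // Restored after t (job was down, or the new clock runs ahead): fires at once.
        System.out.println(dueOnRestore(timers, t + 1));  // prints [k1]
    }
}
```

Because only the absolute timestamp is stored, the new instance compares it
against its own clock. A clock that runs ahead makes the timer fire early in
real terms, and a clock that runs behind makes it fire late.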

On 26 May 2017 at 12:44:00 AM, Eron Wright (eronwri...@gmail.com) wrote:

Yes, registered timers are stored in managed keyed state and should be 
fault-tolerant. 

-Eron




Re: Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Eron Wright
Yes, registered timers are stored in managed keyed state and should be
fault-tolerant.

-Eron

On Thu, May 25, 2017 at 9:28 AM, Moiz S Jinia wrote:

> With a checkpointed RocksDB-based state backend, can I expect the
> registered processing-time timers to be fault tolerant, along with the
> managed keyed state?
>
> Example -
> A task manager instance owns the key k1 (from a keyed stream), which has
> registered a processing-time timer with a timestamp that's a day ahead in
> the future. If this instance is killed and the key is moved to another
> instance, will onTimer trigger correctly on the other machine at the
> expected time, with the same keyed state (for k1)?
>
> Thanks,
> Moiz
>


Are timers in ProcessFunction fault tolerant?

2017-05-25 Thread Moiz S Jinia
With a checkpointed RocksDB-based state backend, can I expect the
registered processing-time timers to be fault tolerant, along with the
managed keyed state?

Example -
A task manager instance owns the key k1 (from a keyed stream), which has
registered a processing-time timer with a timestamp that's a day ahead in
the future. If this instance is killed and the key is moved to another
instance, will onTimer trigger correctly on the other machine at the
expected time, with the same keyed state (for k1)?

Thanks,
Moiz