Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread Simon Su
Hi Lei Wang, actually it will not work. Job recovery from a checkpoint uses the job ID to locate the snapshot directory, but when you restart the job in IntelliJ without setting any configuration, a new job ID is generated and the run is treated as a new job, so you get null state every time. You can follow by

open() setup method not being called for AggregateFunctions?

2019-06-25 Thread Piyush Narang
Hi folks, I’ve tried to create some Flink UDAFs that I’m invoking using the Table / SQL API. In these UDAFs I’ve overridden the open() method to perform some setup operations (in my case, initializing some metric counters). I noticed that this open() function isn’t being invoked in either the Dat

Re: Process Function's timers "postponing"

2019-06-25 Thread Andrea Spina
Hi Yun, thank you so much. That was an idea, but I wanted to avoid storing additional state for it. In the end, I went for coalescing as the documentation suggests, so that I will have just one timer per interval. What I didn't catch initially from the documentation is that for a determined key and a
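The coalescing idea mentioned in this message can be illustrated with a minimal sketch in plain Java, with no Flink dependency. It assumes the rounding formula from the Flink timer documentation, ((timestamp + timeout) / interval) * interval, and uses a TreeSet as a stand-in for the per-key timer set, which keeps at most one timer per timestamp. The class name, method name, and the sample values are hypothetical.

```java
import java.util.TreeSet;

public class TimerCoalescing {
    // Round the target firing time down to a multiple of intervalMs, so that all
    // records whose targets fall in the same interval share one timer timestamp.
    public static long coalesce(long timestampMs, long timeoutMs, long intervalMs) {
        return ((timestampMs + timeoutMs) / intervalMs) * intervalMs;
    }

    public static void main(String[] args) {
        // Stand-in for the timers of a single key: a set holds each timestamp once.
        TreeSet<Long> timersForKey = new TreeSet<>();
        long timeoutMs = 5_000L;   // hypothetical timeout
        long intervalMs = 1_000L;  // hypothetical coalescing interval

        for (long ts : new long[] {10_100L, 10_450L, 10_999L, 11_001L}) {
            timersForKey.add(coalesce(ts, timeoutMs, intervalMs));
        }
        System.out.println(timersForKey); // prints [15000, 16000]
    }
}
```

Four records produce only two timers here, because three of them round to the same second.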

Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
If you are using processing time, one possible way is to track the last registered timer in another ValueState. You could then call #deleteProcessingTimeTimer(time) when you register a new timer and find that the previous timer stored in the ValueState has a smaller timestamp (T1) than the current time (T2). After delet
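The bookkeeping described in this message might look roughly like the following plain-Java simulation. It is not Flink code: a TreeSet stands in for the timer service, a nullable Long field stands in for the ValueState holding the last registered timestamp, and the class and method names are hypothetical. Keeping the existing timer when the new one is not later is an assumption of this sketch, not something stated in the thread.

```java
import java.util.TreeSet;

public class PostponedTimer {
    // Stand-in for the timer service's pending timers of one key.
    private final TreeSet<Long> timerQueue = new TreeSet<>();
    // Stand-in for a ValueState<Long> that remembers the last registered timer.
    private Long lastRegistered = null;

    public void register(long newTime) {
        if (lastRegistered != null) {
            if (lastRegistered >= newTime) {
                return; // assumption: an equal or later timer already covers this one
            }
            // Analogous to calling deleteProcessingTimeTimer(T1) for the older timer.
            timerQueue.remove(lastRegistered);
        }
        timerQueue.add(newTime); // analogous to registerProcessingTimeTimer(T2)
        lastRegistered = newTime;
    }

    public TreeSet<Long> pending() {
        return timerQueue;
    }

    public static void main(String[] args) {
        PostponedTimer timers = new PostponedTimer();
        timers.register(100L); // record R1 sets a timer at T1
        timers.register(250L); // record R2 postpones it to T2 > T1
        System.out.println(timers.pending()); // prints [250]
    }
}
```

The cost of this approach is the extra piece of state per key, which is the trade-off raised later in the thread.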

Re: Process Function's timers "postponing"

2019-06-25 Thread Andrea Spina
Hi Yun, thank you for your answer. I'm not sure I got your point. My question is: for the same key K, I process two records, R1 at t1 and R2 at t2. When I process R1, I set a timer to be triggered at T1, which is > t2. When I process R2, I set a timer to be triggered at T2, which is > T1, but in order

Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
Hi Andrea, if my understanding is correct, you just want to know when the eventual timer would be deleted. When you register your timer into 'processingTimeTimersQueue' (where your timer is stored) at [1], the 'SystemProcessingTimeService' would then schedule a runnable TriggerTask after the "postp

Re: Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
I start and cancel it just in my IntelliJ IDEA development environment. First I click the run button, then the red stop button, and then the run button again. Let me google about the savepoint. Thanks, Lei Wang wangl...@geekplus.com.cn From: Stephan Ewen Date: 2019-06-25

Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread Stephan Ewen
If you manually cancel and restart the job, state is only carried forward if you use a savepoint. Can you check if that is what you are doing? On Tue, Jun 25, 2019 at 2:21 PM Simon Su wrote: > > Hi wanglei > > Can you post how you restart the job ? > > Thanks, > Simon > On 06/25/2019 20:11,wang

Re: Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread Simon Su
Hi wanglei, can you post how you restart the job? Thanks, Simon On 06/25/2019 20:11, wangl...@geekplus.com.cn wrote: public class StateProcessTest extends KeyedProcessFunction, String> { private transient ValueState> state; public void processElement(Tuple2 value, Context ctx, Collector ou

Unable to restore state value after job failed using RocksDBStateBackend

2019-06-25 Thread wangl...@geekplus.com.cn
public class StateProcessTest extends KeyedProcessFunction, String> { private transient ValueState> state; public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { Tuple2 stateValue = state.value(); if(stateValue == null){