Thanks, please do keep me posted!
> On 20 Mar 2017, at 21:50, Florian König <florian.koe...@micardo.com> wrote:
>
> @Aljoscha Thank you for the pointer to ProcessFunction. That looks like a
> better approach with less code and other overhead.
>
> After restoring, the job is both reading new elements and emitting some, but
> nowhere near as many as expected. I’ll investigate further after switching to
> ProcessFunction. I suspect that there is some problem with my code. I’ll let
> you know if any unexplained discrepancy remains.
>
>> Am 20.03.2017 um 14:15 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>
>> As a general remark, I think the ProcessFunction
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html)
>> could be better suited for implementing such a use case.
>>
>> I ran tests on Flink 1.2 and on master with a simple processing-time
>> windowing job. After taking a savepoint and waiting a few minutes, I
>> restored the job, and the windows that were still present fired immediately.
>>
>> In your case, after restoring, is the job also reading new elements, or
>> did you try just restoring, without any new input?
>>
>>> On 19 Mar 2017, at 13:15, Florian König <florian.koe...@micardo.com> wrote:
>>>
>>> @Aljoscha: We’re using 1.2.
>>>
>>> The intention of our code is as follows: The events that flow through Flink
>>> represent scheduling decisions, i.e. they contain the ID of a target
>>> entity, a description of an action that should be performed on that entity
>>> by some other job, and a timestamp of when that should happen.
>>>
>>> We’re using the windowing mechanism to delay those events until they should
>>> be forwarded (and trigger the corresponding action). Furthermore, the
>>> schedule can be moved closer to the current point in time: subsequent
>>> schedule events for an entity (identified by its ID) can set the trigger
>>> time to an earlier instant. If the trigger time is in the past or very
>>> shortly (e.g., 100 ms) after now, the action should be triggered
>>> immediately. Actions scheduled for an instant after the currently planned
>>> one should be ignored; i.e. the schedule cannot be moved to the future.
>>>
>>> Exemplary event stream:
>>>
>>> time … (ID, action, trigger time)  // intended reaction
>>>  0   … (1, 'foo', 10)              // trigger action 'foo' on entity 1 at time 10
>>>  3   … (2, 'bar', 15)              // trigger action 'bar' on entity 2 at time 15
>>>  4   … (1, 'foo', 7)               // move trigger back to time 7
>>>  9   … (1, 'foo', 12)              // ignore
>>> 15   … (2, 'bar', 15)              // trigger immediately
>>>
>>> Resulting stream:
>>>
>>> (1, 'foo', 7)   // at time 7
>>> (2, 'bar', 15)  // at time 15
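>>>
>>> The intended semantics can be exercised outside Flink with a small,
>>> self-contained simulation (illustrative names and a simulated clock,
>>> not the Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation of the intended scheduling semantics: one pending trigger
// time per entity, which may only move earlier; trigger times at most
// 100 ms after "now" fire immediately; otherwise a timer fires once the
// simulated clock passes the scheduled time.
class ScheduleSimulation {
    private final Map<Integer, Long> timePerEntity = new HashMap<>();
    private final TreeMap<Long, List<Integer>> timers = new TreeMap<>();
    final List<Integer> fired = new ArrayList<>();

    void onEvent(int entityId, long triggerTime, long now) {
        Long current = timePerEntity.get(entityId);
        if (current != null && triggerTime > current) {
            return; // schedule cannot be moved to the future: ignore
        }
        if (current != null) { // an earlier schedule replaces the old timer
            timers.get(current).remove(Integer.valueOf(entityId));
        }
        if (triggerTime <= now + 100) { // in the past or very close: fire now
            timePerEntity.remove(entityId);
            fired.add(entityId);
        } else { // far enough in the future: (re)register a timer
            timePerEntity.put(entityId, triggerTime);
            timers.computeIfAbsent(triggerTime, t -> new ArrayList<>()).add(entityId);
        }
    }

    void advanceClockTo(long now) { // fire all timers that have come due
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (int entityId : timers.pollFirstEntry().getValue()) {
                timePerEntity.remove(entityId);
                fired.add(entityId);
            }
        }
    }
}
```

Replaying the example stream (with the times scaled to milliseconds to
make the 100 ms threshold meaningful) produces the expected firing
order: entity 1 at its moved-up time, then entity 2 immediately.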
>>>
>>> To implement this, we have written a custom trigger that’s called from the
>>> following Flink code:
>>>
>>> …
>>> schedules.keyBy(schedule -> schedule.entityId)
>>> .window(GlobalWindows.create())
>>> .trigger(DynamicEarliestWindowTrigger.create())
>>> .fold((Schedule) null, (folded, schedule) -> schedule)
>>> .map( /* process schedules */ )
>>> …
>>>
>>> We fold the scheduling events 'to themselves', because only the latest
>>> event in each period is relevant. The custom trigger is implemented as
>>> follows (only Flink-relevant parts and syntax):
>>>
>>> class DynamicEarliestWindowTrigger<T extends Timestamped, W extends Window> extends Trigger<T, W> {
>>>
>>>     ValueStateDescriptor<Long> windowEnd = new ValueStateDescriptor<>("windowEnd", Long.class);
>>>
>>>     TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>         val windowEndState = ctx.getPartitionedState(windowEnd);
>>>         val windowEndsAt = windowEndState.value();
>>>         val newEnd = element.getTimestamp();
>>>
>>>         // no timer set yet, or intention to trigger earlier
>>>         if (windowEndsAt == null || newEnd <= windowEndsAt) {
>>>             deleteCurrentTimer(ctx);
>>>
>>>             // trigger time far enough from now => schedule timer
>>>             if (newEnd > System.currentTimeMillis() + 100) {
>>>                 ctx.registerProcessingTimeTimer(newEnd);
>>>                 windowEndState.update(newEnd);
>>>             } else {
>>>                 return TriggerResult.FIRE; // close enough => fire immediately
>>>             }
>>>         }
>>>
>>>         // ignore events that should be triggered in the future
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     // fire when timer has reached pre-set time
>>>     TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>     }
>>>
>>>     // noop
>>>     TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
>>>         return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     void clear(W window, TriggerContext ctx) throws Exception {
>>>         deleteCurrentTimer(ctx);
>>>     }
>>>
>>>     void deleteCurrentTimer(TriggerContext ctx) throws Exception {
>>>         val windowEndState = ctx.getPartitionedState(windowEnd);
>>>         val windowEndsAt = windowEndState.value();
>>>
>>>         if (windowEndsAt != null) {
>>>             ctx.deleteProcessingTimeTimer(windowEndsAt);
>>>             windowEndState.clear();
>>>         }
>>>     }
>>>
>>>     boolean canMerge() { return false; }
>>> }
>>>
>>> The job state grows with the number of scheduled entities, and the
>>> mechanism works as intended as long as the job runs. However, due to
>>> unrelated reasons, the job sometimes fails and is restarted from a
>>> checkpoint. The state size after the restore tells me that the state
>>> has been restored.
>>>
>>> Yet, the mechanism stops working: none of the old scheduling events
>>> that must have been ‚waiting‘ in the window for their timer are
>>> actually forwarded after the restore. Hence my question whether it is
>>> possible that timers are not restored.
>>>
>>> Any ideas what might have gone wrong? Is there a better way to implement
>>> such a mechanism?
>>>
>>> Thanks and enjoy the rest of your weekend :)
>>> Florian
>>>
>>>
>>>> Am 17.03.2017 um 16:51 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>>
>>>> When restoring, processing-time timers that would have fired already
>>>> should immediately fire.
>>>>
>>>> @Florian what Flink version are you using? In Flink 1.1 there was a bug
>>>> that led to processing-time timers not being reset when restoring.
>>>>> On 17 Mar 2017, at 15:39, Florian König <florian.koe...@micardo.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> funny coincidence, I was just about to ask the same thing. I have
>>>>> noticed this with restored checkpoints in one of my jobs: the timers
>>>>> seem to be gone. My window trigger registers processing-time timers,
>>>>> but these don’t seem to get restored - even if a timer is set to
>>>>> fire in the future, after the restore.
>>>>>
>>>>> Is there something I need to be aware of in my class implementing
>>>>> Trigger? Anything I forgot to set in a method that’s being called upon a
>>>>> restore?
>>>>>
>>>>> Thanks
>>>>> Florian
>>>>>
>>>>>> Am 17.03.2017 um 15:14 schrieb Yassine MARZOUGUI
>>>>>> <y.marzou...@mindlytix.com>:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> How does the processing time timer behave when a job is taken down with
>>>>>> a savepoint and then restarted after the timer was supposed to fire?
>>>>>> Will the timer fire at restart because it was missed during the
>>>>>> savepoint?
>>>>>>
>>>>>> I'm wondering because I would like to schedule periodic timers in the
>>>>>> future (in processing time) at which a state is read and emitted, but
>>>>>> I'm afraid a timer will never fire if it comes due while the job is
>>>>>> down, and therefore the state will never be emitted.
>>>>>>
>>>>>> Best,
>>>>>> Yassine
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>