Re: ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-19 Thread Florian König
@Aljoscha: We’re using 1.2.

The intention of our code is as follows: The events that flow through Flink 
represent scheduling decisions, i.e. they contain the ID of a target entity, a 
description of an action that should be performed on that entity by some other 
job, and a timestamp of when that should happen.

We’re using the windowing mechanism to delay those events until they should be 
forwarded (and trigger the corresponding action). Furthermore, the schedule can 
be moved closer to the current point in time: subsequent schedule events for an 
entity (identified by its ID) can set the trigger time to an earlier instant. 
If the trigger time is in the past or very shortly (e.g., 100 ms) after now, 
the action should be triggered immediately. Actions scheduled for an instant 
after the currently planned one should be ignored; i.e. the schedule cannot be 
moved to the future.

exemplary event stream
time … (ID, action, trigger time)   // intended reaction
0 … (1, 'foo', 10)  // trigger action 'foo' on entity 1 at time 10
3 … (2, 'bar', 15)  // trigger action 'bar' on entity 2 at time 15
4 … (1, 'foo', 7)   // move trigger back to time 7
9 … (1, 'foo', 12)  // ignore
15 … (2, 'bar', 15) // trigger immediately

resulting stream:
(1, 'foo', 7)   // at time 7
(2, 'bar', 15)  // at time 15

To implement this, we have written a custom trigger that’s called from the 
following Flink code:

…
schedules.keyBy(schedule -> schedule.entityId)
        .window(GlobalWindows.create())
        .trigger(DynamicEarliestWindowTrigger.create())
        .fold((Schedule) null, (folded, schedule) -> schedule)
        .map( /* process schedules */ )
…

We fold the scheduling events 'to themselves', because only the latest event in 
each period is relevant. The custom trigger is implemented as follows (only 
Flink-relevant parts and syntax):

class DynamicEarliestWindowTrigger<T extends Schedule, W extends Window> extends Trigger<T, W> {

    // keeps the currently scheduled trigger time per key/window
    ValueStateDescriptor<Long> windowEnd = new ValueStateDescriptor<>("windowEnd", Long.class);

    TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        val windowEndState = ctx.getPartitionedState(windowEnd);
        val windowEndsAt = windowEndState.value();
        val newEnd = element.getTimestamp();

        // no timer set yet, or intention to trigger earlier
        if (windowEndsAt == null || newEnd <= windowEndsAt) {
            deleteCurrentTimer(ctx);

            // trigger time far enough from now => schedule timer
            if (newEnd > System.currentTimeMillis() + 100) {
                ctx.registerProcessingTimeTimer(newEnd);
                windowEndState.update(newEnd);
            } else {
                return TriggerResult.FIRE;  // close enough => fire immediately
            }
        }

        // ignore events that should be triggered in the future
        return TriggerResult.CONTINUE;
    }

    // fire when timer has reached pre-set time
    TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    // noop
    TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    void clear(W window, TriggerContext ctx) throws Exception {
        deleteCurrentTimer(ctx);
    }

    void deleteCurrentTimer(TriggerContext ctx) throws Exception {
        val windowEndState = ctx.getPartitionedState(windowEnd);
        val windowEndsAt = windowEndState.value();

        if (windowEndsAt != null) {
            ctx.deleteProcessingTimeTimer(windowEndsAt);
            windowEndState.clear();
        }
    }

    boolean canMerge() { return false; }
}

The job state grows with the number of scheduled entities, and the mechanism works 
as intended as long as the job keeps running. However, for unrelated reasons the 
job sometimes fails and is restarted from a checkpoint. The state size after the 
restart tells me that the state has been restored.

Yet the mechanism stops working: none of the old scheduling events that must have 
been 'waiting' in their windows for the timer to fire are actually forwarded. 
Hence my question whether it is possible that timers are not restored.

Any ideas what might have gone wrong? Is there a better way to implement such a 
mechanism?
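
For concreteness, here is roughly the alternative I have been considering (per the 
subject line): a keyed ProcessFunction with a processing-time timer instead of the 
window/trigger combination. This is only an untested sketch that assumes the same 
Schedule POJO as above (public entityId field, getTimestamp()); since, as far as I 
can tell, the TimerService in 1.2 offers no way to delete timers, stale timers are 
simply ignored in onTimer() by comparing against the stored trigger time. Whether 
these timers survive a restore any better is exactly what I am unsure about:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class SchedulingFunction extends ProcessFunction<Schedule, Schedule> {

    ValueStateDescriptor<Long> triggerTimeDesc =
            new ValueStateDescriptor<>("triggerTime", Long.class);
    ValueStateDescriptor<Schedule> pendingDesc =
            new ValueStateDescriptor<>("pending", Schedule.class);

    @Override
    public void processElement(Schedule schedule, Context ctx, Collector<Schedule> out) throws Exception {
        ValueState<Long> triggerTime = getRuntimeContext().getState(triggerTimeDesc);
        ValueState<Schedule> pending = getRuntimeContext().getState(pendingDesc);

        Long currentEnd = triggerTime.value();
        long newEnd = schedule.getTimestamp();

        // ignore events that would move the trigger into the future
        if (currentEnd != null && newEnd > currentEnd) {
            return;
        }

        if (newEnd > ctx.timerService().currentProcessingTime() + 100) {
            // far enough from now => remember the schedule and register a timer
            pending.update(schedule);
            triggerTime.update(newEnd);
            ctx.timerService().registerProcessingTimeTimer(newEnd);
        } else {
            // in the past or close enough => fire immediately
            out.collect(schedule);
            pending.clear();
            triggerTime.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Schedule> out) throws Exception {
        ValueState<Long> triggerTime = getRuntimeContext().getState(triggerTimeDesc);
        ValueState<Schedule> pending = getRuntimeContext().getState(pendingDesc);

        // only react to the timer matching the currently stored trigger time;
        // earlier timers that could not be deleted are ignored here
        if (triggerTime.value() != null && triggerTime.value() == timestamp) {
            out.collect(pending.value());
            pending.clear();
            triggerTime.clear();
        }
    }
}

It would replace the window part of the pipeline:

…
schedules.keyBy(schedule -> schedule.entityId)
        .process(new SchedulingFunction())
        .map( /* process schedules */ )
…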

Thanks and enjoy the rest of your weekend :)
Florian



Re: Telling if a job has caught up with Kafka

2017-03-19 Thread Florian König
Thanks Gordon for the detailed explanation! That makes sense and explains the 
expected behaviour.

The JIRA for the new metric also sounds very good. I can’t wait to have this in 
the Flink GUI (KafkaOffsetMonitor has some problems and stops working after 1-2 
days; I don’t know the reason yet).
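
For reference, in case it is useful for anyone else: this is roughly how we set up 
the 0.9 consumer; the class name, topic, group id, servers and checkpoint interval 
are placeholders. If I understand your explanation below correctly, with 
checkpointing enabled like this the offsets are committed back to Kafka only when 
a checkpoint completes:

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ScheduleSourceSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        props.setProperty("group.id", "schedule-consumer");    // placeholder

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with checkpointing enabled, the 0.9 consumer commits offsets back to
        // Kafka only when a checkpoint completes
        env.enableCheckpointing(60_000);

        DataStream<String> schedules = env.addSource(
                new FlinkKafkaConsumer09<>("schedules", new SimpleStringSchema(), props));

        schedules.print();
        env.execute("schedule source (sketch)");
    }
}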

All the best,
Florian


> On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai wrote:
> 
> @Florian
> the 0.9 / 0.10 version and 0.8 version behave a bit differently right now for 
> the offset committing.
> 
> In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. 
> settings will be completely ignored and overwritten before being used to 
> instantiate the internal Kafka clients, hence committing will only happen on 
> Flink checkpoints.
> 
> In 0.8, this isn’t the case. Both automatic periodic committing and 
> committing on checkpoints can take place. That’s perhaps why you’re observing 
> the 0.8 consumer to be committing more frequently.
> 
> FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you 
> can take a look at https://github.com/apache/flink/pull/3527.
> 
> - Gordon
> 
> 
> On March 17, 2017 at 6:07:38 PM, Florian König (florian.koe...@micardo.com) 
> wrote:
> 
>> Why is that so? The checkpoint contains the Kafka offset and would be able 
>> to start reading wherever it left off, regardless of any offset stored in 
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently 
>> from the checkpointing? Or did I misconfigure anything?