Thanks! I've managed to implement a working solution with the trigger API,
but I'm not exactly sure why it works.
I'm doing the following:
DataStream<SessionSummary> summaries = env
    .addSource(kafkaConsumer, "playerEvents(Kafka)")
    .name("EP - Read player events from Kafka")
    .uid("EP - Read player events from Kafka")
    .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class)))
    .returns(HashMap.class)
    .name("EP - Map Json to HashMap")
    .uid("EP - Map Json to HashMap")
    // Drop events without a sessionId; they must not create sessions.
    .filter((FilterFunction<HashMap>) event -> event.get(Field.SESSION_ID) != null)
    .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
    .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
    // filter() needs a boolean; assuming the intent is to keep events that have an accountId.
    .filter((FilterFunction<HashMap>) event -> event.get(Field.ACCOUNT_ID) != null)
    .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
    .window(ProcessingTimeSessionWindows.withGap(
        org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
    .trigger(new SessionTrigger())
    .aggregate(new SummaryAggregator())
    .name("EP - Aggregate events into session summaries")
    .uid("EP - Aggregate events into session summaries");
summaries.print();
With the following trigger (omitting parts of the trigger):
[ ... ]
    @Override
    public TriggerResult onElement(HashMap element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
        // If an end event is detected, emit the content and purge
        if (endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        // First element seen for this window: register the periodic timer
        // and the end-of-window timer, then remember that we did so
        if (firstSeen.value() == null) {
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            firstSeen.update(true);
        }
        logger.info("Current window end is {} for session {}",
                window.maxTimestamp(), element.get(Field.SESSION_ID));
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            // End of the window: emit the final result and purge
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            // Periodic timer: emit the current result and schedule the next one
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
            return TriggerResult.FIRE;
        }
    }
[ ... ]
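To make it easier to reason about, here is a minimal sketch of the decisions
the trigger makes, restated as plain Java without any Flink types. The class
name, the Decision enum and the event type names in END_SESSION are made up
for illustration (my real endSession set is part of what I omitted above);
only the branching is meant to match the trigger.

```java
import java.util.Set;

/** Plain-Java restatement of the trigger's decisions; no Flink types involved. */
final class SessionTriggerSketch {

    /** Mirrors the three TriggerResult values used in the trigger above. */
    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Hypothetical end-event names; the real endSession set is not shown above.
    static final Set<String> END_SESSION = Set.of("stop", "end");

    // Same 10-second interval as the periodic timer in the trigger.
    static final long FIRE_INTERVAL_MS = 10_000L;

    /** An end event emits and purges; any other event just accumulates. */
    static Decision onElement(String eventType) {
        boolean isEnd = eventType != null && END_SESSION.contains(eventType);
        return isEnd ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }

    /** A timer at the window's max timestamp ends the session; any other timer emits early. */
    static Decision onProcessingTime(long timerTime, long windowMaxTimestamp) {
        return timerTime == windowMaxTimestamp ? Decision.FIRE_AND_PURGE : Decision.FIRE;
    }

    /** Time at which the next periodic timer would be registered. */
    static long nextPeriodicTimer(long now) {
        return now + FIRE_INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println(onElement("play"));                     // CONTINUE
        System.out.println(onElement("stop"));                     // FIRE_AND_PURGE
        System.out.println(onProcessingTime(10_000L, 119_999L));   // FIRE
        System.out.println(onProcessingTime(119_999L, 119_999L));  // FIRE_AND_PURGE
    }
}
```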
So what I'm doing is calling
ctx.registerProcessingTimeTimer(window.maxTimestamp()), but I only do this
once, at the first event. When testing, it nevertheless works as I want: it
fires every ten seconds, and then fires and purges only after no events have
been received for 2 minutes (as specified in the session window). Is the
processing-time timer being updated every time the window end time is
increased (I noticed Flink does this in the background every time a
new event arrives)?
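
To make the question concrete, here is a minimal plain-Java sketch of how I
understand the window end moving as events arrive. It is not Flink code: the
class and method names are made up for illustration, it only models a single
session key, and it assumes that a new event whose own window overlaps or
touches the current one gets merged into it. The point is that the max
timestamp I registered a timer for at the first event (119999 below) is no
longer the window's max timestamp once later events have extended the window.

```java
/** Toy model of one session window that is extended by each new event. */
final class SessionWindowSketch {

    // 2-minute gap, matching withGap(Time.minutes(2)) in the job above.
    static final long GAP_MS = 120_000L;

    private long start = -1L; // inclusive start; -1 means no open session
    private long end = -1L;   // exclusive end

    /** Adds an event at processing time {@code now} and returns the new max timestamp. */
    long add(long now) {
        if (start < 0 || now > end) {
            // No open session, or the gap has elapsed: start a fresh window.
            start = now;
            end = now + GAP_MS;
        } else {
            // The event's own window [now, now + GAP_MS) is merged into the open one.
            start = Math.min(start, now);
            end = Math.max(end, now + GAP_MS);
        }
        return maxTimestamp();
    }

    /** Last timestamp that still belongs to the window (end is exclusive). */
    long maxTimestamp() {
        return end - 1;
    }

    public static void main(String[] args) {
        SessionWindowSketch window = new SessionWindowSketch();
        System.out.println(window.add(0L));        // 119999
        System.out.println(window.add(60_000L));   // 179999
        System.out.println(window.add(100_000L));  // 219999
    }
}
```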
I'm happy with my solution, just trying to figure out how things work!
Cheers,
Tim
On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <[email protected]> wrote:
> If you use the Trigger API, then you don't have to do anything special for
> fault tolerance. When using a ProcessFunction, you should use
> Flink's state primitives to store your state (e.g. ValueState). Flink will
> then checkpoint this state automatically. In case of a failure, Flink will
> always resume from the latest successfully completed checkpoint.
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <[email protected]>
> wrote:
>
>> Thanks for the suggestions! I'll see if I can implement something that
>> works!
>> A follow-up question, more related to state: if I implement either the
>> custom trigger or the process function, how will they handle crashes
>> and such? For instance, with a checkpointing interval of 10s, will
>> the job recover from the last checkpoint with all the summaries as they
>> were at that point, or do I have to implement specific ValueStates in both
>> cases?
>>
>> On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <[email protected]> wrote:
>>
>>> Hi Tim,
>>>
>>> I think you could use Flink's trigger API [1] to implement a trigger
>>> which fires when it sees a certain event or after some time.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <[email protected]>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I'm trying to figure out how to implement a window that will emit
>>>> events at regular intervals or when a specific event is encountered.
>>>>
>>>> A bit of background: I have a stream of events from devices, which
>>>> send events to our system whenever a user watches a video. These events
>>>> include a unique id (sessionId) shared by all events of the same
>>>> session, so I want to key my stream on it. After that I want to aggregate
>>>> all the events into a session summary and emit this summary every
>>>> 5 minutes, while still keeping the summary in the window (in case
>>>> more events for that session arrive). However, if I receive an end
>>>> event (sent by the device once a user stops watching the video), I want to
>>>> emit the summary and remove it from the window.
>>>>
>>>> Is it possible to do this with one of the existing windows together
>>>> with a trigger or in some other way? Been trying to figure it out by
>>>> reading the docs but haven't gotten any wiser so turning to the mailing
>>>> list for help.
>>>>
>>>> Best regards,
>>>> Tim
>>>>
>>>
>>
>> --
>>
>> *Tim Josefsson*
>> mobil +46 (0) 707 81 91 12
>> telefon +46 (0) 8 21 40 70
>>
>> [email protected]
>> *webstep.se <http://www.webstep.se/>*
>> Suttungs gränd 2
>> 753 19 Uppsala
>> Stockholm | Uppsala | Malmö | Sundsvall | Oslo
>> Bergen | Stavanger | Trondheim | Kristiansand
>>
>