Thanks! I've managed to implement a working solution with the trigger API,
but I'm not exactly sure why it works.
I'm doing the following:

DataStream<SessionSummary> summaries = env
        .addSource(kafkaConsumer, "playerEvents(Kafka)")
        .name("EP - Read player events from Kafka")
        .uid("EP - Read player events from Kafka")
        .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class)))
        .returns(HashMap.class)
        .name("EP - Map Json to HashMap")
        .uid("EP - Map Json to HashMap")
        .filter((FilterFunction<HashMap>) event -> event.get(Field.SESSION_ID) != null)
        .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
        .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
        // Keep only events that carry an accountId
        .filter((FilterFunction<HashMap>) event -> event.get(Field.ACCOUNT_ID) != null)
        .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
        .window(ProcessingTimeSessionWindows.withGap(
                org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
        .trigger(new SessionTrigger())
        .aggregate(new SummaryAggregator())
        .name("EP - Aggregate events into session summaries")
        .uid("EP - Aggregate events into session summaries");

summaries.print();

With the following trigger (omitting parts of the trigger):

[ ... ]
@Override
public TriggerResult onElement(HashMap element, long timestamp,
        TimeWindow window, TriggerContext ctx) throws Exception {
    ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

    // If an end event is detected, emit the content and purge
    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    // While firstSeen is unset, schedule the periodic timer (10 s ahead)
    // and a timer at the current window end
    if (firstSeen.value() == null) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        firstSeen.update(true);
    }
    logger.info("Current window end is {} for session {}",
            window.maxTimestamp(), element.get(Field.SESSION_ID));
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window,
        TriggerContext ctx) throws Exception {
    if (time == window.maxTimestamp()) {
        // Window-end timer: the session is over, emit the result and purge
        return TriggerResult.FIRE_AND_PURGE;
    } else {
        // Periodic timer: re-arm it 10 s ahead and emit the current result
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 10000L);
        return TriggerResult.FIRE;
    }
}
[ ... ]
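The two decisions the trigger above makes can be pulled out into plain functions and unit-tested without a Flink runtime. This is only a sketch of that idea: `TriggerDecisions` and `Decision` are hypothetical names (`Decision` mirrors Flink's `TriggerResult` values but is a local enum), the omitted `endSession` set is assumed to contain a single `"end"` event type, and timer registration is left out.

```java
import java.util.Set;

/** Flink-free model of the trigger's two decisions (hypothetical names). */
final class TriggerDecisions {
    /** Local stand-in for the Flink TriggerResult values used by the trigger. */
    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Assumed contents of the omitted endSession set. */
    static final Set<String> END_EVENT_TYPES = Set.of("end");

    /** Mirrors onElement: an end event emits and purges, anything else waits. */
    static Decision onElement(String eventType) {
        return END_EVENT_TYPES.contains(eventType) ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }

    /** Mirrors onProcessingTime: the window-end timer purges, any other timer only fires. */
    static Decision onProcessingTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? Decision.FIRE_AND_PURGE : Decision.FIRE;
    }

    public static void main(String[] args) {
        System.out.println(onElement("play"));                    // CONTINUE
        System.out.println(onElement("end"));                     // FIRE_AND_PURGE
        System.out.println(onProcessingTime(10_000L, 119_999L));  // FIRE
        System.out.println(onProcessingTime(119_999L, 119_999L)); // FIRE_AND_PURGE
    }
}
```

In the real trigger, `onElement` and `onProcessingTime` could call helpers like these and keep the `ctx.registerProcessingTimeTimer(...)` calls around them.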

So what I'm doing is calling
ctx.registerProcessingTimeTimer(window.maxTimestamp()), but only once, on the
first event. Yet when testing, it works as I want: it fires every ten seconds,
and it only fires and purges once no events have been received for 2 minutes
(the gap specified for the session window). Is the processing-time timer
being updated every time the window end is extended (I noticed Flink does
this in the background every time a new event arrives)?
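Flink's documentation describes session windows as creating a new window for every arriving record and merging windows that are closer together than the gap. The toy model below replays that rule in plain Java for the 2-minute gap used above (it is not Flink's implementation; `SessionWindowModel` and `Window` are hypothetical names, with `Window` loosely modelled on `TimeWindow`) and shows the window end, and with it `maxTimestamp()`, moving forward with every event:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of session-window merging for one key (not Flink's implementation). */
final class SessionWindowModel {
    /** Half-open window [start, end), loosely modelled on Flink's TimeWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that still belongs to the window (end - 1). */
        long maxTimestamp() {
            return end - 1;
        }

        /** In this model, windows that overlap or touch are merged. */
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window covering both; always a new object. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    private final long gapMillis;
    private final List<Window> windows = new ArrayList<>();

    SessionWindowModel(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Gives the event its own window [t, t + gap), merges overlapping windows, returns the result. */
    Window add(long timeMillis) {
        Window merged = new Window(timeMillis, timeMillis + gapMillis);
        List<Window> untouched = new ArrayList<>();
        for (Window w : windows) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);
            } else {
                untouched.add(w);
            }
        }
        untouched.add(merged);
        windows.clear();
        windows.addAll(untouched);
        return merged;
    }

    public static void main(String[] args) {
        SessionWindowModel session = new SessionWindowModel(120_000L); // 2-minute gap
        for (long t : new long[] {0L, 30_000L, 90_000L}) {
            Window w = session.add(t);
            System.out.println("event at " + t + " -> window " + w + ", maxTimestamp " + w.maxTimestamp());
        }
        // event at 0 -> window [0, 120000), maxTimestamp 119999
        // event at 30000 -> window [0, 150000), maxTimestamp 149999
        // event at 90000 -> window [0, 210000), maxTimestamp 209999
    }
}
```

Each merge in the model produces a new `Window`. If Flink behaves the same way, the window passed to `onElement` after a merge is not the one `firstSeen` was set for, and because trigger state obtained through `ctx.getPartitionedState` is scoped to the window, the `firstSeen.value() == null` branch may run again and register a timer at the new `window.maxTimestamp()`. This is a hypothesis to check against the omitted `onMerge` and `clear` methods.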

I'm happy with my solution, just trying to figure out how things work!

Cheers,
Tim


On Thu, 29 Apr 2021 at 18:42, Till Rohrmann <[email protected]> wrote:

> If you use the Trigger API, you don't have to do anything special for
> fault tolerance. If you use a ProcessFunction, you should store your state
> in Flink's state primitives (e.g. ValueState), which are checkpointed
> automatically. In case of a failure, Flink always resumes from the latest
> successfully completed checkpoint.
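A toy model can make "resume from the latest successfully completed checkpoint" concrete for per-session counts. In this plain-Java sketch (not Flink's API; `CheckpointModel` and its methods are hypothetical), live keyed state is a map, a checkpoint copies it, and a failure replaces the live map with the last copy, so updates made after that checkpoint are gone from the restored state. In an actual Flink job with a replayable source such as Kafka, the source position is restored together with the state, so those records would be read and applied again.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of checkpoint/restore for keyed state (not Flink's implementation). */
final class CheckpointModel {
    private Map<String, Integer> eventCounts = new HashMap<>();    // live keyed state
    private Map<String, Integer> lastCheckpoint = new HashMap<>(); // last completed snapshot

    /** Processes one event by counting it under its sessionId. */
    void process(String sessionId) {
        eventCounts.merge(sessionId, 1, Integer::sum);
    }

    /** Copies the live state, standing in for a completed checkpoint. */
    void checkpoint() {
        lastCheckpoint = new HashMap<>(eventCounts);
    }

    /** Simulates a failure: live state is discarded and restored from the snapshot. */
    void failAndRecover() {
        eventCounts = new HashMap<>(lastCheckpoint);
    }

    int count(String sessionId) {
        return eventCounts.getOrDefault(sessionId, 0);
    }

    public static void main(String[] args) {
        CheckpointModel job = new CheckpointModel();
        job.process("s1");
        job.process("s1");
        job.checkpoint();                    // snapshot holds s1 -> 2
        job.process("s1");                   // not covered by any checkpoint yet
        job.failAndRecover();
        System.out.println(job.count("s1")); // 2
    }
}
```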
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson <[email protected]>
> wrote:
>
>> Thanks for the suggestions! I'll see if I can implement something that
>> works!
>> A follow-up question, more related to state: if I implement either the
>> custom trigger or the process function, how will they handle crashes and
>> such? For instance, if I have a checkpointing interval of 10 s, will the
>> job recover from the last checkpoint with all the summaries as they were
>> at that point, or do I have to implement specific ValueStates in both
>> cases?
>>
>> On Thu, 29 Apr 2021 at 10:25, Till Rohrmann <[email protected]> wrote:
>>
>>> Hi Tim,
>>>
>>> I think you could use Flink's trigger API [1] to implement a trigger
>>> which fires when it sees a certain event or after some time.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson <[email protected]>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I'm trying to figure out how to implement a window that will emit
>>>> events at regular intervals or when a specific event is encountered.
>>>>
>>>> A bit of background: I have a stream of events from devices that
>>>> send events to our system whenever a user watches a video. These events
>>>> include a unique id (sessionId) shared by all events of the same
>>>> session, so I want to key my stream on it. After that I want to aggregate
>>>> all the events into a session summary and emit this summary every
>>>> 5 minutes, while still keeping the summary in the window (in case more
>>>> events for that session arrive). However, if I receive an end event
>>>> (sent by the device once a user stops watching the video), I want to
>>>> emit the summary and remove it from the window.
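The requested behaviour can be pinned down in a small Flink-free model before mapping it onto windows and triggers. This sketch assumes each event carries only a session id and an event type, uses an event count as a stand-in for the session summary, and treats `"end"` as the end event type; `SessionSummaries`, `onEvent` and `onTimer` are hypothetical names. A periodic tick emits every open summary but keeps it, while an end event emits that session's summary and removes it:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Flink-free model of the desired emit/keep/remove behaviour (hypothetical names). */
final class SessionSummaries {
    /** Stand-in for a session summary: how many events each open session has seen. */
    private final Map<String, Integer> openSessions = new LinkedHashMap<>();
    /** Everything emitted so far, as "sessionId=count" strings. */
    final List<String> emitted = new ArrayList<>();

    /** Aggregates one event; an end event also emits the summary and removes the session. */
    void onEvent(String sessionId, String eventType) {
        openSessions.merge(sessionId, 1, Integer::sum);
        if ("end".equals(eventType)) {
            emitted.add(sessionId + "=" + openSessions.remove(sessionId));
        }
    }

    /** Periodic tick (every 5 minutes in the use case above): emit every summary but keep it. */
    void onTimer() {
        openSessions.forEach((id, count) -> emitted.add(id + "=" + count));
    }

    public static void main(String[] args) {
        SessionSummaries s = new SessionSummaries();
        s.onEvent("a", "play");
        s.onEvent("b", "play");
        s.onTimer();                   // emits a=1 and b=1, keeps both
        s.onEvent("a", "end");         // emits a=2 and removes a
        s.onTimer();                   // emits b=1 only
        System.out.println(s.emitted); // [a=1, b=1, a=2, b=1]
    }
}
```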
>>>>
>>>> Is it possible to do this with one of the existing windows together
>>>> with a trigger, or in some other way? I've been trying to figure it out
>>>> by reading the docs but haven't gotten any wiser, so I'm turning to the
>>>> mailing list for help.
>>>>
>>>> Best regards,
>>>> Tim
>>>>
>>>
>>
>> --
>>
>> *Tim Josefsson*
>> [image: Webstep GPtW] <http://www.webstep.se/>
>> mobil   +46 (0) 707 81 91 12
>> telefon +46 (0) 8 21 40 70
>>
>> [email protected]
>> *webstep.se <http://www.webstep.se/>*
>> Suttungs gränd 2
>> 753 19 Uppsala
>> Stockholm | Uppsala | Malmö | Sundsvall | Oslo
>> Bergen | Stavanger | Trondheim | Kristiansand
>> [image: LinkedIn] <http://www.linkedin.com/company/webstep-ab> [image:
>> Facebook] <http://www.facebook.com/webstepAB> [image: Facebook]
>> <http://www.instagram.com/webstep_sverige>
>>
>

