On 17.11.20 17:37, Simone Cavallarin wrote:
Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add 
the gap to the message: 1) I receive the event, 2) I take that event and map it using 
StatefulSessionCalculator, which is where I put together "the message" and a 
"long" that is my gap in millis.

DataStream<Event> source = <Kafka Source>

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy(<key extractor>)
    .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

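Tuple2<MyMessageType, Long> map(MyMessageType input) throws Exception {
    ValueState<MyState> valueState = getRuntimeContext().getState(myModelStateDescriptor);
    MyState state = valueState.value(); // null for the first element of a key
    if (state == null) {
        state = new MyState(); // assumes MyState has a no-arg constructor
    }
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state); // write the updated model back to keyed state
    return Tuple2.of(input, suggestedGap);
}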

If the calculated gap is 1234, would the result be:
<[Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

The second step is to use the gap calculated through  
DynamicWindowGapExtractor().

DataStream<...> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))

The DynamicWindowGapExtractor() extracts the gap from the message and feeds it 
back to Flink.
Could you please give me an example for this one as well?

This would just be a class that implements SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap from its extract() method.
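
A minimal sketch of such an extractor could look like the following. To keep it compilable without Flink on the classpath, Tuple2 and SessionWindowTimeGapExtractor are declared locally as stand-ins that mirror the Flink types of the same name; in a real job you would import those instead. MyEvent and its fields are hypothetical.

```java
import java.io.Serializable;

// Stand-in for org.apache.flink.api.java.tuple.Tuple2, only so this sketch compiles on its own.
class Tuple2<A, B> {
    public A f0;
    public B f1;

    Tuple2(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <A, B> Tuple2<A, B> of(A f0, B f1) {
        return new Tuple2<>(f0, f1);
    }
}

// Stand-in for Flink's SessionWindowTimeGapExtractor interface (one method: extract).
interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical event POJO; the real one would carry your own fields.
class MyEvent {
    public String name;
    public long timestamp;

    MyEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// Returns the gap (in milliseconds) that the upstream map() attached as the second tuple field.
class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> {
    @Override
    public long extract(Tuple2<MyEvent, Long> element) {
        return element.f1;
    }
}
```

Here element.f1 is the gap and element.f0 is the original event, so downstream code can read the POJO's fields through element.f0.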

One thing that I don't understand: after enriching the message, my event 
(a POJO) is nested inside a tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.


The last point: when you said "I think, however, that it might be easier at this 
point to just use a stateful ProcessFunction", did you mean that a completely 
different approach would be better?

That's what I meant, yes. It seems too complicated to split the logic into one part that determines the dynamic gap and another part that does the computation per session. It seems easier to roll that into one operator that does everything. With state and timers you should have enough flexibility.
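
To illustrate the idea, here is a rough plain-Java simulation of such a combined operator. It is not the Flink API: the map stands in for keyed state, timerAt stands in for a registered timer, and advanceTime() stands in for timers firing as time progresses. The gap rule (twice the last inter-arrival time, at least MIN_GAP_MS) and all names are assumptions made up for this sketch, and out-of-order events are not handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simulates one keyed operator that both derives the gap and closes sessions.
class SessionOperatorSketch {
    static final long MIN_GAP_MS = 1000L; // hypothetical lower bound for the gap

    // Per-key state; in Flink this would live in keyed state such as ValueState.
    private static final class SessionState {
        int count;          // number of elements in the open session
        long lastTimestamp; // timestamp of the most recent element
        long timerAt;       // time at which the session closes if nothing else arrives
    }

    private final Map<String, SessionState> stateByKey = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Called for every element: update the state, recompute the gap, move the timer.
    void processElement(String key, long timestamp) {
        SessionState s = stateByKey.computeIfAbsent(key, k -> new SessionState());
        long gap = (s.count == 0)
                ? MIN_GAP_MS
                : Math.max(MIN_GAP_MS, 2 * (timestamp - s.lastTimestamp));
        s.count++;
        s.lastTimestamp = timestamp;
        s.timerAt = timestamp + gap; // overwriting plays the role of re-registering the timer
    }

    // Called when time advances: fire due timers, emit "key:count", clear the state.
    void advanceTime(long now) {
        Iterator<Map.Entry<String, SessionState>> it = stateByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> e = it.next();
            if (e.getValue().timerAt <= now) {
                output.add(e.getKey() + ":" + e.getValue().count);
                it.remove();
            }
        }
    }
}
```

In a real job the same structure would map onto a keyed process function: processElement() updates the state and registers a timer, and the timer callback emits the session result and clears the state.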

Best,
Aljoscha
