On 17.11.20 17:37, Simone Cavallarin wrote:
Hi,
I have been working on the suggestion that you gave me, thanks! The first part is to add the gap
to the message: 1) I receive the event, 2) I take that event and map it using
StatefulSessionCalculator, which is where I put together the message and a
"long" that is my gap in millis.
DataStream<Event> source = <Kafka Source>
// Operation in front of the window that keeps track of session gaps
DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy(<key extractor>)
    .map(new StatefulSessionCalculator()); // or process()
This is my StatefulSessionCalculator():
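Tuple2<MyMessageType, Long> map(MyMessageType input) {
    ValueState<MyState> valueState = getState(myModelStateDescriptor);
    MyState state = valueState.value();
    if (state == null) {
        // value() returns null for the first event of a key;
        // this assumes MyState has a no-arg constructor
        state = new MyState();
    }
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state);
    return Tuple2.of(input, suggestedGap);
}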
If the calculated gap is 1234, would the result be:
<[Tom, 1.70, 50, 1605612588995], [1234]>?
That looks correct, yes.
The second step is to use the calculated gap through
DynamicWindowGapExtractor().
DataStream<...> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
The DynamicWindowGapExtractor() extracts the gap from the message and feeds it
back to Flink.
Could you please give me an example also for this one?
This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
(the second field of the tuple, f1) from the extract() method.
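
A minimal sketch of such an extractor could look like the code below. To keep it compilable without Flink on the classpath, it declares local stand-ins: EnrichedEvent plays the role of Tuple2<MyEvent, Long>, and GapExtractorSketch mirrors the single extract() method of SessionWindowTimeGapExtractor. PersonEvent and all the "Sketch" names are hypothetical. In a real job you would implement Flink's interface directly and read the gap from tuple.f1.

```java
// Stand-in for Flink's Tuple2<MyEvent, Long>: f0 is the event, f1 the gap.
final class EnrichedEvent<T> {
    final T f0;     // the original event (the POJO)
    final long f1;  // the suggested session gap in milliseconds

    EnrichedEvent(T f0, long f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Stand-in with the same shape as Flink's SessionWindowTimeGapExtractor<T>.
interface GapExtractorSketch<T> {
    long extract(T element);
}

// Hypothetical event POJO, only here so the sketch is self-contained.
final class PersonEvent {
    final String name;
    final long timestamp;

    PersonEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// The extractor itself: it does nothing more than hand back the gap
// that the upstream stateful map attached to the event.
final class DynamicWindowGapExtractorSketch
        implements GapExtractorSketch<EnrichedEvent<PersonEvent>> {

    @Override
    public long extract(EnrichedEvent<PersonEvent> element) {
        return element.f1;
    }
}
```

For an element built from the event "Tom" with a gap of 1234, extract() returns 1234, and the original event stays reachable through the first field, f0.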
One thing that I don't understand is that after enriching the message, my event,
which contains a POJO, is nested inside a tuple. How can I access it?
You would just read the first field of the tuple, i.e. tuple.f0.
The last point, when you said: "I think, however, that it might be easier at this
point to just use a stateful ProcessFunction": did you mean that a completely different
approach would be better?
That's what I meant, yes. It seems too complicated to split the
logic into one part that determines the dynamic gap and another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. With state and timers
you should have enough flexibility.
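
To illustrate the single-operator idea, here is a rough, Flink-free sketch of the logic. It is not a ProcessFunction: a HashMap stands in for keyed state, and onTime() stands in for a timer firing. All names are hypothetical, events are assumed to arrive in timestamp order per key, and an event exactly at lastSeen + gap is treated as still belonging to the session, which is a choice made for this sketch. In a real job the same steps would presumably live in a KeyedProcessFunction using ValueState and event-time timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class DynamicGapSessionSketch {

    // Per-key session state; in Flink this would be kept in keyed state.
    static final class SessionState {
        long start;     // timestamp of the first event in the session
        long lastSeen;  // timestamp of the most recent event
        long gap;       // current dynamic gap in milliseconds
        int count;      // per-session computation (here: an event count)
    }

    private final Map<String, SessionState> stateByKey = new HashMap<>();
    private final List<String> closed = new ArrayList<>();

    // Handles one event: closes the old session if the gap has elapsed,
    // then updates the gap and the per-session result in the same step.
    void onEvent(String key, long timestamp, long suggestedGap) {
        SessionState s = stateByKey.get(key);
        if (s != null && timestamp > s.lastSeen + s.gap) {
            emit(key, s);
            s = null;
        }
        if (s == null) {
            s = new SessionState();
            s.start = timestamp;
            stateByKey.put(key, s);
        }
        s.lastSeen = timestamp;
        s.gap = suggestedGap;
        s.count++;
    }

    // Stand-in for a timer: closes every session whose gap has elapsed by 'now'.
    void onTime(long now) {
        Iterator<Map.Entry<String, SessionState>> it = stateByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> entry = it.next();
            SessionState s = entry.getValue();
            if (now > s.lastSeen + s.gap) {
                emit(entry.getKey(), s);
                it.remove();
            }
        }
    }

    private void emit(String key, SessionState s) {
        closed.add(key + " [" + s.start + ".." + s.lastSeen + "] count=" + s.count);
    }

    List<String> closedSessions() {
        return closed;
    }
}
```

The point of the design is that the gap calculation and the per-session result share one piece of state, so nothing has to be passed through a tuple and extracted again by a window assigner.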
Best,
Aljoscha