Hi Aljoscha,
Thanks for the response.
This sounds OK to me. It's as if the message carries additional information
that can "tell" operators how to handle it. Maybe we could also use
this approach for other use cases.
I will try this approach, thanks.
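
A minimal sketch of the two-step approach discussed in this thread, written in
plain Java without Flink dependencies so it can run on its own. All names here
(TwoStepGapSketch, EnrichedMessage, GapEnricher, GapExtractor) are hypothetical.
GapExtractor is only a stand-in with the same shape as Flink's
SessionWindowTimeGapExtractor (a single `long extract(T element)` method), and
the plain field in GapEnricher stands in for whatever Flink state the real
upstream operator would use.

```java
public class TwoStepGapSketch {

    /** Hypothetical message type that carries the gap chosen upstream. */
    static final class EnrichedMessage {
        final String key;
        final long timestamp;
        final long gapMillis;

        EnrichedMessage(String key, long timestamp, long gapMillis) {
            this.key = key;
            this.timestamp = timestamp;
            this.gapMillis = gapMillis;
        }
    }

    /** Stand-in with the same shape as Flink's SessionWindowTimeGapExtractor<T>. */
    interface GapExtractor<T> {
        long extract(T element);
    }

    /**
     * Step 1: the stateful part. In a real job this would be an operator in
     * front of the window, and currentGapMillis would live in Flink state
     * (assumption: this sketch uses a plain field instead).
     */
    static final class GapEnricher {
        private long currentGapMillis;

        GapEnricher(long initialGapMillis) {
            this.currentGapMillis = initialGapMillis;
        }

        /** Called for every config update, e.g. one read from Kafka. */
        void onConfigUpdate(long newGapMillis) {
            this.currentGapMillis = newGapMillis;
        }

        /** Called for every data message: stamp it with the current gap. */
        EnrichedMessage onMessage(String key, long timestamp) {
            return new EnrichedMessage(key, timestamp, currentGapMillis);
        }
    }

    /** Step 2: the stateless extractor only reads the gap off the message. */
    static final GapExtractor<EnrichedMessage> EXTRACTOR = message -> message.gapMillis;

    public static void main(String[] args) {
        GapEnricher enricher = new GapEnricher(5_000L);

        EnrichedMessage first = enricher.onMessage("trade-1", 1_000L);
        enricher.onConfigUpdate(10_000L); // gap changed on demand
        EnrichedMessage second = enricher.onMessage("trade-1", 2_000L);

        System.out.println(EXTRACTOR.extract(first));  // gap before the update
        System.out.println(EXTRACTOR.extract(second)); // gap after the update
    }
}
```

The extractor itself stays stateless, so it does not need access to Flink's
state. Only the enrichment step has to know about config updates.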
Hi Kristoff,
There are no plans to add state support to the gap extractors, but you could
do this using a two-step approach, i.e. have an operation in front of the
window that keeps track of session gaps, enriches the message with the gap that
should be used and then the extractor extracts th
Hi KristoffSC,
>> Are there any plans to add support of Flink State into
>> SessionWindowTimeGapExtractor?
As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an
operator.
But I cannot give a clear answer. Let me ping @Aljoscha Krettek
to give the answer.
Best,
Vino
Kristoff
Ok,
I did some more tests and yes, it seems that there is no way to use Flink's
state in a class that implements SessionWindowTimeGapExtractor.
Even if I implement this interface on a class that is an operator,
whenever the extract method is called it does not have any access to Flink's
state.
So I was trying to have something like this:
PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();
messageStream
    .connect(pipelineConfigStream)
    .process(pipelineConfigOperator)
    .keyBy(tradeKeySelector)
    .wind
Thank you for the answer.
The thing is that I would not like to call an external system for each window;
rather, I would like to keep the gap size in Flink's state, which I will be
able to change from an external system, for example by handling a configUpdate
message from Kafka.
So if SessionWindowTimeGapExtra
Hi KristoffSC,
Firstly, IMO, you can implement this feature by customizing the
`SessionWindowTimeGapExtractor`.
Additionally, let me clarify a concept. A component that implements the
`SessionWindowTimeGapExtractor` interface should not be an operator in
Flink.
In Flink's concepts, Window is an
Hi all,
I'm exploring Flink for our new project.
Currently I'm playing with Session Windows with a dynamic gap. In short, I
would like to be able to change the value of the gap on demand, for example
on a config update.
So I have this code:
messageStream
.keyBy(tradeKeySelector