Re: Session Window with dynamic gap

2020-01-08 Thread KristoffSC
Hi Aljoscha, Thanks for the response. This sounds OK to me. It's as if the message carries additional information that can "tell" operators how to handle it. Maybe we could use this approach for other use cases as well. I will try this approach, thanks.

Re: Session Window with dynamic gap

2020-01-08 Thread Aljoscha Krettek
Hi Kristoff, There are no plans to add state support to the gap extractors, but you could do this using a two-step approach, i.e. have an operation in front of the window that keeps track of session gaps and enriches the message with the gap that should be used, and then the extractor extracts th
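To make the two-step idea concrete, here is a minimal plain-Java sketch with no Flink dependency. Everything in it is an assumption for illustration: `GapExtractor` is a hypothetical stand-in that only mirrors the single-method shape (`long extract(T element)`) of Flink's `SessionWindowTimeGapExtractor`, and `GapEnricher`, `EnrichedMessage`, and the 30-second default gap are invented names and values. In a real job the `HashMap` would presumably be replaced by Flink state inside the operator that sits in front of the window.

```java
import java.util.HashMap;
import java.util.Map;

// Entry point: runs the two steps by hand, without Flink.
class DynamicGapSketch {
    public static void main(String[] args) {
        GapEnricher enricher = new GapEnricher(30_000L);
        GapExtractor<EnrichedMessage> extractor = new EnrichedGapExtractor();

        // Message arrives before any config update: default gap is attached.
        EnrichedMessage first = enricher.onMessage("trade-1", "payload-a");
        // A config update changes the gap for this key.
        enricher.onConfigUpdate("trade-1", 5_000L);
        EnrichedMessage second = enricher.onMessage("trade-1", "payload-b");

        System.out.println(extractor.extract(first));  // prints 30000
        System.out.println(extractor.extract(second)); // prints 5000
    }
}

// Hypothetical stand-in for the shape of Flink's SessionWindowTimeGapExtractor<T>.
interface GapExtractor<T> {
    long extract(T element);
}

// Message enriched with the session gap that should apply to it.
final class EnrichedMessage {
    final String key;
    final String payload;
    final long gapMillis;

    EnrichedMessage(String key, String payload, long gapMillis) {
        this.key = key;
        this.payload = payload;
        this.gapMillis = gapMillis;
    }
}

// Step 1: the operation in front of the window. It tracks the current gap
// per key (a plain map here; Flink state in a real job) and stamps each message.
final class GapEnricher {
    private final long defaultGapMillis;
    private final Map<String, Long> gapByKey = new HashMap<>();

    GapEnricher(long defaultGapMillis) {
        this.defaultGapMillis = defaultGapMillis;
    }

    void onConfigUpdate(String key, long gapMillis) {
        gapByKey.put(key, gapMillis);
    }

    EnrichedMessage onMessage(String key, String payload) {
        long gap = gapByKey.getOrDefault(key, defaultGapMillis);
        return new EnrichedMessage(key, payload, gap);
    }
}

// Step 2: a stateless extractor that only reads the gap carried by the message.
final class EnrichedGapExtractor implements GapExtractor<EnrichedMessage> {
    @Override
    public long extract(EnrichedMessage element) {
        return element.gapMillis;
    }
}
```

The point of the split is that the extractor stays stateless: all knowledge of the current gap lives in the step before the window, so the extractor only needs the element it is handed.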

Re: Session Window with dynamic gap

2020-01-02 Thread vino yang
Hi KristoffSC, >> Are there any plans to add support of Flink State into SessionWindowTimeGapExtractor? As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an operator. But I cannot give a clear answer. Let me ping @Aljoscha Krettek to give the answer. Best, Vino Kristoff

Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
OK, I did some more tests and yes, it seems there is no way to use Flink's state in a class that implements SessionWindowTimeGapExtractor. Even if I implement this interface on a class that is an operator, whenever the extract method is called it does not have any access to Flink's state

Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
So I was trying to have something like this:

PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();
messageStream
    .connect(pipelineConfigStream)
    .process(pipelineConfigOperator)
    .keyBy(tradeKeySelector)
    .wind

Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
Thank you for the answer. The thing is that I would not like to call an external system for each window; rather, I would like to keep the gap size in Flink's state, which I will be able to change from an external system, for example by handling a configUpdate message from Kafka. So if SessionWindowTimeGapExtra

Re: Session Window with dynamic gap

2020-01-02 Thread vino yang
Hi KristoffSC, Firstly, IMO, you can implement this feature by customizing the `SessionWindowTimeGapExtractor`. Additionally, let me clarify a concept. A component that implements the `SessionWindowTimeGapExtractor` interface should not be an operator in Flink. In Flink's concepts, Window is an

Session Window with dynamic gap

2020-01-02 Thread KristoffSC
Hi all, I'm exploring Flink for our new project. Currently I'm experimenting with session windows with a dynamic gap. In short, I would like to be able to change the value of the gap on demand, for example on a config update. So I have this code:

messageStream
    .keyBy(tradeKeySelector