Hi Alexis,

I believe you should be able to use the `ConnectedStreams#transform()`
method.

Best, Piotrek

On Tue, 18 Jan 2022 at 14:20, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi again everyone,
>
>
>
> It’s been a while, so first of all happy new year :)
>
>
>
> I was revisiting this discussion and started looking at the code. However,
> it seems that all of the overloads of ConnectedStreams#process expect a
> CoProcessFunction or its keyed counterpart, so I don’t think I can inject a
> custom TwoInputStreamOperator.
>
>
>
> After a quick glance at the joining documentation, I wonder if I could
> accomplish what I want with a window/interval join of streams. If so, I
> might be able to avoid using state in the join function, but if I can’t
> avoid it, is it possible to use managed state in a (Process)JoinFunction?
> The join needs keys, but I don’t know if the resulting stream counts as
> keyed from the state’s point of view.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Monday, 6 December 2021 08:43
> *To:* David Morávek <d...@apache.org>
> *Cc:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>;
> user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> Hi Alexis and David,
>
>
>
> This actually cannot happen. There are mechanisms in the code to make
> sure that none of the inputs is starved IF there is some data to be read.
>
>
>
> The only time an input can be blocked is during the alignment phase of
> aligned checkpoints under back pressure. If there was back pressure in
> your job, checkpoint barriers could easily have flowed through the job
> graph to the KeyedCoProcessFunction much quicker on one of the paths than
> on the other, causing the faster path to be blocked until the other side
> caught up. But that would happen only during the alignment phase of the
> checkpoint, so without back pressure only for a very short period of time.
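To illustrate the alignment behaviour described above, here is a toy model in plain Java. It has no Flink dependency, and `BarrierAligner`, `onBarrier` and `canRead` are hypothetical names, not Flink APIs. Once the barrier of a checkpoint arrives on one input, that input stops being read. It stays blocked until the same barrier arrives on the other input, at which point the snapshot is taken and both inputs are unblocked.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of barrier alignment in a two-input operator (hypothetical, not Flink code). */
class BarrierAligner {
    private final boolean[] barrierSeen = new boolean[2];
    private final List<String> log = new ArrayList<>();

    /** An input is readable unless it already delivered the barrier and is waiting for the other. */
    boolean canRead(int input) {
        return !barrierSeen[input];
    }

    /** Called when the barrier of checkpoint {@code checkpointId} arrives on input 0 or 1. */
    void onBarrier(int input, long checkpointId) {
        barrierSeen[input] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            // Both barriers are in: snapshot state, then unblock both inputs.
            log.add("snapshot " + checkpointId);
            barrierSeen[0] = false;
            barrierSeen[1] = false;
        } else {
            // Only one barrier so far: this input is blocked during alignment.
            log.add("input " + input + " blocked at checkpoint " + checkpointId);
        }
    }

    List<String> log() {
        return log;
    }
}
```

Without back pressure both barriers arrive almost together, so the blocked window in this model (between the two `onBarrier` calls) stays very short.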
>
>
>
> Piotrek
>
>
>
> On Thu, 2 Dec 2021 at 18:23, David Morávek <d...@apache.org> wrote:
>
> I think this could happen, but I have very limited knowledge about how
> the input gates work internally. @Piotr could definitely provide some more
> insight here.
>
>
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 5:54 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> I do have some logic with timers today, but it’s indeed not ideal. I guess
> I’ll have a look at TwoInputStreamOperator, but I do have related
> questions. You mentioned a sample scenario of "processing backlog" where
> windows fire very quickly; could it happen that, in such a situation, the
> framework calls the operator’s processElement1 continuously (even for
> several minutes) before calling processElement2 a single time? How does the
> framework decide when to switch between the two inputs when the streams are
> connected?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 17:18
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> Even with the TwoInputStreamOperator you cannot "halt" the processing.
> You need to buffer these elements, for example in ListState, for later
> processing. When the watermark of the second stream arrives, you can
> process all buffered elements that satisfy the condition.
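Here is a minimal sketch of this buffering idea. It assumes a single key and uses a plain `java.util.List` where a real operator would use `ListState`. `WindowResult`, `BufferingConnect` and their methods are hypothetical names that only mirror the `processElement1`/`processWatermark2` hooks of a `TwoInputStreamOperator`. Results from the window branch are held back until the side stream's watermark reaches each result's window max timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical result of the window branch: key, window max timestamp and aggregated value. */
final class WindowResult {
    final String key;
    final long windowMaxTimestamp;
    final double value;

    WindowResult(String key, long windowMaxTimestamp, double value) {
        this.key = key;
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.value = value;
    }
}

/** Single-key model of a two-input operator that holds back input-1 results (hypothetical). */
final class BufferingConnect {
    // Stands in for ListState<WindowResult> in a real keyed operator.
    private final List<WindowResult> buffer = new ArrayList<>();
    private long sideWatermark = Long.MIN_VALUE;

    /** Input 1 (window results): never emit directly, only buffer. */
    void processElement1(WindowResult result) {
        buffer.add(result);
    }

    /** Watermark of input 2 (side stream): release every result whose window it now covers. */
    List<WindowResult> processWatermark2(long watermark) {
        sideWatermark = Math.max(sideWatermark, watermark);
        List<WindowResult> ready = new ArrayList<>();
        Iterator<WindowResult> it = buffer.iterator();
        while (it.hasNext()) {
            WindowResult r = it.next();
            if (r.windowMaxTimestamp <= sideWatermark) {
                ready.add(r);
                it.remove();
            }
        }
        return ready;
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

In a real keyed operator the buffer would be keyed state, and `processElement2` would also update whatever side-stream state the release condition checks.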
>
>
>
> You could probably also implement a similar (less optimized) solution with
> KeyedCoProcessFunction using event time timers.
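For the timer-based variant, one property does most of the work. Event-time timers in a two-input operator fire against the operator's combined watermark, which is the minimum of the two input watermarks, so a slow side stream automatically holds them back. The toy model below makes this concrete. It is plain Java with a hypothetical `TwoInputTimerModel` class and covers one key only. A timer registered at a window's max timestamp fires only after both inputs have advanced past it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Toy model (hypothetical, single key) of event-time timers in a two-input operator:
 * the operator's clock is min(watermark of input 1, watermark of input 2).
 */
final class TwoInputTimerModel {
    // Sorted and de-duplicated, so registering the same time twice yields one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Advances one input's watermark and returns the timers that fire as a result. */
    List<Long> advanceWatermark(int input, long watermark) {
        if (input == 1) {
            watermark1 = Math.max(watermark1, watermark);
        } else {
            watermark2 = Math.max(watermark2, watermark);
        }
        long combined = Math.min(watermark1, watermark2);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= combined) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```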
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t
> tell Flink to "halt" processElement1 and switch to the other stream
> depending on watermarks. I could look into TwoInputStreamOperator if you
> think that’s the best approach.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 16:59
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> I think this would require using the lower-level API and implementing a
> custom `TwoInputStreamOperator`. Then you can hook into the
> `processWatermark1` and `processWatermark2` methods.
>
>
>
> Let's also make sure we're on the same page about what the watermark is.
> You can think of the watermark as an event-time clock. It basically tells
> you that *no more events with a timestamp lower than the watermark should
> appear in your stream*.
>
>
>
> You simply delay emitting the window result from your "connect"
> operator until the watermark from the second (side output) stream passes
> the window's max timestamp (the maximum timestamp included in the window).
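Here is a sketch of the arithmetic, assuming sliding windows with zero offset and non-negative timestamps. A window covers `[start, end)`, so its max timestamp is `end - 1`. A buffered result for that window may be emitted once the side stream's watermark has reached that value. `SlidingWindows` is an illustrative re-implementation, not Flink's window assigner class.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sliding-window arithmetic (offset assumed 0, timestamps assumed >= 0). */
final class SlidingWindows {
    /** Window [start, end); its max timestamp is end - 1. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    /** All windows of the given size and slide that contain timestamp ts, latest start first. */
    static List<Window> assign(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    /** A result for window w may be emitted once the side watermark covers w's max timestamp. */
    static boolean canEmit(Window w, long sideWatermark) {
        return sideWatermark >= w.maxTimestamp();
    }
}
```

For example, with a 2-minute window sliding every minute, an event at 65 s belongs to `[0 s, 120 s)` and `[60 s, 180 s)`. The first of these can be emitted once the side watermark reaches 119 999 ms.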
>
>
>
> Does that make sense?
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Could you elaborate on what you mean by synchronize? Buffering in the
> state would be fine, but I haven’t been able to come up with a good way of
> ensuring that all data from the side stream for a given minute is processed
> by processElement2 before all data for the same (windowed) minute reaches
> processElement1, even when considering watermarks.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 15:45
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> You cannot rely on the order of the two streams that easily. If you are,
> for example, processing a backlog and the windows fire quickly, the
> windowed branch can actually be faster than the second branch, even though
> the latter has less work to do. This makes the pipeline non-deterministic.
>
>
>
> What you can do is "synchronize" the watermarks of both streams in your
> "connect" operator, but that of course involves buffering events in
> state.
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi David,
>
>
>
> A watermark step simply refers to assigning timestamps and watermarks; my
> source doesn’t do that.
>
>
>
> I have a test system with only a couple of data points per day, so there’s
> definitely no back pressure. I basically have a browser where I can see the
> results from the sink, and I found one result that should have been
> discarded but wasn’t, which makes me think that the operator processed the
> "open" state but waited too long and didn’t process the "close" state
> before the window fired. I can also see that the closure (going from open
> to close) triggered on second 17, and my windows are evaluated every
> minute, so it wasn’t a race condition.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 14:52
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
>
>
> Hi Alexis,
>
>
>
> I'm not sure what the "watermark" step refers to in your graph, but in
> general I'd say your intuition is correct.
>
>
>
> For the "buffering" part: each sub-task that sends data over a data
> exchange (the last operator in a chain) has an output buffer, and the
> operator that consumes this data (possibly on a different machine) has an
> input buffer (the buffer debloating feature can help mitigate excessive
> buffering in case of back pressure).
>
>
>
> > but I’m not sure if this actually happens
>
>
>
> How are you trying to verify this? Also, can you check whether the
> operators are back-pressured?
>
>
>
> Best,
>
> D.
>
>
>
> On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
>
>
> I have a use case with event-time processing that ends up with a DAG
> roughly like this:
>
>
>
> source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
>           |                                                                 ^
>           +-> (side output) -> keyBy -> watermark --------------------------+
>
>
>
>
>
> (In case the text gets mangled in the e-mail, the side output comes from
> the filter and joins back with the connect operation)
>
>
>
> The filter takes all data and its main output is all _*valid*_ data with
> state "open"; the side output is all _*valid*_ data regardless of state.
>
>
>
> The goal of the KeyedCoProcessFunction is to check the results of the
> (sliding) window. The window only receives open states, but
> KeyedCoProcessFunction receives all valid data and should discard results
> from the main stream if states changed from "open" to something else before
> the window was evaluated.
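Here is one possible reading of this rule as code, under stated assumptions. It covers a single key, and it keeps a result only if the latest side-output state at or before the window's max timestamp is still "open". `Event`, `FilterSplit` and `StateHistory` are hypothetical plain-Java classes, and a `TreeMap` stands in for keyed state.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical input record: event timestamp, state ("open", "closed", ...) and validity. */
final class Event {
    final long timestamp;
    final String state;
    final boolean valid;

    Event(long timestamp, String state, boolean valid) {
        this.timestamp = timestamp;
        this.state = state;
        this.valid = valid;
    }
}

/** The filter's routing rule as described: main output = valid and open, side output = all valid. */
final class FilterSplit {
    static boolean toMainOutput(Event e) {
        return e.valid && "open".equals(e.state);
    }

    static boolean toSideOutput(Event e) {
        return e.valid;
    }
}

/** Single-key model of the side-stream state kept by the KeyedCoProcessFunction. */
final class StateHistory {
    // Stands in for keyed MapState<Long, String>: event timestamp -> state.
    private final TreeMap<Long, String> statesByTime = new TreeMap<>();

    /** Side output element (processElement2): remember the state at its timestamp. */
    void processElement2(Event e) {
        statesByTime.put(e.timestamp, e.state);
    }

    /** Keep a window result only if the latest known state at the window's max timestamp is "open". */
    boolean keepResult(long windowMaxTimestamp) {
        Map.Entry<Long, String> latest = statesByTime.floorEntry(windowMaxTimestamp);
        return latest != null && "open".equals(latest.getValue());
    }
}
```

`keepResult` only gives a reliable answer once every side-output element up to the window's max timestamp has been seen. That is why it needs the watermark-gated buffering discussed in the replies above rather than relying on arrival order.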
>
>
>
> I would have expected all data from the side output to be processed
> more or less immediately in KeyedCoProcessFunction’s processElement2
> because there’s no windowing in the side stream, but I’m not sure if this
> actually happens. Maybe the side stream (or both streams) buffers some data
> before passing it to the connected stream? If so, is there any way I could
> tune this?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
