Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with
idleness, they should send a periodic watermark on timeout.
So the code that you posted would receive watermarks on the idle source and
thus advance watermarks periodically.
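To make the two cases concrete, here is a simplified model of a two-input operator that combines its inputs' watermarks as their minimum. The class and method names are hypothetical; this is not Flink's actual AbstractStreamOperator code. If the second input never emits a watermark, the combined watermark stays at Long.MIN_VALUE. If the second input keeps emitting periodic watermarks, the combined watermark advances.

```java
// Hypothetical, simplified model of two-input watermark combination.
// Not Flink's actual AbstractStreamOperator implementation.
final class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Called when input 1 delivers a watermark; returns the combined watermark.
    long processWatermark1(long wm) {
        input1Watermark = wm;
        return updateCombined();
    }

    // Called when input 2 delivers a watermark; returns the combined watermark.
    long processWatermark2(long wm) {
        input2Watermark = wm;
        return updateCombined();
    }

    // Combined watermark = min of both inputs, and it never moves backwards.
    private long updateCombined() {
        long candidate = Math.min(input1Watermark, input2Watermark);
        if (candidate > combinedWatermark) {
            combinedWatermark = candidate;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        // Case A: input 2 never emits a watermark, so the combined watermark is stuck.
        TwoInputWatermarkSketch silent = new TwoInputWatermarkSketch();
        long a = Long.MIN_VALUE;
        for (long t = 1000; t <= 5000; t += 1000) {
            a = silent.processWatermark1(t);
        }
        System.out.println("input 2 silent: " + a);

        // Case B: input 2 has no records but still emits periodic watermarks.
        TwoInputWatermarkSketch periodic = new TwoInputWatermarkSketch();
        long b = Long.MIN_VALUE;
        for (long t = 1000; t <= 5000; t += 1000) {
            periodic.processWatermark1(t);
            b = periodic.processWatermark2(t);
        }
        System.out.println("input 2 periodic: " + b);
    }
}
```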

If an idle source does not emit a watermark at all, then I'm curious why
it's not mapped to StreamStatus.IDLE [1], which would trigger the desired
behavior.
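For comparison, here is a minimal sketch of an idle-aware combination: inputs marked idle (analogous to receiving StreamStatus.IDLE) are excluded from the minimum, so the remaining active input can advance the watermark on its own. The names are hypothetical and this is not Flink's StatusWatermarkValve. For example, it ignores how a channel re-aligns after becoming active again and simply keeps the combined watermark monotonic.

```java
import java.util.Arrays;

// Hypothetical sketch: min over inputs that are NOT idle.
// Not Flink's actual StatusWatermarkValve implementation.
final class IdleAwareWatermarkSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combinedWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(int numInputs) {
        watermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // A watermark from an input also marks that input as active again.
    long onWatermark(int input, long wm) {
        watermarks[input] = Math.max(watermarks[input], wm);
        idle[input] = false;
        return recompute();
    }

    // Marks an input idle (analogous to receiving StreamStatus.IDLE).
    long onIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    // Minimum over active inputs only; if every input is idle, hold the watermark.
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > combinedWatermark) {
            combinedWatermark = min;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch valve = new IdleAwareWatermarkSketch(2);
        valve.onWatermark(0, 1000); // input 1 has not emitted anything yet: still stuck
        valve.onIdle(1);            // input 1 marked idle: input 0 alone drives the watermark
        System.out.println(valve.onWatermark(0, 5000));
    }
}
```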

[1]
https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd7916698eeee98b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined watermarks
> from both inputs here:
>
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573
>
> The operator layer is not aware of idleness, so it will never notice.
> Idleness only works at the level of inputs and is never forwarded to the
> operator itself.
>
> To fix this, we would also have to make operators aware of idleness so
> that they can take it into account when computing the combined output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure, but the way I
> > understand the code, the idleness mechanism applies to input channels,
> > i.e. the case where multiple parallel instances shuffle their results
> > to downstream operators.
> >
> > In the case of a two-input operator, the watermarks of two different
> > upstream operators are combined inside the operator itself. At that
> > point we do not have the idleness status: there is no status indicating
> > that a whole upstream operator has become idle. That's definitely a
> > bug/limitation.
> >
> > I'm also cc'ing Aljoscha, who may be able to confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new idleness detection feature in Flink 1.11;
> >> however, it does not work as we expected:
> >> when we connect two data streams, one of which is idle, the output
> >> watermark of the CoProcessOperator does not increase, so the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug, or does idleness detection not support this use case?
> >>
> >> Regards.
> >> Kien
> >
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

