Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured with idleness, they should send a periodic watermark on timeout. So the code that you posted would receive watermarks on the idle source and thus advance watermarks periodically.
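To make the point concrete, here is a minimal plain-Java sketch of the behavior under discussion. It is not Flink's code: the class and method names are hypothetical, and it only assumes that a two-input operator takes the minimum of its two input watermarks. The second method shows how an idle flag could be taken into account if the operator were aware of it.

```java
import java.util.OptionalLong;

// Hypothetical illustration only; none of these names exist in Flink.
public class CombinedWatermarkSketch {

    /** Idleness-unaware combination: the output is the minimum of both inputs. */
    public static long combine(long watermark1, long watermark2) {
        return Math.min(watermark1, watermark2);
    }

    /**
     * Idleness-aware combination: idle inputs are ignored.
     * Returns empty if both inputs are idle, since there is nothing to advance on.
     */
    public static OptionalLong combineWithIdleness(
            long watermark1, boolean idle1, long watermark2, boolean idle2) {
        if (idle1 && idle2) {
            return OptionalLong.empty();
        }
        if (idle1) {
            return OptionalLong.of(watermark2);
        }
        if (idle2) {
            return OptionalLong.of(watermark1);
        }
        return OptionalLong.of(Math.min(watermark1, watermark2));
    }

    public static void main(String[] args) {
        long activeInput = 5_000L;        // input that keeps emitting watermarks
        long idleInput = Long.MIN_VALUE;  // input that never emitted a watermark

        // Without idleness information the idle input holds the output back.
        System.out.println(combine(activeInput, idleInput));

        // With idleness information the active input alone drives the output.
        System.out.println(combineWithIdleness(activeInput, false, idleInput, true));
    }
}
```

If the idle input did emit a watermark on timeout, the first method would already advance; if it emits nothing, only the second variant makes progress.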
If an idle source does not emit a watermark at all, then I'm curious why it's not mapped to StreamStatus.IDLE [1], which would trigger the desired behavior.

[1] https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd7916698eeee98b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Yes, I'm afraid this analysis is correct. The StreamOperator,
> AbstractStreamOperator to be specific, computes the combined watermarks
> from both inputs here:
>
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
> The operator layer is not aware of idleness so it will never notice. The
> idleness only works on the level of inputs but is never forwarded to an
> operator itself.
>
> To fix this we would have to also make operators aware of idleness such
> that they can take this into account when computing the combined output
> watermark.
>
> Best,
> Aljoscha
>
> On 26.08.20 10:02, Dawid Wysakowicz wrote:
> > Hi Kien,
> >
> > I am afraid this is a valid bug. I am not 100% sure but the way I
> > understand the code the idleness mechanism applies to input channels,
> > which means e.g. when multiple parallel instances shuffle their results
> > to downstream operators.
> >
> > In case of a two input operator, combining the watermark of two
> > different upstream operators happens inside of the operator itself.
> > There we do not have the idleness status. We do not have a status that a
> > whole upstream operator became idle. That's definitely a bug/limitation.
> >
> > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/08/2020 16:00, Truong Duc Kien wrote:
> >> Hi all,
> >> We are testing the new Idleness detection feature in Flink 1.11,
> >> however, it does not work as we expected:
> >> When we connect two data streams, of which one is idle, the output
> >> watermark of CoProcessOperator does not increase, hence the program
> >> cannot progress.
> >>
> >> I've made a small project to illustrate the problem. The watermark
> >> received by the sink does not increase at all until the idle source is
> >> stopped.
> >>
> >> https://github.com/kien-truong/flink-idleness-testing
> >>
> >> Is this a bug or does the idleness detection not support this use case?
> >>
> >> Regards.
> >> Kien
> >
> >

--
Arvid Heise | Senior Java Developer
Ververica GmbH