Hey Arvid,

The problem is that StreamStatus.IDLE is set on the task level and is
not propagated to the operator. The watermarks of a
TwoInputStreamOperator are combined in AbstractStreamOperator:

    // Called once the combined watermark advances: fires event-time
    // timers and forwards the watermark downstream.
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

    // The combined watermark is the minimum of both inputs. Nothing here
    // knows whether an input is idle, so an input that stops sending
    // watermarks holds the minimum back indefinitely.
    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    // Same logic for the second input.
    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }
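To make the effect concrete, here is a minimal stand-alone sketch of the
combination logic above. It is not Flink code: Watermark is replaced by
plain long timestamps, output.emitWatermark by a list, and the class
names TwoInputCombiner and StuckWatermarkDemo are made up. It assumes
that all three fields start at Long.MIN_VALUE, which is how I read the
operator's initialization.

```java
// Hypothetical stand-alone copy of the two-input combination logic.
// Assumption: all watermark fields start at Long.MIN_VALUE.
import java.util.ArrayList;
import java.util.List;

class StuckWatermarkDemo {
    public static void main(String[] args) {
        TwoInputCombiner op = new TwoInputCombiner();
        // Input 1 keeps advancing; input 2 is idle and never sends anything.
        for (long ts = 1_000; ts <= 5_000; ts += 1_000) {
            op.processWatermark1(ts);
        }
        System.out.println("emitted: " + op.emitted); // emitted: []
    }
}

class TwoInputCombiner {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Watermarks forwarded downstream (stands in for output.emitWatermark).
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        maybeAdvance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        maybeAdvance();
    }

    // Emit only when the minimum of both inputs moves forward.
    private void maybeAdvance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }
}
```

As long as input 2 sends nothing, min(input1Watermark, input2Watermark)
stays at Long.MIN_VALUE, so nothing is ever emitted no matter how far
input 1 advances.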

In AbstractStreamOperator we do not know that, e.g., the whole of input 1
is idle. Therefore, if we stop receiving watermarks from it (because it
became IDLE), the watermark stops progressing from any two-input
operator onwards. We are missing handling of the IDLE status similar to
the one on the task level, which works well for one-input operators and
for multiple parallel upstream instances.
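A rough sketch of the kind of handling we are missing: keep an idle flag
per input and take the minimum only over the active inputs. Again, this
is a hypothetical illustration (IdleAwareCombiner, markIdle and
IdleAwareDemo are invented names), not an existing Flink API, and it
simplifies things by treating any incoming watermark as a signal that
the input is active again.

```java
// Hypothetical sketch of an idleness-aware two-input combination.
// None of these names exist in Flink. Simplification: receiving a
// watermark on an input marks that input as active again.
import java.util.ArrayList;
import java.util.List;

class IdleAwareDemo {
    public static void main(String[] args) {
        IdleAwareCombiner op = new IdleAwareCombiner();
        op.markIdle(1);                 // the second input (index 1) goes idle
        op.processWatermark(0, 1_000);
        op.processWatermark(0, 2_000);
        System.out.println(op.emitted); // [1000, 2000]
    }
}

class IdleAwareCombiner {
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] inputIdle = {false, false};
    private long combinedWatermark = Long.MIN_VALUE;

    // Watermarks forwarded downstream.
    final List<Long> emitted = new ArrayList<>();

    void processWatermark(int input, long timestamp) {
        inputWatermarks[input] = timestamp;
        inputIdle[input] = false;
        maybeAdvance();
    }

    void markIdle(int input) {
        inputIdle[input] = true;
        maybeAdvance();
    }

    // Minimum over active inputs only; the output never moves backwards.
    private void maybeAdvance() {
        long newMin = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!inputIdle[i]) {
                anyActive = true;
                newMin = Math.min(newMin, inputWatermarks[i]);
            }
        }
        // With every input idle there is nothing to combine, so emit nothing.
        if (anyActive && newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }
}
```

Because combinedWatermark only moves forward, an input that resumes with
an older watermark cannot pull the output back; it only holds back
further progress until it catches up. When all inputs are idle the
sketch simply emits nothing, whereas a real implementation would
presumably propagate the IDLE status downstream.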

Best,

Dawid

On 31/08/2020 11:05, Arvid Heise wrote:
> Hi Aljoscha,
>
> I don't quite follow your analysis. If both sources are configured
> with idleness, they should send a periodic watermark on timeout.
> So the code that you posted would receive watermarks from the idle
> source and thus advance watermarks periodically.
>
> If an idle source does not emit a watermark at all, then I'm curious
> why it's not mapped to StreamStatus.IDLE [1], which would trigger the
> desired behavior.
>
> [1]
> https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd7916698eeee98b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79
>
> On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek <aljos...@apache.org
> <mailto:aljos...@apache.org>> wrote:
>
>     Yes, I'm afraid this analysis is correct. The StreamOperator,
>     AbstractStreamOperator to be specific, computes the combined
>     watermarks from both inputs here:
> https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.
>
>     The operator layer is not aware of idleness, so it will never
>     notice. Idleness only works on the level of inputs and is never
>     forwarded to the operator itself.
>
>     To fix this, we would have to make operators aware of idleness
>     as well, so that they can take it into account when computing
>     the combined output watermark.
>
>     Best,
>     Aljoscha
>
>     On 26.08.20 10:02, Dawid Wysakowicz wrote:
>     > Hi Kien,
>     >
>     > I am afraid this is a valid bug. I am not 100% sure, but the way I
>     > understand the code, the idleness mechanism applies to input
>     > channels, i.e. the case where multiple parallel instances
>     > shuffle their results to downstream operators.
>     >
>     > In the case of a two-input operator, combining the watermarks of
>     > two different upstream operators happens inside the operator
>     > itself. There we do not have the idleness status: we have no
>     > status saying that a whole upstream operator became idle. That's
>     > definitely a bug/limitation.
>     >
>     > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
>     >
>     > Best,
>     >
>     > Dawid
>     >
>     > On 24/08/2020 16:00, Truong Duc Kien wrote:
>     >> Hi all,
>     >> We are testing the new idleness detection feature in Flink 1.11;
>     >> however, it does not work as we expected: when we connect two
>     >> data streams, one of which is idle, the output watermark of the
>     >> CoProcessOperator does not increase, hence the program cannot
>     >> progress.
>     >>
>     >> I've made a small project to illustrate the problem. The
>     >> watermark received by the sink does not increase at all until
>     >> the idle source is stopped.
>     >>
>     >> https://github.com/kien-truong/flink-idleness-testing
>     >>
>     >> Is this a bug, or does idleness detection not support this use
>     >> case?
>     >>
>     >> Regards.
>     >> Kien
>     >
>
>
>
> -- 
>
> Arvid Heise | Senior Java Developer
