Coming back to my previous comment: I would actually propose to separate
the discussion about whether to expose the WatermarkStatus in the sinks or
not from correcting the StreamStatus and Idleness definition in order to
keep the scope of this FLIP as small as possible. If there is a good reason
to expose the WatermarkStatus, then we can probably do it.

Cheers,
Till

On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Martijn,
>
> 1. Good question. The watermarks and statuses of the splits are first
> aggregated before being emitted through the reader. The watermark strategy of the
> user is actually applied on all SourceOutputs (=splits). Since one split is
> active and one is idle, the watermark of the reader will not advance until
> the user-defined idleness is triggered on the idle split. At this point,
> the combined watermark solely depends on the active split. The combined
> status remains ACTIVE.
> 2. Kafka has no dynamic partitions. This is a complete misnomer on the Flink
> side. In fact, if you search for Kafka and partition discovery, you will
> only find Flink resources. What we actually do is dynamic topic discovery
> and that can only be triggered through pattern afaik. We could go for topic
> discovery on all patterns by default if we don't do that already.
> 3. Yes, idleness on assigned partitions would even work with dynamic
> assignments. I will update the FLIP to reflect that.
> 4. Afaik it was only meant for scenario 2 (and your question 3) and it
> should be this way after the FLIP. I don't know of any source
> implementation that uses the user-specified idleness to handle scenario 3.
> The thing that is currently extra is that some readers go idle, when the
> reader doesn't have an active assignment.
>
> Best,
>
> Arvid
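
The aggregation described in point 1 of Arvid's reply can be sketched in plain Java. This is a simplified illustration and not Flink's actual implementation: the class and method names (CombinedWatermarkSketch, combinedWatermark, combinedIdle) are hypothetical, and each split is modelled only by its last watermark and an idle flag. The sketch assumes that the combined watermark is the minimum over all non-idle splits and never moves backwards, and that the combined status is IDLE only when every split is idle.

```java
public class CombinedWatermarkSketch {

    // Combined watermark (assumption): minimum over all splits that are not idle.
    // If every split is idle, the previous combined watermark is kept.
    static long combinedWatermark(long[] splitWatermarks, boolean[] splitIdle, long previous) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < splitWatermarks.length; i++) {
            if (!splitIdle[i]) {
                anyActive = true;
                min = Math.min(min, splitWatermarks[i]);
            }
        }
        // A watermark must never go backwards.
        return anyActive ? Math.max(previous, min) : previous;
    }

    // Combined status (assumption): idle only if all splits are idle.
    static boolean combinedIdle(boolean[] splitIdle) {
        for (boolean idle : splitIdle) {
            if (!idle) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Split 0 receives data (watermark 100); split 1 has not produced a watermark yet.
        long[] watermarks = {100L, Long.MIN_VALUE};

        // Before the user-defined idleness triggers, both splits count as active.
        boolean[] beforeTimeout = {false, false};
        // After the idleness timeout, split 1 is marked idle.
        boolean[] afterTimeout = {false, true};

        long before = combinedWatermark(watermarks, beforeTimeout, Long.MIN_VALUE);
        long after = combinedWatermark(watermarks, afterTimeout, before);

        System.out.println("held back before timeout: " + (before == Long.MIN_VALUE));
        System.out.println("combined watermark after timeout: " + after);
        System.out.println("reader idle after timeout: " + combinedIdle(afterTimeout));
    }
}
```

In this model the idle split holds the reader's watermark back until its idleness timeout fires; afterwards only the active split determines the combined watermark, and the reader itself stays ACTIVE because one split is still active.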
>
> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser <mart...@ververica.com>
> wrote:
>
> > Hi all,
> >
> > I have a couple of questions after studying the FLIP and the docs:
> >
> > 1. What happens when one of the readers has two splits assigned and only
> > one of the splits actually receives data?
> >
> > 2. If I understand it correctly, the Kinesis Source uses dynamic shard
> > discovery by default (so in case of idleness, scenario 3 would happen there)
> > and the FileSource also has a dynamic assignment. The Kafka Source doesn't
> > use dynamic partition discovery by default (so scenario 2 would be the
> > default to happen there). Why did we choose to not enable dynamic partition
> > discovery by default and should we actually change that?
> >
> > 3. To be sure, is it correct that in case of a dynamic assignment where
> > there is temporarily no data, scenario 2 is applicable?
> >
> > 4. Does WatermarkStrategy#withIdleness currently cover scenario 2, 3 and
> > the one from my 3rd question?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Fri, 23 Jul 2021 at 15:57, Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > I would be in favour of what Arvid said about not exposing the
> > > WatermarkStatus to the Sink. Unless there is a very strong argument that
> > > this is required, I think that keeping this concept internal is the
> > > better choice right now. Moreover, as Arvid said, the downstream
> > > application can derive the WatermarkStatus on its own depending on its
> > > business logic.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Eron,
> > > >
> > > > thank you very much for your feedback.
> > > >
> > > > > Please mention that the "temporary status toggle" code will be removed.
> > > > >
> > > > This code is already removed but there is still some automation of going
> > > > idle when temporarily no splits are assigned. I will include it in the
> > > > FLIP.
> > > >
> > > > > I agree with adding the markActive() functionality, for symmetry.
> > > > > Speaking of symmetry, could we now include the minor enhancement we
> > > > > discussed in FLIP-167, the exposure of watermark status changes on the
> > > > > Sink interface? I drafted a PR and would be happy to revisit it.
> > > > >
> > > > > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> > > > >
> > > > I'm still not sure if that's a good idea.
> > > >
> > > > If we have now refined idleness to be a user-specified,
> > > > application-specific way to handle temporarily stalled partitions,
> > > > then downstream applications should actually implement their own
> > > > idleness definition. Let's see what other devs think. I'm pinging the
> > > > ones that have been most involved in the discussion:
> > > > @Stephan Ewen <se...@apache.org>
> > > > @Till Rohrmann <trohrm...@apache.org>
> > > > @Dawid Wysakowicz <dwysakow...@apache.org>.
> > > >
> > > > > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > > > > class.  Should it be 'eventtime' package?
> > > > >
> > > > Are you proposing org.apache.flink.api.common.eventtime? I was simply
> > > > suggesting to rename
> > > > org.apache.flink.streaming.runtime.streamstatus but I'm very open to
> > > > other suggestions (given that there are only 2 classes in the package).
> > > >
> > > >
> > > > > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > > > > spell out what the new method names will be on each interface? May I
> > > > > suggest that Input.emitStreamStatus be Input.processStreamStatus? This
> > > > > is to help decouple the input's watermark status from the output's
> > > > > watermark status.
> > > > >
> > > > I haven't found
> > > > org.apache.flink.streaming.api.operators.Input#emitStreamStatus in
> > > > master. Could you double-check if I'm looking at the correct class?
> > > >
> > > > The current idea was mainly to grep+replace
> > > > /streamStatus/watermarkStatus/ and /StreamStatus/WatermarkStatus/. But
> > > > again I'm very open to more descriptive names. I can add an explicit
> > > > list later. I'm assuming you are only interested in (semi-)public
> > > > classes.
> > > >
> > > >
> > > > > I observe that AbstractStreamOperator is hardcoded to derive the output
> > > > > channel's status from the input channel's status.  May I suggest
> > > > > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)"
> > > > > to allow for the operator subclass to customize the processing of the
> > > > > aggregated watermark and watermark status?
> > > > >
> > > > Can you add a motivation for that?
> > > > @Dawid Wysakowicz <dwysakow...@apache.org>, I think you are the last
> > > > person that touched the code. Do you have some example operators in
> > > > mind that would change it?
> > > >
> > > > > Maybe the FLIP should spell out the expected behavior of the generic
> > > > > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > > > > generator ignore the upstream idleness signal?  I believe it propagates
> > > > > the signal, even though it also generates its own signals.  Given that
> > > > > source-based and generic watermark generation shouldn't be combined,
> > > > > one could argue that the generic watermark generator should activate
> > > > > only when its input channel's watermark status is idle.
> > > > >
> > > > I will add a section. In general, we assume that we only have
> > > > source-based watermark generators once FLIP-27 is properly adopted.
> > > >
> > > > Best,
> > > >
> > > > Arvid
> > > >
> > > > On Wed, Jul 21, 2021 at 12:40 AM Eron Wright
> > > > <ewri...@streamnative.io.invalid> wrote:
> > > >
> > > > > This proposal to narrow the definition of idleness to focus on the
> > > > > event-time clock is great.
> > > > >
> > > > > Please mention that the "temporary status toggle" code will be removed.
> > > > >
> > > > > I agree with adding the markActive() functionality, for symmetry.
> > > > > Speaking of symmetry, could we now include the minor enhancement we
> > > > > discussed in FLIP-167, the exposure of watermark status changes on the
> > > > > Sink interface? I drafted a PR and would be happy to revisit it.
> > > > >
> > > > > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> > > > >
> > > > > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > > > > class.  Should it be 'eventtime' package?
> > > > >
> > > > > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > > > > spell out what the new method names will be on each interface? May I
> > > > > suggest that Input.emitStreamStatus be Input.processStreamStatus? This
> > > > > is to help decouple the input's watermark status from the output's
> > > > > watermark status.
> > > > >
> > > > > I observe that AbstractStreamOperator is hardcoded to derive the output
> > > > > channel's status from the input channel's status.  May I suggest
> > > > > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)"
> > > > > to allow for the operator subclass to customize the processing of the
> > > > > aggregated watermark and watermark status?
> > > > >
> > > > > Maybe the FLIP should spell out the expected behavior of the generic
> > > > > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > > > > generator ignore the upstream idleness signal?  I believe it propagates
> > > > > the signal, even though it also generates its own signals.  Given that
> > > > > source-based and generic watermark generation shouldn't be combined,
> > > > > one could argue that the generic watermark generator should activate
> > > > > only when its input channel's watermark status is idle.
> > > > >
> > > > > Thanks again for this effort!
> > > > > -Eron
> > > > >
> > > > >
> > > > > On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
> > > > >
> > > > > > Dear devs,
> > > > > >
> > > > > > We recently discovered that StreamStatus and Idleness are
> > > > > > insufficiently defined [1], so I drafted a FLIP [3] to amend that
> > > > > > situation. It would be good to hear more opinions on that matter.
> > > > > > Ideally, we can make the changes in 1.14 as some newly added methods
> > > > > > are affected.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Arvid
> > > > > >
> > > > > > [1]
> > > > > > https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > > > > > [2]
> > > > > > https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> > > > > > [3]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
