Hi everyone,

I am in favour of what Arvid said about not exposing the WatermarkStatus
to the Sink. Unless there is a very strong argument that this is required,
I think keeping this concept internal is the better choice right now.
Moreover, as Arvid said, a downstream application can derive the
WatermarkStatus on its own, depending on its business logic.

Cheers,
Till

On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Eron,
>
> thank you very much for your feedback.
>
> > Please mention that the "temporary status toggle" code will be removed.
> >
> This code is already removed but there is still some automation of going
> idle when temporarily no splits are assigned. I will include it in the FLIP.
>
> > I agree with adding the markActive() functionality, for symmetry.  Speaking
> > of symmetry, could we now include the minor enhancement we discussed in
> > FLIP-167, the exposure of watermark status changes on the Sink interface.
> > I drafted a PR and would be happy to revisit it.
> >
> > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> I'm still not sure if that's a good idea.
>
> If we have now refined idleness to be a user-specified,
> application-specific way to handle temporarily stalled partitions,
> then downstream applications should actually implement their own idleness
> definition. Let's see what other devs think. I'm pinging the ones that have
> been most involved in the discussion: @Stephan Ewen <se...@apache.org>,
> @Till Rohrmann <trohrm...@apache.org>, @Dawid Wysakowicz
> <dwysakow...@apache.org>.
>
> > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > class.  Should it be 'eventtime' package?
> >
> Are you proposing org.apache.flink.api.common.eventtime? I was simply
> suggesting to rename
> org.apache.flink.streaming.runtime.streamstatus but I'm very open to other
> suggestions (given that there are only 2 classes in the package).
>
>
> > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > spell out what the new method names will be on each interface? May I
> > suggest that Input.emitStreamStatus be Input.processStreamStatus?  This is
> > to help decouple the input's watermark status from the output's watermark
> > status.
> >
> I haven't found
> org.apache.flink.streaming.api.operators.Input#emitStreamStatus in master.
> Could you double-check if I'm looking at the correct class?
>
> The current idea was mainly to grep+replace /streamStatus/watermarkStatus/
> and /StreamStatus/WatermarkStatus/. But again, I'm very open to more
> descriptive names. I can add an explicit list later. I'm assuming you are
> only interested in (semi-)public classes.
>
>
> > I observe that AbstractStreamOperator is hardcoded to derive the output
> > channel's status from the input channel's status.  May I suggest
> > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> > allow for the operator subclass to customize the processing of the
> > aggregated watermark and watermark status.
> >
> Can you add a motivation for that?
> @Dawid Wysakowicz <dwysakow...@apache.org>, I think you are the last
> person that touched the code. Do you have some example operators in
> mind that would change it?
>
> > Maybe the FLIP should spell out the expected behavior of the generic
> > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > generator ignore the upstream idleness signal?  I believe it propagates the
> > signal, even though it also generates its own signals.   Given that
> > source-based and generic watermark generation shouldn't be combined, one
> > could argue that the generic watermark generator should activate only when
> > its input channel's watermark status is idle.
> >
> I will add a section. In general, we assume that we only have source-based
> watermark generators once FLIP-27 is properly adopted.
>
> Best,
>
> Arvid
>
> On Wed, Jul 21, 2021 at 12:40 AM Eron Wright
> <ewri...@streamnative.io.invalid> wrote:
>
> > This proposal to narrow the definition of idleness to focus on the
> > event-time clock is great.
> >
> > Please mention that the "temporary status toggle" code will be removed.
> >
> > I agree with adding the markActive() functionality, for symmetry.  Speaking
> > of symmetry, could we now include the minor enhancement we discussed in
> > FLIP-167, the exposure of watermark status changes on the Sink interface.
> > I drafted a PR and would be happy to revisit it.
> >
> > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
> >
> > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > class.  Should it be 'eventtime' package?
> >
> > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > spell out what the new method names will be on each interface? May I
> > suggest that Input.emitStreamStatus be Input.processStreamStatus?  This is
> > to help decouple the input's watermark status from the output's watermark
> > status.
> >
> > I observe that AbstractStreamOperator is hardcoded to derive the output
> > channel's status from the input channel's status.  May I suggest
> > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> > allow for the operator subclass to customize the processing of the
> > aggregated watermark and watermark status.
> >
> > Maybe the FLIP should spell out the expected behavior of the generic
> > watermark generator (TimestampsAndWatermarksOperator).  Should the
> > generator ignore the upstream idleness signal?  I believe it propagates the
> > signal, even though it also generates its own signals.   Given that
> > source-based and generic watermark generation shouldn't be combined, one
> > could argue that the generic watermark generator should activate only when
> > its input channel's watermark status is idle.
> >
> > Thanks again for this effort!
> > -Eron
> >
> >
> > On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Dear devs,
> > >
> > > We recently discovered that StreamStatus and Idleness are insufficiently
> > > defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> > > good to hear more opinions on that matter. Ideally, we can make the changes
> > > to 1.14 as some newly added methods are affected.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > [1]
> > > https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > > [2]
> > > https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> > > [3]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> > >
> >
>
