After rethinking the idleness issue, I came to the conclusion that
idleness should again be inferred at the source of the respective
downstream pipeline.

The main issue with propagating idleness is that it would force the same
definition of idleness on all downstream pipelines, which may not be what
the user intended.
On the other hand, I don't immediately see a technical reason why the
downstream source wouldn't be able to infer idleness on its own.
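
To make this concrete, here is a minimal sketch of source-side inference.
It is not Flink code: `IdlenessDetector` and its methods are hypothetical
names made up for illustration, and time is passed in explicitly to keep
the example deterministic. The only point is that each downstream pipeline
picks its own timeout instead of inheriting one from upstream.

```java
import java.time.Duration;

/** Hypothetical helper (not a Flink class): infers idleness inside a downstream source. */
final class IdlenessDetector {
    private final long idleTimeoutMillis; // chosen independently by each downstream pipeline
    private long lastRecordMillis;        // time at which the last record was read

    IdlenessDetector(Duration idleTimeout, long nowMillis) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.lastRecordMillis = nowMillis;
    }

    /** Call whenever the source reads a record from the intermediate stream. */
    void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
    }

    /** True once no record has been read for at least the configured timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastRecordMillis >= idleTimeoutMillis;
    }
}

final class IdlenessDemo {
    public static void main(String[] args) {
        // Two downstream pipelines read the same stream but define idleness differently.
        IdlenessDetector strict = new IdlenessDetector(Duration.ofSeconds(1), 0L);
        IdlenessDetector lenient = new IdlenessDetector(Duration.ofMinutes(1), 0L);

        long now = 5_000L; // five seconds without any record
        System.out.println("strict idle:  " + strict.isIdle(now));  // true
        System.out.println("lenient idle: " + lenient.isIdle(now)); // false
    }
}
```

For regular sources, `WatermarkStrategy#withIdleness(Duration)` already
provides this kind of per-pipeline timeout, so I would expect a downstream
source to be able to build on it.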


On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> Thanks Piotr for bringing this up.  I reflected on this and I agree we
> should expose idleness, otherwise a multi-stage flow could stall.
>
> Regarding the latency markers, I don't see an immediate need for
> propagating them, because they serve to estimate latency within a pipeline,
> not across pipelines.  One would probably need to enhance the source
> interface also to do e2e latency.  Seems we agree this aspect is out of
> scope.
>
> I took a look at the code to get a sense of how to accomplish this.  The
> gist is a new `markIdle` method on the `StreamOperator` interface, that is
> called when the stream status maintainer (the `OperatorChain`) transitions
> to idle state.  Then, a new `markIdle` method on the `SinkFunction` and
> `SinkWriter` that is called by the respective operators.   Note that
> StreamStatus is an internal class.
>
> Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
> this new aspect:
> https://github.com/streamnative/flink/pull/2/files
>
> Please let me know if you'd like me to proceed to update the FLIP with
> these details.
>
> Thanks again,
> Eron
>
> On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi,
> >
> > Sorry for chipping in late in the discussion, but I would second this
> > point from Arvid:
> >
> > > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > encoded.
> >
> > It seems like this point was raised but not followed up on? Or did I
> > miss it? Especially the StreamStatus part. To me it sounds like exposing
> > watermarks without letting the sink know that the stream can be idle is
> > an incomplete feature and can be very problematic/confusing for
> > potential users.
> >
> > Best,
> > Piotrek
> >
> > On Mon, May 31, 2021 at 8:34 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > AFAIK, anyone can start a [VOTE] thread [1]. For example, here a
> > > non-committer started a successful thread [2].
> > > If you start it, I can already cast a binding vote and we just need 2
> > > more for the FLIP to be accepted.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > [2]
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> > >
> > > On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io
> > > .invalid>
> > > wrote:
> > >
> > > > Arvid,
> > > > Thanks for the feedback.  I investigated the japicmp configuration,
> > and I
> > > > see that SinkWriter is marked Experimental (not Public or
> > > PublicEvolving).
> > > > I think this means that SinkWriter need not be excluded.  As you
> > > mentioned,
> > > > SinkFunction is already excluded.  I've updated the FLIP with an
> > > > explanation.
> > > >
> > > > I believe all issues are resolved.  May we proceed to a vote now?
> And
> > > are
> > > > you able to drive the vote process?
> > > >
> > > > Thanks,
> > > > Eron
> > > >
> > > >
> > > > On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org>
> wrote:
> > > >
> > > > > Hi Eron,
> > > > >
> > > > > 1. fair point. It still feels odd to have writeWatermark in the
> > > > > SinkFunction (it's supposed to be functional as you mentioned),
> but I
> > > > agree
> > > > > that invokeWatermark is not better. So unless someone has a better
> > > idea,
> > > > > I'm fine with it.
> > > > > 2.+3. I tried to come up with scenarios for a longer time. In
> > general,
> > > it
> > > > > seems as if the new SinkWriter interface encourages more injection
> > (see
> > > > > processing time service in InitContext), such that the need for the
> > > > context
> > > > > is really just context information of that particular record and I
> > > don't
> > > > > see any use beyond timestamp and watermark. For SinkFunction, I'd
> not
> > > > > over-engineer as it's going to be deprecated soonish. So +1 to
> leave
> > it
> > > > > out.
> > > > > 4. Okay so I double-checked: from an execution perspective, it
> works.
> > > > > However, japicmp would definitely complain. I propose to add it to
> > the
> > > > > compatibility section like this. We need to add an exception to
> > > > SinkWriter
> > > > > then. (SinkFunction is already on the exception list)
> > > > > 5.+6. Awesome, I was also sure but wanted to double check.
> > > > >
> > > > > Best,
> > > > >
> > > > > Arvid
> > > > >
> > > > >
> > > > > On Wed, May 26, 2021 at 7:29 PM Eron Wright <
> ewri...@streamnative.io
> > > > > .invalid>
> > > > > wrote:
> > > > >
> > > > > > Arvid,
> > > > > >
> > > > > > 1. I assume that the method name `invoke` stems from considering
> > the
> > > > > > SinkFunction to be a functional interface, but is otherwise
> > > > meaningless.
> > > > > > Keeping it as `writeWatermark` does keep it symmetric with
> > > SinkWriter.
> > > > > My
> > > > > > vote is to leave it.  You decide.
> > > > > >
> > > > > > 2+3. I too considered adding a `WatermarkContext`, but it would
> > > merely
> > > > > be a
> > > > > > placeholder.  I don't anticipate any context info in future.  As
> we
> > > see
> > > > > > with invoke, it is possible to add a context later in a
> > > > > > backwards-compatible way.  My vote is to not introduce a context.
> > > You
> > > > > > decide.
> > > > > >
> > > > > > 4. No anticipated compatibility issues.
> > > > > >
> > > > > > 5. Short answer, it works as expected.  The new methods are
> invoked
> > > > > > whenever the underlying operator receives a watermark.  I do
> > believe
> > > > that
> > > > > > batch and ingestion time applications receive watermarks. Seems
> the
> > > > > > programming model is more unified in that respect since 1.12
> > > > (FLIP-134).
> > > > > >
> > > > > > 6. The failure behavior is the same as for elements.
> > > > > >
> > > > > > Thanks,
> > > > > > Eron
> > > > > >
> > > > > > On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org>
> > > wrote:
> > > > > >
> > > > > > > Hi Eron,
> > > > > > >
> > > > > > > I think the FLIP is crisp and mostly good to go. Some smaller
> > > > > > > things/questions:
> > > > > > >
> > > > > > >    1. SinkFunction#writeWatermark could be named
> > > > > > >    SinkFunction#invokeWatermark or invokeOnWatermark to keep it
> > > > > > symmetric.
> > > > > > >    2. We could add the context parameter to both. For
> > > > > SinkWriter#Context,
> > > > > > >    we currently do not gain much. SinkFunction#Context also
> > exposes
> > > > > > > processing
> > > > > > >    time, which may or may not be handy and is currently mostly
> > used
> > > > for
> > > > > > >    StreamingFileSink bucket policies. We may add that
> processing
> > > time
> > > > > > flag
> > > > > > >    also to SinkWriter#Context in the future.
> > > > > > >    3. Alternatively, we could also add a different context
> > > parameter
> > > > > just
> > > > > > >    to keep the API stable while allowing additional information
> > to
> > > be
> > > > > > > passed
> > > > > > >    in the future.
> > > > > > >    4. Would we run into any compatibility issue if we use Flink
> > > 1.13
> > > > > > source
> > > > > > >    in Flink 1.14 (with this FLIP) or vice versa?
> > > > > > >    5. What happens with sinks that use the new methods in
> > > > applications
> > > > > > that
> > > > > > >    do not have watermarks (batch mode, processing time)? Does
> > this
> > > > also
> > > > > > > work
> > > > > > >    with ingestion time sufficiently?
> > > > > > >    6. How do exactly once sinks deal with written watermarks in
> > > case
> > > > of
> > > > > > >    failure? I guess it's the same as normal records. (Either
> > > rollback
> > > > > of
> > > > > > >    transaction or deduplication on resumption)
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Arvid
> > > > > > >
> > > > > > > On Tue, May 25, 2021 at 6:44 PM Eron Wright <
> > > ewri...@streamnative.io
> > > > > > > .invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Does anyone have further comment on FLIP-167?
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Eron
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 20, 2021 at 5:02 PM Eron Wright <
> > > > ewri...@streamnative.io
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Filed FLIP-167: Watermarks for Sink API:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > >
> > > > > > > > > I'd like to call a vote next week, is that reasonable?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <
> b.z...@dell.com
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Arvid and Eron,
> > > > > > > > >>
> > > > > > > > >> Thanks for the discussion and I read through Eron's pull
> > > request
> > > > > > and I
> > > > > > > > >> think this can benefit Pravega Flink connector as well.
> > > > > > > > >>
> > > > > > > > >> Here is some background. Pravega had the watermark concept
> > > > through
> > > > > > the
> > > > > > > > >> event stream since two years ago, and here is a blog
> > > > > introduction[1]
> > > > > > > for
> > > > > > > > >> Pravega watermark.
> > > > > > > > >> Pravega Flink connector also had this watermark
> integration
> > > last
> > > > > > year
> > > > > > > > >> that we wanted to propagate the Flink watermark to Pravega
> > in
> > > > the
> > > > > > > > >> SinkFunction, and at that time we just used the existing
> > Flink
> > > > API
> > > > > > > that
> > > > > > > > we
> > > > > > > > >> keep the last watermark in memory and check if watermark
> > > changes
> > > > > for
> > > > > > > > each
> > > > > > > > >> event[2] which is not efficient. With such new interface,
> we
> > > can
> > > > > > also
> > > > > > > > >> manage the watermark propagation much more easily.
> > > > > > > > >>
> > > > > > > > >> [1]
> > > > > > https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > > > > > > > >> [2]
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > > > > > > > >>
> > > > > > > > >> -----Original Message-----
> > > > > > > > >> From: Arvid Heise <ar...@apache.org>
> > > > > > > > >> Sent: Wednesday, May 19, 2021 16:06
> > > > > > > > >> To: dev
> > > > > > > > >> Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> [EXTERNAL EMAIL]
> > > > > > > > >>
> > > > > > > > >> Hi Eron,
> > > > > > > > >>
> > > > > > > > >> Thanks for pushing that topic. I can now see that the
> > benefit
> > > is
> > > > > > even
> > > > > > > > >> bigger than I initially thought. So it's worthwhile
> anyways
> > to
> > > > > > include
> > > > > > > > that.
> > > > > > > > >>
> > > > > > > > >> I also briefly thought about exposing watermarks to all
> > UDFs,
> > > > but
> > > > > > > here I
> > > > > > > > >> really have an issue to see specific use cases. Could you
> > > maybe
> > > > > > take a
> > > > > > > > few
> > > > > > > > >> minutes to think about it as well? I could only see
> someone
> > > > > misusing
> > > > > > > > Async
> > > > > > > > >> IO as a sink where a real sink would be more appropriate.
> In
> > > > > > general,
> > > > > > > if
> > > > > > > > >> there is not a clear use case, we shouldn't add the
> > > > functionality
> > > > > as
> > > > > > > > it's
> > > > > > > > >> just increased maintenance for no value.
> > > > > > > > >>
> > > > > > > > >> If we stick to the plan, I think your PR is already in a
> > good
> > > > > shape.
> > > > > > > We
> > > > > > > > >> need to create a FLIP for it though, since it changes
> Public
> > > > > > > interfaces
> > > > > > > > >> [1]. I was initially not convinced that we should also
> > change
> > > > the
> > > > > > old
> > > > > > > > >> SinkFunction interface, but seeing how little the change
> > is, I
> > > > > > > wouldn't
> > > > > > > > >> mind at all to increase consistency. Only when we wrote
> the
> > > FLIP
> > > > > and
> > > > > > > > >> approved it (which should be minimal and fast), we should
> > > > actually
> > > > > > > look
> > > > > > > > at
> > > > > > > > >> the PR ;).
> > > > > > > > >>
> > > > > > > > >> The only thing which I would improve is the name of the
> > > > function.
> > > > > > > > >> processWatermark sounds as if the sink implementer really
> > > needs
> > > > to
> > > > > > > > >> implement it (as you would need to do it on a custom
> > > operator).
> > > > I
> > > > > > > would
> > > > > > > > >> make them symmetric to the record writing/invoking method
> > > (e.g.
> > > > > > > > >> writeWatermark and invokeWatermark).
> > > > > > > > >>
> > > > > > > > >> As a follow-up PR, we should then migrate KafkaShuffle to
> > the
> > > > new
> > > > > > API.
> > > > > > > > >> But that's something I can do.
> > > > > > > > >>
> > > > > > > > >> [1]
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> > > > > > > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > > > >>
> > > > > > > > >> On Wed, May 19, 2021 at 3:34 AM Eron Wright <
> > > > > > ewri...@streamnative.io
> > > > > > > > >> .invalid>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> > Update: opened an issue and a PR.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > >
> > > > >
> > > > > > > > > >> > https://issues.apache.org/jira/browse/FLINK-22700
> > > > > > > > >> >
> > > > > > > >
> > > > >
> > > > > > > > > >> > https://github.com/apache/flink/pull/15950
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright <
> > > > > > > ewri...@streamnative.io
> > > > > > > > >
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Thanks Arvid and David for sharing your ideas on this
> > > > subject.
> > > > > > > I'm
> > > > > > > > >> > > glad to hear that you're seeing use cases for
> watermark
> > > > > > > propagation
> > > > > > > > >> > > via an enhanced sink interface.
> > > > > > > > >> > >
> > > > > > > > >> > > As you've guessed, my interest is in Pulsar and am
> > > exploring
> > > > > > some
> > > > > > > > >> > > options for brokering watermarks across stream
> > processing
> > > > > > > pipelines.
> > > > > > > > >> > > I think
> > > > > > > > >> > Arvid
> > > > > > > > >> > > is speaking to a high-fidelity solution where the
> > > difference
> > > > > > > between
> > > > > > > > >> > intra-
> > > > > > > > >> > > and inter-pipeline flow is eliminated.  My goal is
> more
> > > > > > limited; I
> > > > > > > > >> > > want
> > > > > > > > >> > to
> > > > > > > > >> > > write the watermark that arrives at the sink to
> Pulsar.
> > > > > Simply
> > > > > > > > >> > > imagine that Pulsar has native support for
> watermarking
> > in
> > > > its
> > > > > > > > >> > > producer/consumer API, and we'll leave the details to
> > > > another
> > > > > > > forum.
> > > > > > > > >> > >
> > > > > > > > >> > > David, I like your invariant.  I see lateness as
> > stemming
> > > > from
> > > > > > the
> > > > > > > > >> > problem
> > > > > > > > >> > > domain and from system dynamics (e.g. scheduling,
> > > batching,
> > > > > > lag).
> > > > > > > > >> > > When
> > > > > > > > >> > one
> > > > > > > > >> > > depends on order-of-observation to generate
> watermarks,
> > > the
> > > > > app
> > > > > > > may
> > > > > > > > >> > become
> > > > > > > > >> > > unduly sensitive to dynamics which bear on
> > > > > order-of-observation.
> > > > > > > My
> > > > > > > > >> > > goal is to factor out the system dynamics from
> lateness
> > > > > > > > determination.
> > > > > > > > >> > >
> > > > > > > > >> > > Arvid, to be most valuable (at least for my purposes)
> > the
> > > > > > > > >> > > enhancement is needed on SinkFunction.  This will
> allow
> > us
> > > > to
> > > > > > > easily
> > > > > > > > >> > > evolve the existing Pulsar connector.
> > > > > > > > >> > >
> > > > > > > > >> > > Next step, I will open a PR to advance the
> conversation.
> > > > > > > > >> > >
> > > > > > > > >> > > Eron
> > > > > > > > >> > >
> > > > > > > > >> > > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > > > > > > > >> > > <david.mora...@gmail.com>
> > > > > > > > >> > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > >> Hi Eron,
> > > > > > > > >> > >>
> > > > > > > > > >> > >> Thanks for starting this discussion. I've been thinking about this
> > > > > > > > > >> > >> recently as we've run into "watermark related" issues when chaining
> > > > > > > > > >> > >> multiple pipelines together. My two cents on the discussion:
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> The way I like to think about the problem is that there should be an
> > > > > > > > > >> > >> invariant that holds for any stream processing pipeline: "NON_LATE
> > > > > > > > > >> > >> element entering the system should never become LATE"
> > > > > > > > >> > >>
> > > > > > > > >> > >> Unfortunately this is exactly what happens in
> > downstream
> > > > > > > pipelines,
> > > > > > > > >> > >> because the upstream one can:
> > > > > > > > >> > >> - break ordering (especially with higher parallelism)
> > > > > > > > >> > >> - emit elements that are ahead of output watermark
> > > > > > > > >> > >>
> > > > > > > > >> > >> There is not enough information to re-construct
> > upstream
> > > > > > > watermark
> > > > > > > > >> > >> in latter stages (it's always just an estimate based
> on
> > > > > > previous
> > > > > > > > >> > >> pipeline's output).
> > > > > > > > >> > >>
> > > > > > > > > >> > >> It would be great if we could have a general abstraction that is
> > > > > > > > > >> > >> reusable for various sources / sinks (not just Kafka / Pulsar, though
> > > > > > > > > >> > >> this would probably cover most of the use cases) and systems.
> > > > > > > > > >> > >>
> > > > > > > > > >> > >> Is there any other use case than sharing watermarks between pipelines
> > > > > > > > > >> > >> that you're trying to solve?
> > > > > > > > >> > >>
> > > > > > > > >> > >> Arvid:
> > > > > > > > >> > >>
> > > > > > > > >> > >> 1. Watermarks are closely coupled to the used system
> > > > > (=Flink).
> > > > > > I
> > > > > > > > >> > >> have a
> > > > > > > > >> > >> > hard time imagining that it's useful to use a
> > different
> > > > > > stream
> > > > > > > > >> > processor
> > > > > > > > >> > >> > downstream. So for now, I'm assuming that both
> > upstream
> > > > and
> > > > > > > > >> > >> > downstream
> > > > > > > > >> > >> are
> > > > > > > > >> > >> > Flink applications. In that case, we probably
> define
> > > both
> > > > > > parts
> > > > > > > > >> > >> > of the pipeline in the same Flink job similar to
> > > > > > KafkaStream's
> > > > > > > > >> #through.
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> > >> I'd slightly disagree here. For example we're
> > > > "materializing"
> > > > > > > > >> > change-logs
> > > > > > > > >> > >> produced by Flink pipeline into serving layer (random
> > > > access
> > > > > > db /
> > > > > > > > >> > >> in memory view / ..) and we need to know, whether
> > > responses
> > > > > we
> > > > > > > > >> > >> serve meet the "freshness" requirements (eg. you may
> > want
> > > > to
> > > > > > > > >> > >> respond differently, when watermark is lagging way
> too
> > > much
> > > > > > > behind
> > > > > > > > >> > >> processing time). Also not
> > > > > > > > >> > every
> > > > > > > > >> > >> stream processor in the pipeline needs to be Flink.
> It
> > > can
> > > > as
> > > > > > > well
> > > > > > > > >> > >> be a simple element-wise transformation that reads
> from
> > > > Kafka
> > > > > > and
> > > > > > > > >> > >> writes back into separate topic (that's what we do
> for
> > > > > example
> > > > > > > with
> > > > > > > > >> > >> ML models, that have special hardware requirements).
> > > > > > > > >> > >>
> > > > > > > > >> > >> Best,
> > > > > > > > >> > >> D.
> > > > > > > > >> > >>
> > > > > > > > >> > >>
> > > > > > > > >> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <
> > > > > ar...@apache.org>
> > > > > > > > >> wrote:
> > > > > > > > >> > >>
> > > > > > > > >> > >> > Hi Eron,
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > I think this is a useful addition for storage
> systems
> > > > that
> > > > > > act
> > > > > > > as
> > > > > > > > >> > >> > pass-through for Flink to reduce recovery time. It
> is
> > > > only
> > > > > > > useful
> > > > > > > > >> > >> > if
> > > > > > > > >> > you
> > > > > > > > >> > >> > combine it with regional fail-over as only a small
> > part
> > > > of
> > > > > > the
> > > > > > > > >> > pipeline
> > > > > > > > >> > >> is
> > > > > > > > >> > >> > restarted.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > A couple of thoughts on the implications:
> > > > > > > > >> > >> > 1. Watermarks are closely coupled to the used
> system
> > > > > > (=Flink).
> > > > > > > I
> > > > > > > > >> > >> > have
> > > > > > > > >> > a
> > > > > > > > >> > >> > hard time imagining that it's useful to use a
> > different
> > > > > > stream
> > > > > > > > >> > processor
> > > > > > > > >> > >> > downstream. So for now, I'm assuming that both
> > upstream
> > > > and
> > > > > > > > >> > >> > downstream
> > > > > > > > >> > >> are
> > > > > > > > >> > >> > Flink applications. In that case, we probably
> define
> > > both
> > > > > > parts
> > > > > > > > >> > >> > of the pipeline in the same Flink job similar to
> > > > > > KafkaStream's
> > > > > > > > >> #through.
> > > > > > > > >> > >> > 2. The schema of the respective intermediate
> > > stream/topic
> > > > > > would
> > > > > > > > >> > >> > need
> > > > > > > > >> > to
> > > > > > > > >> > >> be
> > > > > > > > >> > >> > managed by Flink to encode both records and
> > watermarks.
> > > > > This
> > > > > > > > >> > >> > reduces
> > > > > > > > >> > the
> > > > > > > > >> > >> > usability quite a bit and needs to be carefully
> > > crafted.
> > > > > > > > >> > >> > 3. It's not clear to me if constructs like
> > > SchemaRegistry
> > > > > can
> > > > > > > be
> > > > > > > > >> > >> properly
> > > > > > > > >> > >> > supported (and also if they should be supported) in
> > > terms
> > > > > of
> > > > > > > > >> > >> > schema evolution.
> > > > > > > > >> > >> > 4. Potentially, StreamStatus and LatencyMarker
> would
> > > also
> > > > > > need
> > > > > > > to
> > > > > > > > >> > >> > be encoded.
> > > > > > > > >> > >> > 5. It's important to have some way to transport
> > > > > backpressure
> > > > > > > from
> > > > > > > > >> > >> > the downstream to the upstream. Or else you would
> > have
> > > > the
> > > > > > same
> > > > > > > > >> > >> > issue as KafkaStreams where two separate pipelines
> > can
> > > > > drift
> > > > > > so
> > > > > > > > >> > >> > far away that
> > > > > > > > >> > you
> > > > > > > > >> > >> > experience data loss if the data retention period
> is
> > > > > smaller
> > > > > > > than
> > > > > > > > >> > >> > the drift.
> > > > > > > > >> > >> > 6. It's clear that you trade a huge chunk of
> > throughput
> > > > for
> > > > > > > lower
> > > > > > > > >> > >> overall
> > > > > > > > >> > >> > latency in case of failure. So it's an interesting
> > > > feature
> > > > > > for
> > > > > > > > >> > >> > use
> > > > > > > > >> > cases
> > > > > > > > >> > >> > with SLAs.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Since we are phasing out SinkFunction, I'd prefer
> to
> > > only
> > > > > > > support
> > > > > > > > >> > >> > SinkWriter. Having a no-op default sounds good to
> me.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > We have some experimental feature for Kafka [1],
> > which
> > > > > pretty
> > > > > > > > >> > >> > much
> > > > > > > > >> > >> reflects
> > > > > > > > >> > >> > your idea. Here we have an ugly workaround to be
> able
> > > to
> > > > > > > process
> > > > > > > > >> > >> > the watermark by using a custom StreamSink task. We
> > > could
> > > > > > also
> > > > > > > > >> > >> > try to
> > > > > > > > >> > >> create a
> > > > > > > > >> > >> > FLIP that abstracts the actual system away and then
> > we
> > > > > could
> > > > > > > use
> > > > > > > > >> > >> > the approach for both Pulsar and Kafka.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > [1]
> > > > > > > > >> > >> >
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> >
> > > > > > > >
> > > > >
> > > > > > > > > >> > >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> > > > > > > > >> > >> >
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > > > > > > > >> > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > > I would like to propose an enhancement to the
> Sink
> > > API,
> > > > > the
> > > > > > > > >> > >> > > ability
> > > > > > > > >> > to
> > > > > > > > >> > >> > > receive upstream watermarks.   I'm aware that the
> > > sink
> > > > > > > context
> > > > > > > > >> > >> provides
> > > > > > > > >> > >> > the
> > > > > > > > >> > >> > > current watermark for a given record.  I'd like
> to
> > be
> > > > > able
> > > > > > to
> > > > > > > > >> > >> > > write
> > > > > > > > >> > a
> > > > > > > > >> > >> > sink
> > > > > > > > >> > >> > > function that is invoked whenever the watermark
> > > > changes.
> > > > > > Out
> > > > > > > > >> > >> > > of
> > > > > > > > >> > scope
> > > > > > > > >> > >> > > would be event-time timers (since sinks aren't
> > > keyed).
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > For context, imagine that a stream storage system
> > had
> > > > the
> > > > > > > > >> > >> > > ability to persist watermarks in addition to
> > ordinary
> > > > > > > elements,
> > > > > > > > >> > >> > > e.g. to serve
> > > > > > > > >> > as
> > > > > > > > >> > >> > > source watermarks in a downstream processor.
> > Ideally
> > > > one
> > > > > > > could
> > > > > > > > >> > >> compose a
> > > > > > > > >> > >> > > multi-stage, event-driven application, with
> > > watermarks
> > > > > > > flowing
> > > > > > > > >> > >> end-to-end
> > > > > > > > >> > >> > > without need for a heuristics-based watermark at
> > each
> > > > > > stage.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > The specific proposal would be a new method on
> > > > > > `SinkFunction`
> > > > > > > > >> > >> > > and/or
> > > > > > > > >> > >> on
> > > > > > > > >> > >> > > `SinkWriter`, called 'processWatermark' or
> > > > > > 'writeWatermark',
> > > > > > > > >> > >> > > with a
> > > > > > > > >> > >> > default
> > > > > > > > >> > >> > > implementation that does nothing.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Thoughts?
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Thanks!
> > > > > > > > >> > >> > > Eron Wright
> > > > > > > > >> > >> > > StreamNative
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > --
> > > > > > > > >> > >
> > > > > > > > >> > > Eron Wright   Cloud Engineering Lead
> > > > > > > > >> > >
> > > > > > > > >> > > p: +1 425 922 8617 <18163542939>
> > > > > > > > >> > >
> > > > > > > > >> > > streamnative.io |  Meet with me
> > > > > > > > >> > > <
> > > > > > > >
> > > > https://urldefense.com/v3/__https://calendly.com/eronwright/regular
> > > > > > > > >> > >
> > > > > > >
> > > -1-hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > >> > > dMtQrD25c$ [calendly[.]com]>
> > > > > > > > >> > >
> > > > > > > > >> > > <
> > > > > > > >
> > > > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpK
> > > > > > > > >> > >
> > > > > I!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > >> > > [github[.]com]>
> > > > > > > > >> > > <
> > > > > > > >
> > > > https://urldefense.com/v3/__https://www.linkedin.com/company/stream
> > > > > > > > >> > >
> > > > > > >
> > > native/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > >> > > dMqO4UZJa$ [linkedin[.]com]>
> > > > > > > > >> > > <
> > > > > > >
> > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__
> > > > > > > > ;!
> > > > > > > > >> > >
> > > > > > >
> > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > >> > > [twitter[.]com]>
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > --
> > > > > > > > >> >
> > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > >> >
> > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > >> >
> > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > >> > <
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://calendly.com/eronwright/regular-1
> > > > > > > > >> >
> > > > > > >
> > > >
> -hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtQ
> > > > > > > > >> > rD25c$ [calendly[.]com]>
> > > > > > > > >> >
> > > > > > > > >> > <
> > > > > > >
> > > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpKI
> > > > > > > > !
> > > > > > > > >> >
> > > 2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > >> > [github[.]com]>
> > > > > > > > >> > <
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://www.linkedin.com/company/streamna
> > > > > > > > >> >
> > > > > > >
> > > >
> tive/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMqO
> > > > > > > > >> > 4UZJa$ [linkedin[.]com]>
> > > > > > > > >> > <
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__;!!L
> > > > > > > > >> >
> > > > > pKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > >> > [twitter[.]com]>
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Eron Wright   Cloud Engineering Lead
> > > > > > > > >
> > > > > > > > > p: +1 425 922 8617 <18163542939>
> > > > > > > > >
> > > > > > > > > streamnative.io |  Meet with me
> > > > > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > >
> > > > > > > > > <https://github.com/streamnative>
> > > > > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > > > > <https://twitter.com/streamnativeio/>
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > >
> > > > > > > > Eron Wright   Cloud Engineering Lead
> > > > > > > >
> > > > > > > > p: +1 425 922 8617 <18163542939>
> > > > > > > >
> > > > > > > > streamnative.io |  Meet with me
> > > > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > > >
> > > > > > > > <https://github.com/streamnative>
> > > > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > > > <https://twitter.com/streamnativeio/>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Eron Wright   Cloud Engineering Lead
> > > > > >
> > > > > > p: +1 425 922 8617 <18163542939>
> > > > > >
> > > > > > streamnative.io |  Meet with me
> > > > > > <https://calendly.com/eronwright/regular-1-hour>
> > > > > >
> > > > > > <https://github.com/streamnative>
> > > > > > <https://www.linkedin.com/company/streamnative/>
> > > > > > <https://twitter.com/streamnativeio/>
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Eron Wright   Cloud Engineering Lead
> > > >
> > > > p: +1 425 922 8617 <18163542939>
> > > >
> > > > streamnative.io |  Meet with me
> > > > <https://calendly.com/eronwright/regular-1-hour>
> > > >
> > > > <https://github.com/streamnative>
> > > > <https://www.linkedin.com/company/streamnative/>
> > > > <https://twitter.com/streamnativeio/>
> > > >
> > >
> >
>
>
>
