Does anyone have further comment on FLIP-167?
https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API

Thanks,
Eron


On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:

> Filed FLIP-167: Watermarks for Sink API:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>
> I'd like to call a vote next week; is that reasonable?
>
>
> On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
>
>> Hi Arvid and Eron,
>>
>> Thanks for the discussion. I read through Eron's pull request, and I
>> think it can benefit the Pravega Flink connector as well.
>>
>> Here is some background. Pravega has supported watermarks in its event
>> streams for about two years; here is a blog introduction[1] to Pravega
>> watermarking.
>> The Pravega Flink connector added a watermark integration last year to
>> propagate the Flink watermark to Pravega in the SinkFunction. At the
>> time we had to work with the existing Flink API: we keep the last
>> watermark in memory and check whether it has changed for each event[2],
>> which is not efficient. With such a new interface, we can also manage
>> the watermark propagation much more easily.
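>>
>> To illustrate the difference, here is a rough sketch with made-up names
>> (not the actual FlinkPravegaWriter code). Today the connector has to
>> check the watermark per record inside invoke(); with the proposed hook
>> it would only be called when the watermark actually advances.
>>
>> // Rough sketch only -- hypothetical names, not the real connector code.
>> import org.apache.flink.api.common.eventtime.Watermark;
>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>
>> public class WatermarkForwardingSink<T> extends RichSinkFunction<T> {
>>     private long lastWatermark = Long.MIN_VALUE;
>>
>>     @Override
>>     public void invoke(T value, Context context) {
>>         // Today: detect watermark changes per record via the sink context.
>>         long watermark = context.currentWatermark();
>>         if (watermark > lastWatermark) {
>>             lastWatermark = watermark;
>>             forwardWatermarkToPravega(watermark); // hypothetical Pravega call
>>         }
>>         writeEvent(value);                        // hypothetical record write
>>     }
>>
>>     // With the proposed hook, the runtime would call this only when the
>>     // watermark advances, and the per-record check above goes away.
>>     public void writeWatermark(Watermark watermark) {
>>         forwardWatermarkToPravega(watermark.getTimestamp());
>>     }
>>
>>     private void forwardWatermarkToPravega(long timestamp) { /* ... */ }
>>     private void writeEvent(T value) { /* ... */ }
>> }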
>>
>> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
>> [2]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
>>
>> -----Original Message-----
>> From: Arvid Heise <ar...@apache.org>
>> Sent: Wednesday, May 19, 2021 16:06
>> To: dev
>> Subject: Re: [DISCUSS] Watermark propagation with Sink API
>>
>> Hi Eron,
>>
>> Thanks for pushing that topic. I can now see that the benefit is even
>> bigger than I initially thought, so it's worthwhile to include it.
>>
>> I also briefly thought about exposing watermarks to all UDFs, but there
>> I struggle to see specific use cases. Could you maybe take a few
>> minutes to think about it as well? I could only see someone misusing
>> Async I/O as a sink where a real sink would be more appropriate. In
>> general, if there is no clear use case, we shouldn't add the
>> functionality, as it would just be added maintenance for no value.
>>
>> If we stick to the plan, I think your PR is already in good shape. We
>> need to create a FLIP for it though, since it changes Public interfaces
>> [1]. I was initially not convinced that we should also change the old
>> SinkFunction interface, but seeing how small the change is, I wouldn't
>> mind at all increasing consistency. Only once the FLIP is written and
>> approved (which should be minimal and fast) should we actually look at
>> the PR ;).
>>
>> The only thing I would improve is the name of the method.
>> processWatermark sounds as if the sink implementer really needs to
>> implement it (as you would on a custom operator). I would make the
>> names symmetric to the record writing/invoking methods (e.g.
>> writeWatermark and invokeWatermark).
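>>
>> To make that concrete, a minimal sketch of what the symmetric additions
>> could look like, each with a no-op default so existing sinks are
>> unaffected (imports and existing members elided; the exact signatures
>> belong in the FLIP):
>>
>> // In SinkWriter.java:
>> public interface SinkWriter<InputT, CommT, WriterStateT> {
>>     // ... write(element, context), prepareCommit(...), snapshotState(), close() ...
>>
>>     /** Called whenever the event-time watermark advances. No-op by default. */
>>     default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
>> }
>>
>> // In SinkFunction.java:
>> public interface SinkFunction<IN> extends Function, Serializable {
>>     // ... invoke(value, context) ...
>>
>>     /** Symmetric to invoke(); no-op by default so existing sinks keep working. */
>>     default void invokeWatermark(Watermark watermark) throws Exception {}
>> }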
>>
>> As a follow-up PR, we should then migrate KafkaShuffle to the new API.
>> But that's something I can do.
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>
>> On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>>
>> > Update: opened an issue and a PR.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-22700
>> > https://github.com/apache/flink/pull/15950
>> >
>> >
>> > On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io>
>> > wrote:
>> >
>> > > Thanks Arvid and David for sharing your ideas on this subject.  I'm
>> > > glad to hear that you're seeing use cases for watermark propagation
>> > > via an enhanced sink interface.
>> > >
>> > > As you've guessed, my interest is in Pulsar, and I am exploring some
>> > > options for brokering watermarks across stream processing pipelines.
>> > > I think Arvid is speaking to a high-fidelity solution where the
>> > > difference between intra- and inter-pipeline flow is eliminated. My
>> > > goal is more limited; I want to write the watermark that arrives at
>> > > the sink to Pulsar. Simply imagine that Pulsar has native support for
>> > > watermarking in its producer/consumer API, and we'll leave the
>> > > details to another forum.
>> > >
>> > > David, I like your invariant. I see lateness as stemming from the
>> > > problem domain and from system dynamics (e.g. scheduling, batching,
>> > > lag). When one depends on order-of-observation to generate
>> > > watermarks, the app may become unduly sensitive to dynamics which
>> > > bear on order-of-observation. My goal is to factor out the system
>> > > dynamics from lateness determination.
>> > >
>> > > Arvid, to be most valuable (at least for my purposes) the
>> > > enhancement is needed on SinkFunction.  This will allow us to easily
>> > > evolve the existing Pulsar connector.
>> > >
>> > > Next step, I will open a PR to advance the conversation.
>> > >
>> > > Eron
>> > >
>> > > On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:
>> > >
>> > >> Hi Eron,
>> > >>
>> > >> Thanks for starting this discussion. I've been thinking about this
>> > >> recently as we've run into "watermark related" issues when chaining
>> > >> multiple pipelines together. My two cents on the discussion:
>> > >>
>> > >> The way I like to think about the problem is that there should be an
>> > >> invariant that holds for any stream processing pipeline: "a NON_LATE
>> > >> element entering the system should never become LATE".
>> > >>
>> > >> Unfortunately, this is exactly what happens in downstream pipelines,
>> > >> because the upstream one can:
>> > >> - break ordering (especially with higher parallelism)
>> > >> - emit elements that are ahead of the output watermark
>> > >>
>> > >> There is not enough information to reconstruct the upstream watermark
>> > >> in later stages (it's always just an estimate based on the previous
>> > >> pipeline's output).
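>> > >>
>> > >> As a concrete example of the invariant breaking (just a sketch; it
>> > >> assumes the downstream job re-derives watermarks with a
>> > >> bounded-out-of-orderness heuristic, and "Event" with its timestamp
>> > >> field is made up):
>> > >>
>> > >> import java.time.Duration;
>> > >> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>> > >>
>> > >> WatermarkStrategy<Event> heuristic =
>> > >>     WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>> > >>         .withTimestampAssigner((event, recordTs) -> event.timestamp);
>> > >>
>> > >> // Upstream, an element was emitted while the upstream watermark was
>> > >> // still behind its timestamp, i.e. it was NON_LATE. If a parallel
>> > >> // upstream subtask writes much newer elements first (reordering), the
>> > >> // heuristic above advances the downstream watermark beyond that
>> > >> // element's timestamp before it arrives -- and a NON_LATE element has
>> > >> // become LATE.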
>> > >>
>> > >> It would be great if we could have a general abstraction that is
>> > >> reusable for various sources / sinks (not just Kafka / Pulsar, though
>> > >> these would probably cover most of the use cases) and systems.
>> > >>
>> > >> Is there any other use case than sharing watermarks between pipelines
>> > >> that you're trying to solve?
>> > >>
>> > >> Arvid:
>> > >>
>> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
>> > >> > have a hard time imagining that it's useful to use a different
>> > >> > stream processor downstream. So for now, I'm assuming that both
>> > >> > upstream and downstream are Flink applications. In that case, we
>> > >> > probably define both parts of the pipeline in the same Flink job
>> > >> > similar to KafkaStream's #through.
>> > >> >
>> > >>
>> > >> I'd slightly disagree here. For example, we're "materializing"
>> > >> change-logs produced by a Flink pipeline into a serving layer
>> > >> (random-access db / in-memory view / ..) and we need to know whether
>> > >> the responses we serve meet the "freshness" requirements (e.g. you may
>> > >> want to respond differently when the watermark is lagging way too far
>> > >> behind processing time). Also, not every stream processor in the
>> > >> pipeline needs to be Flink. It can just as well be a simple
>> > >> element-wise transformation that reads from Kafka and writes back into
>> > >> a separate topic (that's what we do, for example, with ML models that
>> > >> have special hardware requirements).
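>> > >>
>> > >> The serving-layer check is roughly this kind of thing (a sketch with
>> > >> made-up names, assuming the sink also persists the last watermark it
>> > >> wrote next to the materialized view):
>> > >>
>> > >> long watermark = viewStore.latestWatermark();            // hypothetical accessor
>> > >> long lagMillis = System.currentTimeMillis() - watermark;
>> > >>
>> > >> if (lagMillis > maxAcceptableLagMillis) {
>> > >>     // e.g. degrade gracefully or flag the answer as stale
>> > >>     return Response.stale(viewStore.lookup(key), lagMillis);
>> > >> }
>> > >> return Response.fresh(viewStore.lookup(key));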
>> > >>
>> > >> Best,
>> > >> D.
>> > >>
>> > >>
>> > >> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org>
>> wrote:
>> > >>
>> > >> > Hi Eron,
>> > >> >
>> > >> > I think this is a useful addition for storage systems that act as
>> > >> > pass-through for Flink to reduce recovery time. It is only useful if
>> > >> > you combine it with regional fail-over as only a small part of the
>> > >> > pipeline is restarted.
>> > >> >
>> > >> > A couple of thoughts on the implications:
>> > >> > 1. Watermarks are closely coupled to the used system (=Flink). I
>> > >> > have a hard time imagining that it's useful to use a different
>> > >> > stream processor downstream. So for now, I'm assuming that both
>> > >> > upstream and downstream are Flink applications. In that case, we
>> > >> > probably define both parts of the pipeline in the same Flink job
>> > >> > similar to KafkaStream's #through.
>> > >> > 2. The schema of the respective intermediate stream/topic would need
>> > >> > to be managed by Flink to encode both records and watermarks. This
>> > >> > reduces the usability quite a bit and needs to be carefully crafted
>> > >> > (see the sketch after this list).
>> > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
>> > >> > properly supported (and also if they should be supported) in terms
>> > >> > of schema evolution.
>> > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
>> > >> > encoded.
>> > >> > 5. It's important to have some way to transport backpressure from
>> > >> > the downstream to the upstream. Or else you would have the same
>> > >> > issue as KafkaStreams where two separate pipelines can drift so far
>> > >> > apart that you experience data loss if the data retention period is
>> > >> > smaller than the drift.
>> > >> > 6. It's clear that you trade a huge chunk of throughput for lower
>> > >> > overall latency in case of failure. So it's an interesting feature
>> > >> > for use cases with SLAs.
>> > >> >
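>> > >> > Regarding point 2, a rough sketch of what a Flink-managed envelope
>> > >> > for the intermediate stream/topic could look like (names are made
>> > >> > up; StreamStatus and LatencyMarker from point 4 could become
>> > >> > additional variants):
>> > >> >
>> > >> > public final class Envelope<T> {
>> > >> >     enum Kind { RECORD, WATERMARK }
>> > >> >
>> > >> >     final Kind kind;
>> > >> >     final T record;          // set when kind == RECORD
>> > >> >     final long watermarkTs;  // set when kind == WATERMARK
>> > >> >
>> > >> >     private Envelope(Kind kind, T record, long watermarkTs) {
>> > >> >         this.kind = kind;
>> > >> >         this.record = record;
>> > >> >         this.watermarkTs = watermarkTs;
>> > >> >     }
>> > >> >
>> > >> >     static <T> Envelope<T> record(T record) {
>> > >> >         return new Envelope<>(Kind.RECORD, record, Long.MIN_VALUE);
>> > >> >     }
>> > >> >
>> > >> >     static <T> Envelope<T> watermark(long timestamp) {
>> > >> >         return new Envelope<>(Kind.WATERMARK, null, timestamp);
>> > >> >     }
>> > >> > }
>> > >> >
>> > >> > The sink would emit Envelope.record(...) for every element and
>> > >> > Envelope.watermark(...) from the new watermark hook; the downstream
>> > >> > source turns WATERMARK envelopes back into actual watermarks.
>> > >> >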
>> > >> > Since we are phasing out SinkFunction, I'd prefer to only support
>> > >> > SinkWriter. Having a no-op default sounds good to me.
>> > >> >
>> > >> > We have an experimental feature for Kafka [1], which pretty much
>> > >> > reflects your idea. There we have an ugly workaround to be able to
>> > >> > process the watermark by using a custom StreamSink task. We could
>> > >> > also try to create a FLIP that abstracts the actual system away, and
>> > >> > then we could use the approach for both Pulsar and Kafka.
>> > >> >
>> > >> > [1]
>> > >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
>> > >> >
>> > >> >
>> > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
>> > >> > <ewri...@streamnative.io.invalid> wrote:
>> > >> >
>> > >> > > I would like to propose an enhancement to the Sink API, the
>> > >> > > ability to receive upstream watermarks. I'm aware that the sink
>> > >> > > context provides the current watermark for a given record. I'd
>> > >> > > like to be able to write a sink function that is invoked whenever
>> > >> > > the watermark changes. Out of scope would be event-time timers
>> > >> > > (since sinks aren't keyed).
>> > >> > >
>> > >> > > For context, imagine that a stream storage system had the ability
>> > >> > > to persist watermarks in addition to ordinary elements, e.g. to
>> > >> > > serve as source watermarks in a downstream processor. Ideally one
>> > >> > > could compose a multi-stage, event-driven application, with
>> > >> > > watermarks flowing end-to-end without need for a heuristics-based
>> > >> > > watermark at each stage.
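>> > >> > >
>> > >> > > On the consuming side, here is a sketch of what that could enable,
>> > >> > > assuming the storage connector can surface persisted watermark
>> > >> > > markers alongside regular elements ("StorageEvent" and its
>> > >> > > accessors are made up):
>> > >> > >
>> > >> > > import org.apache.flink.api.common.eventtime.Watermark;
>> > >> > > import org.apache.flink.api.common.eventtime.WatermarkGenerator;
>> > >> > > import org.apache.flink.api.common.eventtime.WatermarkOutput;
>> > >> > >
>> > >> > > public class PersistedWatermarkGenerator<T>
>> > >> > >         implements WatermarkGenerator<StorageEvent<T>> {
>> > >> > >     private long latestPersistedWatermark = Long.MIN_VALUE;
>> > >> > >
>> > >> > >     @Override
>> > >> > >     public void onEvent(StorageEvent<T> event, long ts, WatermarkOutput out) {
>> > >> > >         if (event.isWatermarkMarker()) {
>> > >> > >             latestPersistedWatermark = event.watermarkTimestamp();
>> > >> > >         }
>> > >> > >     }
>> > >> > >
>> > >> > >     @Override
>> > >> > >     public void onPeriodicEmit(WatermarkOutput out) {
>> > >> > >         // Emit exactly the upstream watermark instead of a heuristic estimate.
>> > >> > >         out.emitWatermark(new Watermark(latestPersistedWatermark));
>> > >> > >     }
>> > >> > > }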
>> > >> > >
>> > >> > > The specific proposal would be a new method on `SinkFunction`
>> > >> > > and/or on `SinkWriter`, called 'processWatermark' or
>> > >> > > 'writeWatermark', with a default implementation that does nothing.
>> > >> > >
>> > >> > > Thoughts?
>> > >> > >
>> > >> > > Thanks!
>> > >> > > Eron Wright
>> > >> > > StreamNative
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>


-- 

Eron Wright   Cloud Engineering Lead

p: +1 425 922 8617

streamnative.io |  Meet with me
<https://calendly.com/eronwright/regular-1-hour>

<https://github.com/streamnative>
<https://www.linkedin.com/company/streamnative/>
<https://twitter.com/streamnativeio/>
