Does anyone have further comment on FLIP-167? https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
Thanks,
Eron

On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:
> Filed FLIP-167: Watermarks for Sink API:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
>
> I'd like to call a vote next week. Is that reasonable?
>
> On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
>> Hi Arvid and Eron,
>>
>> Thanks for the discussion. I read through Eron's pull request, and I think this can benefit the Pravega Flink connector as well.
>>
>> Here is some background. Pravega has had the watermark concept in its event streams for two years; here is a blog introduction [1] to Pravega watermarks. Last year the Pravega Flink connector also added a watermark integration: we wanted to propagate the Flink watermark to Pravega in the SinkFunction. At that time we could only use the existing Flink API, so we keep the last watermark in memory and check on every event whether the watermark has changed [2], which is not efficient. With such a new interface, we can manage watermark propagation much more easily.
>>
>> [1] https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
>> [2] https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
>>
>> -----Original Message-----
>> From: Arvid Heise <ar...@apache.org>
>> Sent: Wednesday, May 19, 2021 16:06
>> To: dev
>> Subject: Re: [DISCUSS] Watermark propagation with Sink API
>>
>> Hi Eron,
>>
>> Thanks for pushing this topic. I can now see that the benefit is even bigger than I initially thought, so it's worthwhile to include it in any case.
>>
>> I also briefly thought about exposing watermarks to all UDFs, but there I really struggle to see specific use cases. Could you maybe take a few minutes to think about it as well?
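The per-event workaround Brian describes above for the Pravega writer (remember the last watermark and compare it on every record) can be sketched as follows. This is a minimal Python illustration of the pattern, assuming a sink that only sees the current watermark alongside each record; it is not the connector code linked at [2], and `ChangeDetectingWriter`, `invoke` and `emit_watermark` are hypothetical names.

```python
from typing import Callable, List, Optional


class ChangeDetectingWriter:
    """Hypothetical sink that only sees the watermark alongside each record,
    so it remembers the last value and forwards it when it changes."""

    def __init__(self, emit_watermark: Callable[[int], None]) -> None:
        self._emit_watermark = emit_watermark
        self._last_watermark: Optional[int] = None
        self.records: List[str] = []

    def invoke(self, record: str, current_watermark: int) -> None:
        self.records.append(record)
        # This comparison runs for every record, even though the watermark
        # changes far less often than records arrive.
        if current_watermark != self._last_watermark:
            self._last_watermark = current_watermark
            self._emit_watermark(current_watermark)
```

Two costs follow from this shape: the comparison runs once per record, and a watermark that advances while no records arrive is never forwarded, because the check only happens inside `invoke`. A callback invoked whenever the watermark changes, as proposed in this thread, would avoid both.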
>> I could only see someone misusing Async IO as a sink where a real sink would be more appropriate. In general, if there is no clear use case, we shouldn't add the functionality, as it's just increased maintenance for no value.
>>
>> If we stick to the plan, I think your PR is already in good shape. We need to create a FLIP for it though, since it changes public interfaces [1]. I was initially not convinced that we should also change the old SinkFunction interface, but seeing how small the change is, I wouldn't mind at all increasing consistency. Only once we have written and approved the FLIP (which should be minimal and fast) should we actually look at the PR ;).
>>
>> The only thing I would improve is the name of the function. processWatermark sounds as if the sink implementer really needs to implement it (as you would need to on a custom operator). I would make the names symmetric to the record writing/invoking methods (e.g. writeWatermark and invokeWatermark).
>>
>> As a follow-up PR, we should then migrate KafkaShuffle to the new API. But that's something I can do.
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>
>> On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>>
>>> Update: opened an issue and a PR.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-22700
>>> https://github.com/apache/flink/pull/15950
>>>
>>> On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:
>>>
>>>> Thanks Arvid and David for sharing your ideas on this subject. I'm glad to hear that you're seeing use cases for watermark propagation via an enhanced sink interface.
>>>>
>>>> As you've guessed, my interest is in Pulsar, and I am exploring some options for brokering watermarks across stream processing pipelines. I think Arvid is speaking to a high-fidelity solution where the difference between intra- and inter-pipeline flow is eliminated. My goal is more limited: I want to write the watermark that arrives at the sink to Pulsar. Simply imagine that Pulsar has native support for watermarking in its producer/consumer API, and we'll leave the details to another forum.
>>>>
>>>> David, I like your invariant. I see lateness as stemming from the problem domain and from system dynamics (e.g. scheduling, batching, lag). When one depends on order of observation to generate watermarks, the app may become unduly sensitive to the dynamics that bear on order of observation. My goal is to factor the system dynamics out of lateness determination.
>>>>
>>>> Arvid, to be most valuable (at least for my purposes), the enhancement is needed on SinkFunction. This will allow us to easily evolve the existing Pulsar connector.
>>>>
>>>> As a next step, I will open a PR to advance the conversation.
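Eron's point about order of observation, and the invariant David proposes, can be made concrete with a small sketch. It assumes a hypothetical log in which each record carries the upstream watermark in effect when it was written, and it compares two downstream strategies: re-deriving the watermark as the maximum timestamp seen so far minus a fixed bound (a simplified heuristic in the spirit of bounded out-of-orderness), versus reusing the persisted upstream watermark. An element counts as late if its timestamp is at or below the current watermark. All names and values are illustrative, written in Python rather than against any Flink API.

```python
from typing import List, Optional, Tuple

# Hypothetical log written by an upstream job: (event_timestamp,
# upstream_watermark_at_write_time). Parallel writers interleaved the
# records, so timestamps arrive out of order, yet every record was on time
# upstream (timestamp > upstream watermark) when it was written.
LOG: List[Tuple[int, int]] = [(10, 5), (30, 5), (12, 8)]


def late_with_heuristic(log: List[Tuple[int, int]], bound: int) -> List[int]:
    """Re-derive the watermark from order of observation:
    watermark = (max timestamp observed so far) - bound."""
    max_seen: Optional[int] = None
    late: List[int] = []
    for ts, _upstream_wm in log:
        watermark = None if max_seen is None else max_seen - bound
        if watermark is not None and ts <= watermark:
            late.append(ts)
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return late


def late_with_propagated(log: List[Tuple[int, int]]) -> List[int]:
    """Reuse the watermark the upstream job persisted alongside each record."""
    return [ts for ts, upstream_wm in log if ts <= upstream_wm]
```

With a bound of 10, the heuristic flags the record with timestamp 12 as late even though it was on time upstream; with a bound of 20 it does not. The verdict depends on how the records happened to interleave and on the chosen bound, while the propagated watermark flags nothing, which is the "NON_LATE never becomes LATE" property.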
>>>>
>>>> Eron
>>>>
>>>> On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com> wrote:
>>>>
>>>>> Hi Eron,
>>>>>
>>>>> Thanks for starting this discussion. I've been thinking about this recently, as we've run into "watermark related" issues when chaining multiple pipelines together. My two cents on the discussion:
>>>>>
>>>>> The way I like to think about the problem is that there should be an invariant that holds for any stream processing pipeline: "A NON_LATE element entering the system should never become LATE."
>>>>>
>>>>> Unfortunately, this is exactly what happens in downstream pipelines, because the upstream one can:
>>>>> - break ordering (especially with higher parallelism)
>>>>> - emit elements that are ahead of the output watermark
>>>>>
>>>>> There is not enough information to reconstruct the upstream watermark in later stages (it's always just an estimate based on the previous pipeline's output).
>>>>>
>>>>> It would be great if we could have a general abstraction that is reusable for various sources / sinks (not just Kafka / Pulsar, though this would probably cover most of the use cases) and systems.
>>>>>
>>>>> Is there any other use case than sharing watermarks between pipelines that you're trying to solve?
>>>>>
>>>>> Arvid:
>>>>>
>>>>>> 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
>>>>>
>>>>> I'd slightly disagree here.
>>>>> For example, we're "materializing" changelogs produced by a Flink pipeline into a serving layer (random-access DB / in-memory view / ...), and we need to know whether the responses we serve meet the "freshness" requirements (e.g. you may want to respond differently when the watermark is lagging way too far behind processing time). Also, not every stream processor in the pipeline needs to be Flink. It can just as well be a simple element-wise transformation that reads from Kafka and writes back into a separate topic (that's what we do, for example, with ML models that have special hardware requirements).
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Eron,
>>>>>>
>>>>>> I think this is a useful addition for storage systems that act as pass-through for Flink to reduce recovery time. It is only useful if you combine it with regional failover, as only a small part of the pipeline is restarted.
>>>>>>
>>>>>> A couple of thoughts on the implications:
>>>>>> 1. Watermarks are closely coupled to the used system (=Flink). I have a hard time imagining that it's useful to use a different stream processor downstream. So for now, I'm assuming that both upstream and downstream are Flink applications. In that case, we probably define both parts of the pipeline in the same Flink job similar to KafkaStream's #through.
>>>>>> 2. The schema of the respective intermediate stream/topic would need to be managed by Flink to encode both records and watermarks. This reduces the usability quite a bit and needs to be carefully crafted.
>>>>>> 3. It's not clear to me if constructs like SchemaRegistry can be properly supported (and also if they should be supported) in terms of schema evolution.
>>>>>> 4. Potentially, StreamStatus and LatencyMarker would also need to be encoded.
>>>>>> 5. It's important to have some way to transport backpressure from the downstream to the upstream. Otherwise you would have the same issue as KafkaStreams, where two separate pipelines can drift so far apart that you experience data loss if the data retention period is smaller than the drift.
>>>>>> 6. It's clear that you trade a huge chunk of throughput for lower overall latency in case of failure. So it's an interesting feature for use cases with SLAs.
>>>>>>
>>>>>> Since we are phasing out SinkFunction, I'd prefer to only support SinkWriter. Having a no-op default sounds good to me.
>>>>>>
>>>>>> We have an experimental feature for Kafka [1] that pretty much reflects your idea. There we have an ugly workaround to be able to process the watermark by using a custom StreamSink task. We could also try to create a FLIP that abstracts the actual system away, and then we could use the approach for both Pulsar and Kafka.
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
>>>>>>
>>>>>> On Mon, May 17, 2021 at 10:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>>>>>>
>>>>>>> I would like to propose an enhancement to the Sink API: the ability to receive upstream watermarks. I'm aware that the sink context provides the current watermark for a given record. I'd like to be able to write a sink function that is invoked whenever the watermark changes. Out of scope would be event-time timers (since sinks aren't keyed).
>>>>>>>
>>>>>>> For context, imagine that a stream storage system had the ability to persist watermarks in addition to ordinary elements, e.g. to serve as source watermarks in a downstream processor. Ideally one could compose a multi-stage, event-driven application, with watermarks flowing end-to-end without the need for a heuristics-based watermark at each stage.
>>>>>>>
>>>>>>> The specific proposal would be a new method on `SinkFunction` and/or on `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default implementation that does nothing.
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>> Thanks!
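The proposal above (a new sink method whose default does nothing) and Arvid's point about encoding both records and watermarks in the intermediate topic can be sketched together. This is a hypothetical Python rendering, not Flink's API: `SinkWriterSketch`, `write_watermark`, `LogSinkWriter`, `RecordOnlyWriter`, `replay` and the in-band log format are assumptions for illustration, and `write_watermark` only mirrors the `writeWatermark` name discussed in the thread. It assumes a single writer and a single partition.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass(frozen=True)
class Record:
    value: str
    timestamp: int


@dataclass(frozen=True)
class WatermarkMarker:
    timestamp: int


Entry = Union[Record, WatermarkMarker]


class SinkWriterSketch(ABC):
    """Hypothetical sink interface: write() must be implemented, while
    write_watermark() defaults to a no-op so existing sinks need no change."""

    @abstractmethod
    def write(self, record: Record) -> None:
        ...

    def write_watermark(self, watermark: int) -> None:
        # Default: ignore watermarks.
        pass


class RecordOnlyWriter(SinkWriterSketch):
    """An existing-style sink that never overrides write_watermark()."""

    def __init__(self) -> None:
        self.values: List[str] = []

    def write(self, record: Record) -> None:
        self.values.append(record.value)


class LogSinkWriter(SinkWriterSketch):
    """Writes records and watermarks in-band into one ordered log, as a
    storage system with native watermark support might."""

    def __init__(self) -> None:
        self.log: List[Entry] = []

    def write(self, record: Record) -> None:
        self.log.append(record)

    def write_watermark(self, watermark: int) -> None:
        self.log.append(WatermarkMarker(watermark))


def replay(log: List[Entry]) -> List[Tuple[str, Optional[int]]]:
    """Downstream reader: pair each record with the upstream watermark in
    effect when it was written, instead of estimating one from timestamps."""
    current: Optional[int] = None
    out: List[Tuple[str, Optional[int]]] = []
    for entry in log:
        if isinstance(entry, WatermarkMarker):
            current = entry.timestamp
        else:
            out.append((entry.value, current))
    return out
```

In the proposal, the framework would call the watermark method whenever the watermark changes; in a test the calls are simply made by hand. `RecordOnlyWriter` shows why a no-op default keeps existing sinks working unchanged. With several parallel writers, a reader would have to combine their watermarks (e.g. by taking the minimum), which this sketch leaves out.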
>>>>>>>
>>>>>>> Eron Wright
>>>>>>> StreamNative
>>>>
>>>> --
>>>>
>>>> Eron Wright
>>>> Cloud Engineering Lead
>>>>
>>>> p: +1 425 922 8617
>>>>
>>>> streamnative.io | Meet with me <https://calendly.com/eronwright/regular-1-hour>
>>>>
>>>> <https://github.com/streamnative> <https://www.linkedin.com/company/streamnative/> <https://twitter.com/streamnativeio/>
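David's serving-layer case in the thread (respond differently when the watermark of a materialized view lags too far behind processing time) reduces to a lag check. A minimal Python sketch, assuming the serving layer can read the latest persisted watermark as epoch milliseconds; `check_freshness`, `serve`, `Freshness` and the 60-second default threshold are hypothetical choices, not part of any existing API.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Freshness:
    lag_ms: int  # how far the watermark trails processing time
    stale: bool  # True if the lag exceeds the allowed maximum


def check_freshness(watermark_ms: int, now_ms: int, max_lag_ms: int) -> Freshness:
    # A watermark ahead of the local clock (e.g. clock skew) counts as zero lag.
    lag = max(0, now_ms - watermark_ms)
    return Freshness(lag_ms=lag, stale=lag > max_lag_ms)


def serve(value: Any, watermark_ms: int, now_ms: int, max_lag_ms: int = 60_000) -> Dict[str, Any]:
    """Answer a lookup, flagging results whose underlying watermark is too old."""
    freshness = check_freshness(watermark_ms, now_ms, max_lag_ms)
    return {"value": value, "stale": freshness.stale, "lag_ms": freshness.lag_ms}
```

This only works if the watermark reaches the serving layer at all, which is the motivation for persisting it from the sink rather than re-estimating it downstream.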