Thanks Arvid and David for sharing your ideas on this subject.  I'm glad to
hear that you're seeing use cases for watermark propagation via an enhanced
sink interface.

As you've guessed, my interest is in Pulsar, and I am exploring some options
for brokering watermarks across stream processing pipelines.  I think Arvid
is speaking to a high-fidelity solution where the difference between intra-
and inter-pipeline flow is eliminated.  My goal is more limited; I want to
write the watermark that arrives at the sink to Pulsar.  Simply imagine
that Pulsar has native support for watermarking in its producer/consumer
API, and we'll leave the details to another forum.

David, I like your invariant.  I see lateness as stemming from the problem
domain and from system dynamics (e.g. scheduling, batching, lag).  When one
depends on order-of-observation to generate watermarks, the app may become
unduly sensitive to dynamics which bear on order-of-observation.  My goal
is to factor out the system dynamics from lateness determination.
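
To illustrate the point about order-of-observation, here is a minimal,
self-contained sketch. It is not Flink code: the `Entry` class, the log
contents, and the 50 ms bound are all assumptions made up for the example.
It replays one intermediate log twice, once deriving the watermark from the
maximum observed timestamp minus a fixed bound, and once using the watermarks
that the upstream pipeline wrote into the log.

```java
import java.util.Arrays;
import java.util.List;

class LatenessSketch {

    /** Hypothetical log entry: either an element or an upstream watermark. */
    static final class Entry {
        final boolean isWatermark;
        final long timestamp;

        Entry(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        static Entry element(long ts) { return new Entry(false, ts); }
        static Entry watermark(long ts) { return new Entry(true, ts); }
    }

    /** Late count when the watermark is max observed timestamp minus a bound. */
    static int lateWithHeuristic(List<Entry> log, long bound) {
        boolean seenAny = false;
        long maxSeen = 0L;
        int late = 0;
        for (Entry e : log) {
            if (e.isWatermark) {
                continue; // the heuristic ignores upstream watermarks
            }
            if (seenAny && e.timestamp <= maxSeen - bound) {
                late++;
            }
            maxSeen = seenAny ? Math.max(maxSeen, e.timestamp) : e.timestamp;
            seenAny = true;
        }
        return late;
    }

    /** Late count when the watermark is taken from the log itself. */
    static int lateWithPropagated(List<Entry> log) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Entry e : log) {
            if (e.isWatermark) {
                watermark = Math.max(watermark, e.timestamp);
            } else if (e.timestamp <= watermark) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Element 110 is observed after element 200 (e.g. parallel writers),
        // but upstream only declared watermark 150 after both were written.
        List<Entry> log = Arrays.asList(
                Entry.element(100), Entry.element(200), Entry.element(110),
                Entry.watermark(150), Entry.element(160), Entry.watermark(200));

        System.out.println("heuristic late = " + lateWithHeuristic(log, 50));
        System.out.println("propagated late = " + lateWithPropagated(log));
    }
}
```

With this made-up log, the heuristic marks element 110 as late purely because
of the order in which it was observed, while the propagated watermark does
not mark any element late.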

Arvid, to be most valuable (at least for my purposes) the enhancement is
needed on SinkFunction.  This will allow us to easily evolve the existing
Pulsar connector.
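
A rough sketch of the shape of the enhancement follows. The `SinkFunction`
interface below is a simplified, hypothetical stand-in and not Flink's actual
interface; the method name `writeWatermark` is one of the two names floated in
the original proposal, and `RecordingSink` merely records what a connector
might hand to a producer. Nothing here reflects a real Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSinkSketch {

    /** Hypothetical, simplified stand-in for a sink function interface. */
    interface SinkFunction<T> {
        void invoke(T value);

        /** Proposed hook, called when the watermark advances. No-op by default. */
        default void writeWatermark(long watermarkMillis) {
            // Existing sinks inherit this default and keep their behavior.
        }
    }

    /** Existing-style sink that only handles records. */
    static class LegacySink implements SinkFunction<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            written.add("record:" + value);
        }
    }

    /** Sink that opts in and forwards watermarks alongside records. */
    static class RecordingSink implements SinkFunction<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            written.add("record:" + value);
        }

        @Override
        public void writeWatermark(long watermarkMillis) {
            // A real connector would publish this to the storage system.
            written.add("watermark:" + watermarkMillis);
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        sink.invoke("a");
        sink.writeWatermark(1000L);
        sink.invoke("b");
        System.out.println(sink.written); // [record:a, watermark:1000, record:b]

        LegacySink legacy = new LegacySink();
        legacy.invoke("a");
        legacy.writeWatermark(1000L); // default no-op
        System.out.println(legacy.written); // [record:a]
    }
}
```

The default method is what keeps the change source-compatible: a sink that
does not override it compiles and behaves as before.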

As a next step, I will open a PR to advance the conversation.

Eron

On Tue, May 18, 2021 at 5:06 AM David Morávek <david.mora...@gmail.com>
wrote:

> Hi Eron,
>
> Thanks for starting this discussion. I've been thinking about this recently
> as we've run into "watermark related" issues when chaining multiple
> pipelines together. My two cents on the discussion:
>
> The way I like to think about the problem is that there should be an
> invariant that holds for any stream processing pipeline: "a NON_LATE element
> entering the system should never become LATE"
>
> Unfortunately this is exactly what happens in downstream pipelines, because
> the upstream one can:
> - break ordering (especially with higher parallelism)
> - emit elements that are ahead of output watermark
>
> There is not enough information to reconstruct the upstream watermark in
> later stages (it's always just an estimate based on the previous pipeline's
> output).
>
> It would be great if we could have a general abstraction that is reusable
> for various sources / sinks (not just Kafka / Pulsar, though this would
> probably cover most of the use-cases) and systems.
>
> Is there any other use-case than sharing watermarks between pipelines that
> you're trying to solve?
>
> Arvid:
>
> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > hard time imagining that it's useful to use a different stream processor
> > downstream. So for now, I'm assuming that both upstream and downstream are
> > Flink applications. In that case, we probably define both parts of the
> > pipeline in the same Flink job similar to KafkaStream's #through.
> >
>
> I'd slightly disagree here. For example we're "materializing" change-logs
> produced by Flink pipeline into serving layer (random access db / in memory
> view / ..) and we need to know, whether responses we serve meet the
> "freshness" requirements (eg. you may want to respond differently, when
> watermark is lagging way too much behind processing time). Also not every
> stream processor in the pipeline needs to be Flink. It can as well be a
> simple element-wise transformation that reads from Kafka and writes back
> into separate topic (that's what we do for example with ML models, that
> have special hardware requirements).
>
> Best,
> D.
>
>
> On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Eron,
> >
> > I think this is a useful addition for storage systems that act as
> > pass-through for Flink to reduce recovery time. It is only useful if you
> > combine it with regional fail-over as only a small part of the pipeline is
> > restarted.
> >
> > A couple of thoughts on the implications:
> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > hard time imagining that it's useful to use a different stream processor
> > downstream. So for now, I'm assuming that both upstream and downstream are
> > Flink applications. In that case, we probably define both parts of the
> > pipeline in the same Flink job similar to KafkaStream's #through.
> > 2. The schema of the respective intermediate stream/topic would need to be
> > managed by Flink to encode both records and watermarks. This reduces the
> > usability quite a bit and needs to be carefully crafted.
> > 3. It's not clear to me if constructs like SchemaRegistry can be properly
> > supported (and also if they should be supported) in terms of schema
> > evolution.
> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > encoded.
> > 5. It's important to have some way to transport backpressure from the
> > downstream to the upstream. Or else you would have the same issue as
> > KafkaStreams where two separate pipelines can drift so far away that you
> > experience data loss if the data retention period is smaller than the
> > drift.
> > 6. It's clear that you trade a huge chunk of throughput for lower overall
> > latency in case of failure. So it's an interesting feature for use cases
> > with SLAs.
> >
> > Since we are phasing out SinkFunction, I'd prefer to only support
> > SinkWriter. Having a no-op default sounds good to me.
> >
> > We have some experimental feature for Kafka [1], which pretty much reflects
> > your idea. Here we have an ugly workaround to be able to process the
> > watermark by using a custom StreamSink task. We could also try to create a
> > FLIP that abstracts the actual system away and then we could use the
> > approach for both Pulsar and Kafka.
> >
> > [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> >
> >
> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > <ewri...@streamnative.io.invalid> wrote:
> >
> > > I would like to propose an enhancement to the Sink API, the ability to
> > > receive upstream watermarks.   I'm aware that the sink context provides the
> > > current watermark for a given record.  I'd like to be able to write a sink
> > > function that is invoked whenever the watermark changes.  Out of scope
> > > would be event-time timers (since sinks aren't keyed).
> > >
> > > For context, imagine that a stream storage system had the ability to
> > > persist watermarks in addition to ordinary elements, e.g. to serve as
> > > source watermarks in a downstream processor.  Ideally one could compose a
> > > multi-stage, event-driven application, with watermarks flowing end-to-end
> > > without need for a heuristics-based watermark at each stage.
> > >
> > > The specific proposal would be a new method on `SinkFunction` and/or on
> > > `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default
> > > implementation that does nothing.
> > >
> > > Thoughts?
> > >
> > > Thanks!
> > > Eron Wright
> > > StreamNative
> > >
> >
>


-- 

Eron Wright   Cloud Engineering Lead