Hi Eron,

FLIP-167 is narrow, but we recently discovered some problems with the
current idleness semantics, as Arvid explained. We are planning to present
a new proposal to redefine them, and as part of it we will probably need to
rename them. Given that, I don't think it makes sense to expose idleness to
the sinks before we have renamed it and defined it properly. In other words:

> 2. When the sink operator is idled, tell the sink function.

We shouldn't expose stream status as part of the public API until it's
properly defined.

I would propose one of two things:
1. Proceed with FLIP-167 without exposing idleness in the sinks YET.
Exposing idleness could be part of the next FLIP, the one that defines
idleness properly in the first place.
2. Block FLIP-167 until idleness is fixed.

I would vote for option number 1.
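Option 1 can be sketched as a sink contract that learns about watermarks but deliberately has no idleness callback. This is a minimal, self-contained illustration under stated assumptions, not the Flink API: `WatermarkAwareSink`, `writeWatermark(long)` and `RecordingSink` are hypothetical names, and watermarks are modeled as plain epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class Option1Demo {
    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        sink.invoke("a");
        sink.writeWatermark(100L);
        sink.invoke("b");
        sink.writeWatermark(200L);
        System.out.println(sink.events()); // [record:a, watermark:100, record:b, watermark:200]
    }
}

// Hypothetical contract for illustration; not the actual Flink sink interface.
interface WatermarkAwareSink<T> {
    void invoke(T value) throws Exception;

    // Called when a watermark reaches the sink operator; a no-op by default so
    // existing sinks keep compiling.
    default void writeWatermark(long watermarkMillis) throws Exception {}

    // Deliberately no markIdle()/markActive(): exposing idleness is deferred
    // until a future FLIP defines it properly.
}

class RecordingSink implements WatermarkAwareSink<String> {
    private final List<String> events = new ArrayList<>();

    @Override
    public void invoke(String value) {
        events.add("record:" + value);
    }

    @Override
    public void writeWatermark(long watermarkMillis) {
        events.add("watermark:" + watermarkMillis);
    }

    List<String> events() {
        return events;
    }
}
```

Keeping the watermark callback as a default method means adding idleness later would be a purely additive change.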

Piotrek

On Mon, 7 Jun 2021 at 18:08 Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> Piotr, Dawid, and Arvid, we've had an expansive discussion, but ultimately
> the proposal is narrow. It is:
> 1. When a watermark arrives at the sink operator, tell the sink function.
> 2. When the sink operator is idled, tell the sink function.
>
> With these enhancements, we will significantly improve correctness in
> multi-stage flows and facilitate an exciting project in the Pulsar
> community. Would you please lend your support to FLIP-167 so that we can
> land this enhancement for 1.14? My deepest thanks!
>
> -Eron
>
> On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise <ar...@apache.org> wrote:
>
> > Hi Eron,
> >
> > You either have very specific use cases in mind or have a misconception
> > about idleness in Flink with the new sources. The basic idea is that you
> > have watermark generators only at the sources and the user supplies them.
> > As a source author, you have no way to restrict that. Here is a bit of
> > background:
> >
> > We observed that many users reading from Kafka were confused that their
> > Flink applications showed no visible progress because of some idle
> > partition, so we subsequently introduced idleness. Idleness was always
> > considered a means to achieve progress at the risk of losing a bit of
> > correctness. So, especially in the case that you describe, with a Pulsar
> > partition that is empty but indefinitely active, the user needs to be able
> > to use idleness so that downstream window operators make progress.
> >
> > I hope this clarifies that following "I wouldn't recommend using
> > withIdleness() with source-based watermarks." would pretty much make the
> > intended use case not work anymore.
> >
> > ---
> >
> > Nevertheless, from the discussion with you and some offline discussion
> > with Piotr and Dawid, we actually found quite a few drawbacks in the
> > current definition of idleness:
> > - We currently only use idleness to exclude the respective upstream tasks
> > from participating in watermark generation (as you eloquently put it
> > further up in the thread).
> > - However, the definition is bound to records: while a partition is idle,
> > no records should be produced.
> > - That leads to quite a few edge cases where operators emit records while
> > they are actually idling: think of timers, asyncIO operators, window
> > operators based on timeouts, etc.
> > - The solution would be to switch the operator to active while emitting
> > and return to idle afterwards (but when?). However, this has some
> > unintended side effects, depending on when you switch back.
> >
> > We are currently thinking that we should rephrase the definition to what
> > you described:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver records.
> > - Then we are no longer talking about idle partitions but about explicit
> > and implicit watermark generation, and should probably rename the
> > concepts.
> > - This would probably mean that we also need an explicit markActive in
> > source/sink to express that the respective entity now needs to wait for
> > explicit watermarks.
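A minimal sketch of the redefined semantics, assuming a hypothetical `ExplicitStatusValve` with a fixed number of input channels: a channel's status changes only through explicit `markIdle`/`markActive` calls, a record arriving on an idle channel is delivered without reactivating it, and only active channels contribute to the combined watermark. Holding the output back after `markActive` until the reactivated channel catches up (the output never decreases) is one possible design choice, not Flink's `StatusWatermarkValve` logic.

```java
import java.util.Arrays;

// Hypothetical sketch: status changes only via explicit markIdle()/markActive();
// records never flip a channel's status.
class RedefinedIdlenessDemo {
    public static void main(String[] args) {
        ExplicitStatusValve valve = new ExplicitStatusValve(2);
        valve.onWatermark(0, 50);
        valve.onWatermark(1, 80);
        System.out.println(valve.outputWatermark()); // 50

        valve.markIdle(0); // channel 0 stops providing watermarks
        System.out.println(valve.outputWatermark()); // 80

        valve.onRecord(0, 60); // delivered, but channel 0 stays idle
        System.out.println(valve.isActive(0)); // false

        valve.markActive(0); // from now on, wait for channel 0's watermarks again
        valve.onWatermark(1, 120);
        System.out.println(valve.outputWatermark()); // 80: held back by channel 0 at 50
        valve.onWatermark(0, 110);
        System.out.println(valve.outputWatermark()); // 110
    }
}

class ExplicitStatusValve {
    private final long[] watermarks;
    private final boolean[] active;
    private long lastOutput = Long.MIN_VALUE;

    ExplicitStatusValve(int channels) {
        watermarks = new long[channels];
        active = new boolean[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(active, true);
    }

    void onWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward.
        watermarks[channel] = Math.max(watermarks[channel], watermark);
    }

    void onRecord(int channel, long timestamp) {
        // Intentionally leaves active[channel] untouched: an idle channel may
        // deliver records without becoming active again.
    }

    void markIdle(int channel) { active[channel] = false; }

    void markActive(int channel) { active[channel] = true; }

    boolean isActive(int channel) { return active[channel]; }

    // Minimum over active channels; recomputed on each call and never decreases.
    long outputWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            lastOutput = Math.max(lastOutput, min);
        }
        return lastOutput;
    }
}
```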
> >
> > I'll open a proper discussion thread tomorrow.
> >
> > Note that we probably shouldn't rush this FLIP until we have clarified the
> > semantics of idleness. We could also cut the scope of the FLIP to exclude
> > idleness and go ahead without it (there should be enough binding votes
> > already).
> >
> > On Sat, Jun 5, 2021 at 12:09 AM Eron Wright <ewri...@streamnative.io.invalid>
> > wrote:
> >
> > > I understand your scenario, but I disagree with its assumptions:
> > >
> > > "However, the partition of A is empty and thus A is temporarily idle." -
> > > you're assuming that the behavior of the source is to mark itself idle if
> > > data isn't available, but that's clearly source-specific and not behavior
> > > we expect from the Pulsar source. A partition may be empty indefinitely
> > > while still being active. Imagine that the producer is defending a lease:
> > > "I'm here, there's no data, please don't advance the clock".
> > >
> > > "we bind idleness to wall clock time" - you're characterizing a specific
> > > strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
> > > the pipeline. I wouldn't recommend using withIdleness() with source-based
> > > watermarks.
> > >
> > > I do agree that dynamism in partition assignment can wreak havoc on
> > > watermark correctness. We have some ideas on the Pulsar side about that
> > > too. I would ask that we focus on the Flink framework and pipeline
> > > behavior. By offering a more powerful framework, we encourage stream
> > > storage systems to "rise to the occasion": treat event time in a
> > > first-class way, optimize for correctness, etc. In this case, FLIP-167 is
> > > setting the stage for evolution in Pulsar.
> > >
> > > Thanks again Arvid for the great discussion.
> > >
> > > On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > At least one big motivation is having (temporarily) empty partitions.
> > > > Let me give you an example of why, imho, idleness is only approximate in
> > > > this case: assume you have source subtasks A, B, C that correspond to 3
> > > > source partitions and a downstream keyed window operator W.
> > > >
> > > > W would usually trigger on min_watermark(A, B, C). However, the
> > > > partition of A is empty and thus A is temporarily idle. So W triggers on
> > > > min_watermark(B, C). When A is active again, the watermark is implicitly
> > > > min_watermark(B, C) for A!
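The arithmetic in this example can be sketched as follows, with illustrative watermark values for A, B and C: the downstream operator W sees the minimum over non-idle subtasks, so marking A idle lets W jump from A's watermark to min_watermark(B, C). `WindowInput.combined` is a hypothetical helper, and returning `Long.MIN_VALUE` when every subtask is idle is a simplification of this sketch.

```java
import java.util.Map;
import java.util.Set;

class IdleSubtaskDemo {
    public static void main(String[] args) {
        // Latest watermarks of source subtasks A, B, C (illustrative values).
        Map<String, Long> watermarks = Map.of("A", 10L, "B", 40L, "C", 55L);

        // All active: W triggers on min_watermark(A, B, C).
        System.out.println(WindowInput.combined(watermarks, Set.of()));    // 10

        // A's partition is empty and A is marked idle: W triggers on min_watermark(B, C).
        System.out.println(WindowInput.combined(watermarks, Set.of("A"))); // 40
    }
}

class WindowInput {
    // Minimum watermark over subtasks that are not idle. Returning Long.MIN_VALUE
    // when all are idle is a simplification of this sketch.
    static long combined(Map<String, Long> watermarks, Set<String> idle) {
        return watermarks.entrySet().stream()
                .filter(e -> !idle.contains(e.getKey()))
                .mapToLong(e -> e.getValue())
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

Because watermarks never move backwards, once W has emitted 40, A effectively resumes at 40 when it becomes active again, whatever timestamps A produces afterwards.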
> > > >
> > > > Let's further assume that the source is filled by another pipeline
> > > > beforehand. This pipeline experiences technical difficulties for X
> > > > minutes and cannot produce into the partition of A, hence the idleness.
> > > > When the upstream pipeline resumes, it fills A with some records whose
> > > > timestamps are before min_watermark(B, C). Any watermark generated from
> > > > these records is discarded, as the watermark is monotonic. Therefore,
> > > > these records will be considered late by W and discarded.
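A small sketch of why the recovered records end up late. It assumes, for simplicity, that W's input collapses to a single monotonic clock already advanced to min_watermark(B, C) = 40 while A was idle, that each record's timestamp is used directly as a candidate watermark, and that a record is late when its timestamp is at or behind the current watermark. `EventTimeClock` and the backlog timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class LateRecordDemo {
    public static void main(String[] args) {
        // W already advanced to min_watermark(B, C) = 40 while A was idle (assumed value).
        EventTimeClock w = new EventTimeClock(40L);

        // The recovered upstream pipeline now writes A's backlog (assumed timestamps).
        long[] backlog = {15L, 25L, 35L, 45L};
        List<Long> late = new ArrayList<>();
        for (long ts : backlog) {
            if (w.isLate(ts)) {
                late.add(ts);
            }
            w.advance(ts); // candidate watermark from this record; dropped if it would regress
        }
        System.out.println(late);        // [15, 25, 35]
        System.out.println(w.current()); // 45
    }
}

class EventTimeClock {
    private long watermark;

    EventTimeClock(long initialWatermark) {
        this.watermark = initialWatermark;
    }

    // Watermarks are monotonic: a candidate behind the current watermark is discarded.
    void advance(long candidate) {
        watermark = Math.max(watermark, candidate);
    }

    // Watermark(t) promises no more elements with timestamp <= t.
    boolean isLate(long timestamp) {
        return timestamp <= watermark;
    }

    long current() {
        return watermark;
    }
}
```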
> > > >
> > > > Without idleness, we would simply have blocked W until the upstream
> > > > pipeline fully recovered, and we would not have had any late records.
> > > > The same holds for any reprocessing where the data of partition A is
> > > > continuous.
> > > >
> > > > If you look deeper, the issue is that we bind idleness to wall clock
> > > > time (e.g. advance the watermark after X seconds without data). Then we
> > > > assume the watermark of the idle partition to be in sync with the
> > > > slowest partition. However, in the case of hiccups, this assumption does
> > > > not hold at all. I don't see any fix for that (easy or not) and imho it's
> > > > inherent to the design of idleness. We lack information (why is no data
> > > > coming?) and rely on a heuristic to compensate.
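To make "bound to wall clock time" concrete, here is a minimal sketch of a processing-time idleness detector in the spirit of `WatermarkStrategy.withIdleness(Duration)`: a split counts as idle once no record has arrived for the configured timeout, regardless of why. `WallClockIdlenessDetector` and its injectable clock are hypothetical; this is not Flink's internal implementation.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

class WallClockIdlenessDemo {
    public static void main(String[] args) {
        long[] nowMillis = {0L}; // fake processing-time clock
        WallClockIdlenessDetector detector =
                new WallClockIdlenessDetector(Duration.ofSeconds(60), () -> nowMillis[0]);

        nowMillis[0] = 30_000L;
        System.out.println(detector.isIdle()); // false: 30 s without data
        nowMillis[0] = 61_000L;
        System.out.println(detector.isIdle()); // true: 61 s without data, whatever the cause
        detector.onRecord();
        System.out.println(detector.isIdle()); // false: a record resets the timer
    }
}

class WallClockIdlenessDetector {
    private final long timeoutMillis;
    private final LongSupplier clock;
    private long lastRecordMillis;

    WallClockIdlenessDetector(Duration timeout, LongSupplier clock) {
        this.timeoutMillis = timeout.toMillis();
        this.clock = clock;
        this.lastRecordMillis = clock.getAsLong();
    }

    void onRecord() {
        lastRecordMillis = clock.getAsLong();
    }

    // Idle purely because processing time passed; it has no idea why data stopped.
    boolean isIdle() {
        return clock.getAsLong() - lastRecordMillis >= timeoutMillis;
    }
}
```

An upstream outage and a genuinely quiet partition look identical to this detector, which is exactly the missing information described above.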
> > > >
> > > > In the case of partition assignment where one subtask has no partition,
> > > > we are probably somewhat safe. We know why no data is coming (no
> > > > partition) and, as long as we do not have dynamic partition assignment,
> > > > there will never be a switch to active without a restart (for the
> > > > foreseeable future).
> > > >
> > > > On Fri, Jun 4, 2021 at 10:34 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > wrote:
> > > >
> > > > > Yes, I'm talking about an implementation of idleness that is unrelated
> > > > > to processing time. The clear example is partition assignment to
> > > > > subtasks, which probably motivated Flink's idleness functionality in
> > > > > the first place.
> > > > >
> > > > > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise <ar...@apache.org> wrote:
> > > > >
> > > > > > Hi Eron,
> > > > > >
> > > > > > Are you referring to an implementation of idleness that does not rely
> > > > > > on a wall clock but on some clock baked into the partition
> > > > > > information of the source system?
> > > > > > If so, you are right that it invalidates my points.
> > > > > > Do you have an example of where this is used?
> > > > > >
> > > > > > With a wall clock, you always run into the issues that I describe,
> > > > > > since you are effectively mixing event time and processing time...
> > > > > >
> > > > > >
> > > > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Dawid, I think you're mischaracterizing the idleness signal as
> > > > > > > inherently a heuristic, but Flink does not impose that. A
> > > > > > > source-based watermark (and the corresponding idleness signal) may
> > > > > > > well be entirely data-driven and entirely deterministic. Basically,
> > > > > > > you're underselling what the pipeline is capable of, based on
> > > > > > > painful experiences with the generic, heuristics-based watermark
> > > > > > > assigner. Please don't let those experiences overshadow what's
> > > > > > > possible with source-based watermarking.
> > > > > > >
> > > > > > > The idleness signal does have a strict definition: it indicates
> > > > > > > whether the stream is actively participating in advancing the
> > > > > > > event time clock. The status of all participants is considered
> > > > > > > when aggregating watermarks. A source subtask generally makes the
> > > > > > > determination based on data, e.g. whether a topic is assigned to
> > > > > > > that subtask.
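A data-driven determination of this kind can be sketched as a pure function of the partition assignment: a subtask with no assigned partitions is idle, independent of any clock. The round-robin assignment and the `Assignment.roundRobin` helper are assumptions for illustration, not the Pulsar source's actual split assignment.

```java
import java.util.ArrayList;
import java.util.List;

class AssignmentIdlenessDemo {
    public static void main(String[] args) {
        int partitions = 4;
        int subtasks = 6; // more subtasks than partitions, so some own nothing
        for (int s = 0; s < subtasks; s++) {
            List<Integer> assigned = Assignment.roundRobin(partitions, subtasks, s);
            // Idleness follows from the assignment itself, not from a timeout.
            boolean idle = assigned.isEmpty();
            System.out.println("subtask " + s + " -> " + assigned + (idle ? " (idle)" : ""));
        }
    }
}

class Assignment {
    // Partition p goes to subtask p % subtasks; returns the partitions owned by `subtask`.
    static List<Integer> roundRobin(int partitions, int subtasks, int subtask) {
        List<Integer> owned = new ArrayList<>();
        for (int p = subtask; p < partitions; p += subtasks) {
            owned.add(p);
        }
        return owned;
    }
}
```

Here subtasks 4 and 5 own no partition and stay idle for as long as the assignment is static; the result is the same on every run and on reprocessing.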
> > > > > > >
> > > > > > > We have here a modest proposal to add callbacks to the sink
> > > > > > > function for information that the sink operator already receives.
> > > > > > > The practical result is improved correctness when used with
> > > > > > > streaming systems that have first-class support for event time.
> > > > > > > The specific changes may be previewed here:
> > > > > > > https://github.com/apache/flink/pull/15950
> > > > > > > https://github.com/streamnative/flink/pull/2
> > > > > > >
> > > > > > > Thank you all for the robust discussion. Do I have your support to
> > > > > > > enhance FLIP-167 with idleness callbacks and to proceed to a vote?
> > > > > > >
> > > > > > > Eron
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > >
> > > > > > > > While everything I wrote before is still valid, upon further
> > > > > > > > reflection I think that the conclusion is not necessarily correct:
> > > > > > > > - If the user wants pipelines A and B to behave as if A+B were
> > > > > > > > jointly executed in the same pipeline without the intermediate
> > > > > > > > Pulsar topic, having the idleness in that topic is the only way to
> > > > > > > > guarantee consistency.
> > > > > > > > - We could support the following in the respective sources: if the
> > > > > > > > user wants to use a different definition of idleness in B, they
> > > > > > > > can just provide a new idleness definition. At that point, we
> > > > > > > > should discard the idleness in the intermediate topic while
> > > > > > > > reading.
> > > > > > > >
> > > > > > > > If we agree on the latter approach, I think having the idleness in
> > > > > > > > the topic is of great use, because it's a piece of information
> > > > > > > > that cannot be inferred, as stated by others. Consequently, we
> > > > > > > > would be able to support all use cases and give users the freedom
> > > > > > > > to express their intent.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > I think the core issue in this discussion is that we kind of
> > > > > > > > > assume that idleness is something universally well-defined. But
> > > > > > > > > it's not. It's a heuristic to advance data processing in event
> > > > > > > > > time where we would otherwise lack the data to do so.
> > > > > > > > > Keep in mind that idleness has no real definition in terms of
> > > > > > > > > event time and leads to severe unexpected results: if you
> > > > > > > > > reprocess a data stream with temporarily idle partitions, these
> > > > > > > > > partitions would not be deemed idle on reprocessing, and there is
> > > > > > > > > a realistic chance that records that were deemed late in the live
> > > > > > > > > processing case are now perfectly fine records in the
> > > > > > > > > reprocessing case. (I can expand on that if that was too short.)
> > > > > > > > >
> > > > > > > > > With that in mind, why would a downstream process even try to
> > > > > > > > > calculate the same idleness state as the upstream process? I
> > > > > > > > > don't see a point; we would just compound any imprecision in the
> > > > > > > > > calculation.
> > > > > > > > >
> > > > > > > > > Let's take a concrete example. Assume that we have upstream
> > > > > > > > > pipeline A and downstream pipeline B. A has plenty of resources
> > > > > > > > > and is processing data live. Some partitions are idle and that is
> > > > > > > > > propagated to the sinks. Now B is heavily backpressured and
> > > > > > > > > consumes very slowly. B doesn't see any idleness directly. B can
> > > > > > > > > calculate exact watermarks and use all records for its
> > > > > > > > > calculation. Reprocessing would yield the same result for B. If
> > > > > > > > > we now forward idleness, we can easily find cases where we would
> > > > > > > > > advance the watermark prematurely while there is data directly
> > > > > > > > > available to calculate the exact watermark.
> > > > > > > > >
> > > > > > > > > For me, idleness is just a pipeline-specific heuristic and should
> > > > > > > > > be viewed as such.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > >
> > > > > > > > > Arvid
> > > > > > > > >
> > > > > > > > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi,
> > > > > > > > >>
> > > > > > > > >> > Imagine you're starting to consume from the result channel in a
> > > > > > > > >> > situation where you have:
> > > > > > > > >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE,
> > > > > > > > >> > record2, record1, record0
> > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and
> > > > > > > > >> > might cause record3 and record4 to be late depending on how the
> > > > > > > > >> > watermark progressed in other partitions.
> > > > > > > > >>
> > > > > > > > >> Yes, I understand this point. But it can also be the other way
> > > > > > > > >> around. There might be a large gap between record2 and record3,
> > > > > > > > >> and users might prefer not to, or might not be able to,
> > > > > > > > >> duplicate the idleness detection logic. The downstream system
> > > > > > > > >> might lack some kind of information (only available in the
> > > > > > > > >> top-level/ingesting system) to correctly set the idle status.
> > > > > > > > >>
> > > > > > > > >> Piotrek
> > > > > > > > >>
> > > > > > > > >> On Fri, 4 Jun 2021 at 12:30 Dawid Wysakowicz <dwysakow...@apache.org>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> >
> > > > > > > > >> > > Same as Eron, I don't follow this point. Any streaming sink
> > > > > > > > >> > > can be used as this kind of transient channel. Streaming
> > > > > > > > >> > > sinks, like Kafka, are also used to connect one streaming
> > > > > > > > >> > > system with another one, also for immediate consumption.
> > > > > > > > >> >
> > > > > > > > >> > Sure it can, but imo it is rarely the primary reason why you
> > > > > > > > >> > want to offload the channels to an external persistent system.
> > > > > > > > >> > Again, in my understanding StreamStatus is something transient,
> > > > > > > > >> > e.g. part of our external system went offline. I think those
> > > > > > > > >> > kinds of events should not be persisted.
> > > > > > > > >> >
> > > > > > > > >> > > Both watermarks and idleness status can be some inherent
> > > > > > > > >> > > property of the underlying data stream. If an
> > > > > > > > >> > > upstream/ingesting system knows that this particular
> > > > > > > > >> > > stream/partition of a stream is going idle (for example for
> > > > > > > > >> > > a couple of hours), why does this information have to be
> > > > > > > > >> > > re-created in the downstream system using some heuristic? It
> > > > > > > > >> > > could be explicitly encoded.
> > > > > > > > >> >
> > > > > > > > >> > Because it's most certainly not true downstream. Idleness
> > > > > > > > >> > usually works according to a heuristic: "We have not seen
> > > > > > > > >> > records for 5 minutes, so there is a fair chance we won't see
> > > > > > > > >> > records for the next 5 minutes, so let's not wait for
> > > > > > > > >> > watermarks for now." That heuristic most certainly won't hold
> > > > > > > > >> > for a downstream persistent storage.
> > > > > > > > >> >
> > > > > > > > >> > Imagine you're starting to consume from the result channel in a
> > > > > > > > >> > situation where you have:
> > > > > > > > >> >
> > > > > > > > >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE,
> > > > > > > > >> > record2, record1, record0
> > > > > > > > >> >
> > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and
> > > > > > > > >> > might cause record3 and record4 to be late depending on how the
> > > > > > > > >> > watermark progressed in other partitions.
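The replay scenario can be simulated with a minimal sketch. All values and names are hypothetical: partition P0 holds record0..record4 with timestamps 10, 20, 30, 50, 60 and the persisted IDLE/ACTIVE markers between record2 and record3; another partition P1 is already at watermark 100; each record's timestamp serves directly as P0's watermark; and the combined watermark never decreases. The `honorPersistedStatus` flag contrasts applying the persisted markers with discarding them on read.

```java
import java.util.ArrayList;
import java.util.List;

class PersistedStatusReplayDemo {
    public static void main(String[] args) {
        // P0 in reading order (oldest first): "name@timestamp" or a persisted status marker.
        String[] p0 = {"record0@10", "record1@20", "record2@30",
                       "IDLE", "ACTIVE", "record3@50", "record4@60"};
        System.out.println(replay(p0, 100L, true));  // [record3, record4]
        System.out.println(replay(p0, 100L, false)); // []
    }

    // Replays P0 alongside a partition P1 whose watermark is already p1Watermark and
    // returns the P0 records that arrive at or behind the combined watermark.
    static List<String> replay(String[] p0, long p1Watermark, boolean honorPersistedStatus) {
        List<String> late = new ArrayList<>();
        long p0Watermark = Long.MIN_VALUE;
        boolean p0Idle = false;
        long combined = Long.MIN_VALUE; // never decreases
        for (String element : p0) {
            if (element.equals("IDLE") || element.equals("ACTIVE")) {
                if (honorPersistedStatus) {
                    p0Idle = element.equals("IDLE");
                }
            } else {
                String[] parts = element.split("@");
                long ts = Long.parseLong(parts[1]);
                if (ts <= combined) {
                    late.add(parts[0]);
                }
                p0Watermark = Math.max(p0Watermark, ts); // naive: watermark = max timestamp seen
            }
            long candidate = p0Idle ? p1Watermark : Math.min(p0Watermark, p1Watermark);
            combined = Math.max(combined, candidate);
        }
        return late;
    }
}
```

With the markers honored, the combined watermark jumps to 100 while P0 is marked idle and stays there, so record3 and record4 are late; with the markers discarded, neither is.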
> > > > > > > > >> >
> > > > > > > > >> > I understand Eron's use case, which is not about storing the
> > > > > > > > >> > StreamStatus, but about performing an immediate aggregation,
> > > > > > > > >> > or, said differently, changing the partitioning/granularity of
> > > > > > > > >> > records and watermarks externally to Flink. The partitioning
> > > > > > > > >> > produced by Flink is actually never persisted in that case. In
> > > > > > > > >> > this case I agree exposing the StreamStatus makes sense. I am
> > > > > > > > >> > still concerned it will lead to storing the StreamStatus,
> > > > > > > > >> > which can lead to many subtle problems.
> > > > > > > > >> >
> > > > > > > > >> > On 04/06/2021 11:53, Piotr Nowojski wrote:
> > > > > > > > >> >
> > > > > > > > >> > Hi,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for picking up this discussion. For the record, I also
> > > > > > > > >> > think we shouldn't expose latency markers.
> > > > > > > > >> >
> > > > > > > > >> > About the stream status
> > > > > > > > >> >
> > > > > > > > >> > Persisting the StreamStatus
> > > > > > > > >> >
> > > > > > > > >> > I don't agree with the view that sinks are "storing" the
> > > > > > > > >> > data/idleness status. This nomenclature only makes sense if we
> > > > > > > > >> > are talking about streaming jobs producing batch data.
> > > > > > > > >> >
> > > > > > > > >> > > In my understanding a StreamStatus makes sense only when
> > > > > > > > >> > > talking about immediately consumed transient channels such
> > > > > > > > >> > > as between operators within a single job.
> > > > > > > > >> >
> > > > > > > > >> > Same as Eron, I don't follow this point. Any streaming sink can
> > > > > > > > >> > be used as this kind of transient channel. Streaming sinks,
> > > > > > > > >> > like Kafka, are also used to connect one streaming system with
> > > > > > > > >> > another one, also for immediate consumption.
> > > > > > > > >> >
> > > > > > > > >> > You could say the same thing about watermarks (note they are
> > > > > > > > >> > usually generated in Flink based on the incoming events) and I
> > > > > > > > >> > would disagree in the same way. Both watermarks and idleness
> > > > > > > > >> > status can be some inherent property of the underlying data
> > > > > > > > >> > stream. If an upstream/ingesting system knows that this
> > > > > > > > >> > particular stream/partition of a stream is going idle (for
> > > > > > > > >> > example for a couple of hours), why does this information have
> > > > > > > > >> > to be re-created in the downstream system using some
> > > > > > > > >> > heuristic? It could be explicitly encoded. If you want to pass
> > > > > > > > >> > watermarks explicitly to the next downstream streaming system,
> > > > > > > > >> > because you do not want to recreate them from the events using
> > > > > > > > >> > duplicated logic, why wouldn't you want to do the same thing
> > > > > > > > >> > with the idleness?
> > > > > > > > >> >
> > > > > > > > >> > Also keep in mind that I would expect users to be able to
> > > > > > > > >> > decide on their own whether they want to persist the
> > > > > > > > >> > watermarks/stream status. This shouldn't be obligatory.
> > > > > > > > >> >
> > > > > > > > >> > For me there is one good reason not to expose stream status
> > > > > > > > >> > YET: if we are sure that we do not need it just yet, while at
> > > > > > > > >> > the same time we don't want to expand the Public/PublicEvolving
> > > > > > > > >> > API, as this always increases the maintenance cost.
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> > Piotrek
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Fri, 4 Jun 2021 at 10:57 Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I believe that the correctness of watermarks and stream status
> > > > > > > > >> > markers is determined entirely by the source (ignoring the
> > > > > > > > >> > generic assigner). Such stream elements are known not to
> > > > > > > > >> > overtake records, and aren't transient from a pipeline
> > > > > > > > >> > perspective. I do agree that recoveries may be lossy if some
> > > > > > > > >> > operator state is transient (e.g. valve state).
> > > > > > > > >> >
> > > > > > > > >> > Consider that status markers already affect the flow of
> > > > > > > > >> > watermarks (e.g. suppression), and thus affect operator
> > > > > > > > >> > behavior. It seems to me that exposing the idleness state is no
> > > > > > > > >> > different from exposing a watermark.
> > > > > > > > >> >
> > > > > > > > >> > The high-level story is that there is a need for the Flink job
> > > > > > > > >> > to be transparent or neutral with respect to the event time
> > > > > > > > >> > clock. I believe this is possible if time flows with high
> > > > > > > > >> > fidelity from source to sink. Of course, one always has the
> > > > > > > > >> > choice as to whether to use source-based watermarks; as you
> > > > > > > > >> > mentioned, requirements vary.
> > > > > > > > >> >
> > > > > > > > >> > Regarding the Pulsar specifics, we're working on a community
> > > > > > > > >> > proposal that I'm anxious to share. To answer your question,
> > > > > > > > >> > the broker aggregates watermarks from multiple producers who
> > > > > > > > >> > are writing to a single topic. Each sink subtask is a producer.
> > > > > > > > >> > The broker considers each producer's assertions (watermarks,
> > > > > > > > >> > idleness) to be independent inputs, much like the case with
> > > > > > > > >> > the watermark valve.
> > > > > > > > >> >
> > > > > > > > >> > On your concern about idleness causing false late events, I
> > > > > > > > >> > understand your point but don't think it applies if the
> > > > > > > > >> > keyspace assignments are stable.
> > > > > > > > >> >
> > > > > > > > >> > I hope this explains things to your satisfaction.
> > > > > > > > >> >
> > > > > > > > >> > - Eron
> > > > > > > > >> >
> > > > > > > > >> > On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz <dwysakow...@apache.org>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > I might be missing some background on Pulsar partitioning, but
> > > > > > > > >> > something seems off to me. What is the chunk/batch/partition
> > > > > > > > >> > that Pulsar brokers will additionally combine watermarks for?
> > > > > > > > >> > Isn't it the case that only a single Flink sub-task would write
> > > > > > > > >> > to such a chunk and thus will already produce an aggregated
> > > > > > > > >> > watermark via the writeWatermark method?
> > > > > > > > >> >
> > > > > > > > >> > Personally, I am really skeptical about exposing the
> > > > > > > > >> > StreamStatus in any Producer API. In my understanding, the
> > > > > > > > >> > StreamStatus is a transient setting of a consumer of data.
> > > > > > > > >> > StreamStatus is a mechanism for making a tradeoff between
> > > > > > > > >> > correctness (how many late elements, i.e. elements behind the
> > > > > > > > >> > watermark, we get) and making progress. IMO one has to be extra
> > > > > > > > >> > cautious when it comes to persistent systems. Again, I might be
> > > > > > > > >> > missing the exact use case you are trying to solve here, but I
> > > > > > > > >> > can imagine multiple jobs reading from such a stream which
> > > > > > > > >> > might have different correctness requirements. Just throwing
> > > > > > > > >> > out an idea off the top of my head: you might want entirely
> > > > > > > > >> > correct results, which can be delayed for minutes, and a
> > > > > > > > >> > separate task that produces quick insights within seconds.
> > > > > > > > >> > Another thing to consider is that by the time the downstream
> > > > > > > > >> > job starts consuming, the upstream one might have produced
> > > > > > > > >> > records to the previously idle chunk. Persisting the
> > > > > > > > >> > StreamStatus in such a scenario would add unnecessary false
> > > > > > > > >> > late events.
> > > > > > > > >> >
> > > > > > > > >> > In my understanding, a StreamStatus makes sense only when
> > > > > > > > >> > talking about immediately consumed transient channels, such as
> > > > > > > > >> > between operators within a single job.
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> >
> > > > > > > > >> > Dawid
> > > > > > > > >> >
> > > > > > > > >> > On 03/06/2021 23:31, Eron Wright wrote:
> > > > > > > > >> >
> > > > > > > > >> > I think the rationale for end-to-end idleness (i.e. between
> > > > > > > > >> > pipelines) is the same as the rationale for idleness between
> > > > > > > > >> > operators within a pipeline. On the 'main issue' you mentioned,
> > > > > > > > >> > we entrust the source with adapting to Flink's notion of
> > > > > > > > >> > idleness (e.g. in the Pulsar source, it means that no
> > > > > > > > >> > topics/partitions are assigned to a given sub-task); a similar
> > > > > > > > >> > adaptation would occur in the sink. In other words, I think it
> > > > > > > > >> > is reasonable that a sink for a watermark-aware storage system
> > > > > > > > >> > needs the idleness signal.
> > > > > > > > >> >
> > > > > > > > >> > Let me explain how I would use it in Pulsar's sink. Each
> > > > > > > > >> > sub-task is a Pulsar producer and writes watermarks to a
> > > > > > > > >> > configured topic via the Producer API. The Pulsar broker
> > > > > > > > >> > aggregates the watermarks written by each producer into a
> > > > > > > > >> > global minimum (similar to StatusWatermarkValve). The broker
> > > > > > > > >> > keeps track of which producers are actively producing
> > > > > > > > >> > watermarks, and a producer may mark itself as idle to tell the
> > > > > > > > >> > broker not to wait for watermarks from it, e.g. when a producer
> > > > > > > > >> > is going offline. I had intended to mark the producer as idle
> > > > > > > > >> > when the sub-task is closing, but now I see that this would be
> > > > > > > > >> > insufficient; the producer should also be idled if the sub-task
> > > > > > > > >> > is idled. Otherwise, the broker would wait indefinitely for the
> > > > > > > > >> > idled sub-task to produce a watermark.
> > > > > > > > >> >
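A minimal sketch of the broker-side behavior Eron describes, assuming a hypothetical `WatermarkBroker` class (not Pulsar's actual API) and plain `long` watermarks. The broker keeps each producer's latest watermark, skips producers that have marked themselves idle, and reports the minimum of the rest. If an idled sink sub-task never calls `markIdle`, its stale watermark holds that minimum back indefinitely, which is the stall described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of broker-side watermark aggregation; not Pulsar's API.
class WatermarkBroker {
    // Latest watermark reported by each producer.
    private final Map<String, Long> watermarks = new HashMap<>();
    // Producers that asked the broker not to wait for them.
    private final Set<String> idleProducers = new HashSet<>();

    // A producer writes a watermark; doing so also marks it active again.
    void writeWatermark(String producerId, long watermark) {
        // Keep each producer's watermark monotonic by ignoring regressions.
        watermarks.merge(producerId, watermark, Long::max);
        idleProducers.remove(producerId);
    }

    // A producer (e.g. an idle or closing sink sub-task) opts out of the minimum.
    void markIdle(String producerId) {
        idleProducers.add(producerId);
    }

    // Global minimum over active producers; empty if every producer is idle.
    // Unlike StatusWatermarkValve, this sketch does not stop the result from
    // moving backwards when an idle producer rejoins with an older watermark.
    OptionalLong globalWatermark() {
        return watermarks.entrySet().stream()
                .filter(e -> !idleProducers.contains(e.getKey()))
                .mapToLong(e -> e.getValue())
                .min();
    }
}
```

With producers at 10 and 5, the aggregate is 5; once the second producer marks itself idle, it advances to 10.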
> > > > > > > > >> > Arvid, I think your original instincts were correct about
> > > > > > > > >> > idleness propagation, and I hope I've demonstrated a
> > > > > > > > >> > practical use case.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > When I was rethinking the idleness issue, I came to the
> > > > > > > > >> > conclusion that it should be inferred again at the source of
> > > > > > > > >> > the respective downstream pipeline.
> > > > > > > > >> >
> > > > > > > > >> > The main issue with propagating idleness is that you would
> > > > > > > > >> > force the same definition across all downstream pipelines,
> > > > > > > > >> > which may not be what the user intended.
> > > > > > > > >> > On the other hand, I don't immediately see a technical
> > > > > > > > >> > reason why the downstream source wouldn't be able to infer
> > > > > > > > >> > that.
> > > > > > > > >> >
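Arvid's alternative, letting each downstream source infer idleness on its own terms, can be sketched as a simple timeout rule. `IdlenessDetector` is a hypothetical class, and the caller-supplied millisecond clock is an assumption made to keep the sketch testable. Flink's `WatermarkStrategy#withIdleness(Duration)` applies a timeout-based rule of this general kind on the source side, but the code below is an illustration, not that implementation.

```java
import java.time.Duration;

// Hypothetical timeout-based idleness detector for a downstream source.
// Times are passed in by the caller (milliseconds) to keep the sketch testable.
class IdlenessDetector {
    private final long timeoutMillis;
    private long lastActivityMillis;

    IdlenessDetector(Duration timeout, long nowMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastActivityMillis = nowMillis;
    }

    // Call for every record or watermark read from the upstream topic.
    void onActivity(long nowMillis) {
        lastActivityMillis = nowMillis;
    }

    // Idle once nothing has arrived for strictly longer than the timeout.
    boolean isIdle(long nowMillis) {
        return nowMillis - lastActivityMillis > timeoutMillis;
    }
}
```

Because each downstream pipeline picks its own timeout, the upstream job's definition of idleness is not forced on it, which is the concern raised above.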
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks Piotr for bringing this up. I reflected on this and I
> > > > > > > > >> > agree we should expose idleness; otherwise, a multi-stage
> > > > > > > > >> > flow could stall.
> > > > > > > > >> >
> > > > > > > > >> > Regarding the latency markers, I don't see an immediate need
> > > > > > > > >> > for propagating them, because they serve to estimate latency
> > > > > > > > >> > within a pipeline, not across pipelines. One would probably
> > > > > > > > >> > also need to enhance the source interface to measure
> > > > > > > > >> > end-to-end latency. It seems we agree this aspect is out of
> > > > > > > > >> > scope.
> > > > > > > > >> >
> > > > > > > > >> > I took a look at the code to get a sense of how to
> > > > > > > > >> > accomplish this. The gist is a new `markIdle` method on the
> > > > > > > > >> > `StreamOperator` interface that is called when the stream
> > > > > > > > >> > status maintainer (the `OperatorChain`) transitions to the
> > > > > > > > >> > idle state, plus a new `markIdle` method on `SinkFunction`
> > > > > > > > >> > and `SinkWriter` that is called by the respective operators.
> > > > > > > > >> > Note that StreamStatus is an internal class.
> > > > > > > > >> >
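The plumbing Eron outlines can be sketched without Flink dependencies. `SketchSinkWriter` and `SketchSinkOperator` are hypothetical stand-ins, not Flink's `SinkWriter` or `StreamOperator`, and watermarks are plain `long` timestamps. Declaring the new callbacks as default no-op methods is what lets existing sink implementations keep compiling, which ties in with the japicmp discussion later in the thread.

```java
// Hypothetical sink-writer API with the two callbacks discussed in the thread.
interface SketchSinkWriter<T> {
    void write(T element);

    // Called when a watermark reaches the sink operator; a no-op by default so
    // existing implementations keep compiling.
    default void writeWatermark(long watermark) {}

    // Called when the sink's sub-task becomes idle; also a no-op by default.
    default void markIdle() {}
}

// Hypothetical sink operator that forwards records, watermarks and idleness.
class SketchSinkOperator<T> {
    private final SketchSinkWriter<T> writer;
    private boolean idle = false;

    SketchSinkOperator(SketchSinkWriter<T> writer) {
        this.writer = writer;
    }

    void processElement(T element) {
        idle = false; // a record implies the input is active again
        writer.write(element);
    }

    void processWatermark(long watermark) {
        idle = false;
        writer.writeWatermark(watermark);
    }

    // Invoked by whatever maintains the stream status (the OperatorChain in
    // Eron's draft) when it transitions to idle; repeated calls are ignored.
    void markIdle() {
        if (!idle) {
            idle = true;
            writer.markIdle();
        }
    }
}
```

A legacy writer written as a lambda that implements only `write` still works with this operator, because both new methods have default bodies.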
> > > > > > > > >> > Here's a draft PR (based on the existing PR of FLINK-22700)
> > > > > > > > >> > to highlight this new aspect:
> > > > > > > > >> > https://github.com/streamnative/flink/pull/2/files
> > > > > > > > >> >
> > > > > > > > >> > Please let me know if you'd like me to proceed to update the
> > > > > > > > >> > FLIP with these details.
> > > > > > > > >> >
> > > > > > > > >> > Thanks again,
> > > > > > > > >> > Eron
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Hi,
> > > > > > > > >> >
> > > > > > > > >> > Sorry for chipping in late in the discussion, but I would
> > > > > > > > >> > second this point from Arvid:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker would also
> > > > > > > > >> > need to be encoded.
> > > > > > > > >> >
> > > > > > > > >> > It seems like this point was raised but not followed up on?
> > > > > > > > >> > Or did I miss it?
> > > > > > > > >> >
> > > > > > > > >> > Especially the StreamStatus part. To me, exposing watermarks
> > > > > > > > >> > without letting the sink know that the stream can be idle
> > > > > > > > >> > sounds like an incomplete feature that can be very
> > > > > > > > >> > problematic/confusing for potential users.
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> > Piotrek
> > > > > > > > >> >
> > > > > > > > >> > On Mon, May 31, 2021 at 8:34 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > >> >
> > > > > > > > >> > AFAIK, anyone can start a [VOTE] thread [1]. For example,
> > > > > > > > >> > here a non-committer started a successful one [2].
> > > > > > > > >> > If you start it, I can cast a binding vote right away, and we
> > > > > > > > >> > just need 2 more for the FLIP to be accepted.
> > > > > > > > >> >
> > > > > > > > >> > [1]
> > > > > > > > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > > > > > > >> >
> > > > > > > > >> > [2]
> > > > > > > > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> > > > > > > > >> >
> > > > > > > > >> > On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Arvid,
> > > > > > > > >> > Thanks for the feedback. I investigated the japicmp
> > > > > > > > >> > configuration, and I see that SinkWriter is marked
> > > > > > > > >> > Experimental (not Public or PublicEvolving). I think this
> > > > > > > > >> > means that SinkWriter need not be excluded. As you
> > > > > > > > >> > mentioned, SinkFunction is already excluded. I've updated
> > > > > > > > >> > the FLIP with an explanation.
> > > > > > > > >> >
> > > > > > > > >> > I believe all issues are resolved. May we proceed to a vote
> > > > > > > > >> > now? And are you able to drive the vote process?
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Eron
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > 1. Fair point. It still feels odd to have writeWatermark in
> > > > > > > > >> > the SinkFunction (it's supposed to be functional, as you
> > > > > > > > >> > mentioned), but I agree that invokeWatermark is not better.
> > > > > > > > >> > So unless someone has a better idea, I'm fine with it.
> > > > > > > > >> > 2.+3. I spent quite some time trying to come up with
> > > > > > > > >> > scenarios. In general, the new SinkWriter interface seems to
> > > > > > > > >> > encourage more injection (see the processing time service in
> > > > > > > > >> > InitContext), so the context is really needed only for
> > > > > > > > >> > information about that particular record, and I don't see
> > > > > > > > >> > any use beyond timestamp and watermark. For SinkFunction,
> > > > > > > > >> > I'd not over-engineer, as it's going to be deprecated
> > > > > > > > >> > soonish. So +1 to leave it out.
> > > > > > > > >> > 4. Okay, so I double-checked: from an execution perspective,
> > > > > > > > >> > it works. However, japicmp would definitely complain. I
> > > > > > > > >> > propose to add it to the compatibility section like this: we
> > > > > > > > >> > need to add an exception for SinkWriter then (SinkFunction
> > > > > > > > >> > is already on the exception list).
> > > > > > > > >> > 5.+6. Awesome, I was also sure but wanted to double-check.
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> >
> > > > > > > > >> > Arvid
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Wed, May 26, 2021 at 7:29 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Arvid,
> > > > > > > > >> >
> > > > > > > > >> > 1. I assume that the method name `invoke` stems from
> > > > > > > > >> > considering the SinkFunction to be a functional interface,
> > > > > > > > >> > but is otherwise meaningless. Keeping it as `writeWatermark`
> > > > > > > > >> > does keep it symmetric with SinkWriter. My vote is to leave
> > > > > > > > >> > it. You decide.
> > > > > > > > >> >
> > > > > > > > >> > 2+3. I too considered adding a `WatermarkContext`, but it
> > > > > > > > >> > would merely be a placeholder. I don't anticipate any context
> > > > > > > > >> > info in the future. As we see with invoke, it is possible to
> > > > > > > > >> > add a context later in a backwards-compatible way. My vote is
> > > > > > > > >> > to not introduce a context. You decide.
> > > > > > > > >> >
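The backwards-compatible evolution Eron refers to can be sketched with default methods. `WatermarkSink` and `WatermarkContext` are made-up names, not Flink types; as far as I know, SinkFunction's `invoke(value)` and `invoke(value, context)` relate in a similar way. The point is only that a context-taking overload can be added later with a default body that delegates to the original method.

```java
// Hypothetical context type that might be introduced in a later version.
interface WatermarkContext {
    long currentProcessingTime();
}

// Hypothetical sink API illustrating how a context can be added later.
interface WatermarkSink {
    // Version 1 of the API: watermark only, no-op by default.
    default void writeWatermark(long watermark) {}

    // Added in a later version: the runtime would call this overload, and the
    // default body delegates to the old method, so sinks that only override
    // the one-argument version keep working unchanged.
    default void writeWatermark(long watermark, WatermarkContext context) {
        writeWatermark(watermark);
    }
}
```

A sink written against the first version needs no changes: when the runtime switches to the two-argument call, the default body routes it back to the override.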
> > > > > > > > >> > 4. No anticipated compatibility issues.
> > > > > > > > >> >
> > > > > > > > >> > 5. Short answer: it works as expected. The new methods are
> > > > > > > > >> > invoked whenever the underlying operator receives a
> > > > > > > > >> > watermark. I do believe that batch and ingestion time
> > > > > > > > >> > applications receive watermarks. It seems the programming
> > > > > > > > >> > model has been more unified in that respect since 1.12
> > > > > > > > >> > (FLIP-134).
> > > > > > > > >> >
> > > > > > > > >> > 6. The failure behavior is the same as for elements.
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Eron
> > > > > > > > >> >
> > > > > > > > >> > On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > I think the FLIP is crisp and mostly good to go. Some smaller
> > > > > > > > >> > things/questions:
> > > > > > > > >> >
> > > > > > > > >> >    1. SinkFunction#writeWatermark could be named
> > > > > > > > >> >    SinkFunction#invokeWatermark or invokeOnWatermark to keep
> > > > > > > > >> >    it symmetric.
> > > > > > > > >> >    2. We could add the context parameter to both. For
> > > > > > > > >> >    SinkWriter#Context, we currently do not gain much.
> > > > > > > > >> >    SinkFunction#Context also exposes processing time, which
> > > > > > > > >> >    may or may not be handy and is currently mostly used for
> > > > > > > > >> >    StreamingFileSink bucket policies. We may add that
> > > > > > > > >> >    processing time flag also to SinkWriter#Context in the
> > > > > > > > >> >    future.
> > > > > > > > >> >    3. Alternatively, we could also add a different context
> > > > > > > > >> >    parameter just to keep the API stable while allowing
> > > > > > > > >> >    additional information to be passed in the future.
> > > > > > > > >> >    4. Would we run into any compatibility issue if we use a
> > > > > > > > >> >    Flink 1.13 source in Flink 1.14 (with this FLIP) or vice
> > > > > > > > >> >    versa?
> > > > > > > > >> >    5. What happens with sinks that use the new methods in
> > > > > > > > >> >    applications that do not have watermarks (batch mode,
> > > > > > > > >> >    processing time)? Does this also work sufficiently with
> > > > > > > > >> >    ingestion time?
> > > > > > > > >> >    6. How do exactly-once sinks deal with written watermarks
> > > > > > > > >> >    in case of failure? I guess it's the same as for normal
> > > > > > > > >> >    records (either rollback of the transaction or
> > > > > > > > >> >    deduplication on resumption).
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> >
> > > > > > > > >> > Arvid
> > > > > > > > >> >
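Point 6, together with Eron's reply that the failure behavior is the same as for elements, can be illustrated with a hypothetical transactional sink that follows the rollback variant. `TransactionalWatermarkSink` is an invented class, and using string entries for both records and watermarks is a simplification. Watermarks are staged in the same transaction as records, so an abort discards both and a watermark is never published ahead of the records it covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transactional sink: records and watermarks share one
// transaction, so a failure rolls both back together.
class TransactionalWatermarkSink {
    private final List<String> published = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void write(String record) {
        pending.add("record:" + record);
    }

    void writeWatermark(long watermark) {
        pending.add("watermark:" + watermark);
    }

    // Commit (e.g. on checkpoint completion): publish staged entries in order.
    void commit() {
        published.addAll(pending);
        pending.clear();
    }

    // Abort (e.g. after a failure): discard staged records and watermarks alike.
    void abort() {
        pending.clear();
    }

    // Snapshot of what downstream readers can see.
    List<String> published() {
        return new ArrayList<>(published);
    }
}
```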
> > > > > > > > >> > On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Does anyone have further comments on FLIP-167?
> > > > > > > > >> >
> > > > > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Eron
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Filed FLIP-167: Watermarks for Sink API:
> > > > > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > >> >
> > > > > > > > >> > I'd like to call a vote next week. Is that reasonable?
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> > Hi Arvid and Eron,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for the discussion. I read through Eron's pull request,
> > > > > > > > >> > and I think this can benefit the Pravega Flink connector as
> > > > > > > > >> > well.
> > > > > > > > >> >
> > > > > > > > >> > Here is some background. Pravega has had a watermark concept
> > > > > > > > >> > in its event streams for two years; here is a blog
> > > > > > > > >> > introduction [1] to Pravega watermarks.
> > > > > > > > >> > Last year, the Pravega Flink connector also added watermark
> > > > > > > > >> > integration to propagate the Flink watermark to Pravega in
> > > > > > > > >> > the SinkFunction. At that time we just used the existing
> > > > > > > > >> > Flink API: we keep the last watermark in memory and check on
> > > > > > > > >> > each event whether the watermark has changed [2], which is
> > > > > > > > >> > not efficient. With the new interface, we can also manage
> > > > > > > > >> > watermark propagation much more easily.
> > > > > > > > >> >
> > > > > > > > >> > [1]
> > > > > > > > >> > https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > > > > > > > >> >
> > > > > > > > >> > [2]
> > > > > > > > >> > https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > > > > > > > >> >
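The difference Brian describes can be sketched as follows. Both classes are hypothetical and are not the actual `FlinkPravegaWriter` code; watermarks are plain `long` timestamps. The polling variant mirrors keeping the last watermark in memory and checking it on every event, while the callback variant assumes a `writeWatermark` hook of the kind proposed in FLIP-167.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-FLIP-167 pattern: the sink only sees the current watermark
// alongside a record, so it compares against the last forwarded value per event.
class PollingWatermarkForwarder {
    final List<Long> forwarded = new ArrayList<>();
    private long lastForwarded = Long.MIN_VALUE;

    void invoke(String record, long currentWatermark) {
        // One comparison per record, even when the watermark has not moved.
        if (currentWatermark > lastForwarded) {
            lastForwarded = currentWatermark;
            forwarded.add(currentWatermark);
        }
        // ... write the record itself (omitted) ...
    }
}

// Hypothetical callback pattern: called only when a watermark arrives, so no
// per-record bookkeeping is needed and watermarks flow even without records.
class CallbackWatermarkForwarder {
    final List<Long> forwarded = new ArrayList<>();

    void invoke(String record) {
        // ... write the record itself (omitted) ...
    }

    void writeWatermark(long watermark) {
        forwarded.add(watermark);
    }
}
```

A side effect of the polling variant is that a watermark advance becomes visible to the sink only when the next record arrives; with the callback, it can be forwarded even while no records flow.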
> > > > > > > > >> > -----Original Message-----
> > > > > > > > >> > From: Arvid Heise <ar...@apache.org>
> > > > > > > > >> > Sent: Wednesday, May 19, 2021 16:06
> > > > > > > > >> > To: dev
> > > > > > > > >> > Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for pushing that topic. I can now see that the benefit
> > > > > > > > >> > is even bigger than I initially thought. So it's worthwhile
> > > > > > > > >> > to include it anyway.
> > > > > > > > >> >
> > > > > > > > >> > I also briefly thought about exposing watermarks to all UDFs,
> > > > > > > > >> > but here I really struggle to see specific use cases. Could
> > > > > > > > >> > you maybe take a few minutes to think about it as well? I
> > > > > > > > >> > could only see someone misusing Async IO as a sink where a
> > > > > > > > >> > real sink would be more appropriate. In general, if there is
> > > > > > > > >> > no clear use case, we shouldn't add the functionality, as it
> > > > > > > > >> > just increases maintenance for no value.
> > > > > > > > >> >
> > > > > > > > >> > If we stick to the plan, I think your PR is already in good
> > > > > > > > >> > shape. We need to create a FLIP for it, though, since it
> > > > > > > > >> > changes Public interfaces [1]. I was initially not convinced
> > > > > > > > >> > that we should also change the old SinkFunction interface,
> > > > > > > > >> > but seeing how small the change is, I wouldn't mind at all
> > > > > > > > >> > increasing consistency. Only once we have written and
> > > > > > > > >> > approved the FLIP (which should be minimal and fast) should
> > > > > > > > >> > we actually look at the PR ;).
> > > > > > > > >> >
> > > > > > > > >> > The only thing I would improve is the name of the function.
> > > > > > > > >> > processWatermark sounds as if the sink implementer really
> > > > > > > > >> > needs to implement it (as you would on a custom operator). I
> > > > > > > > >> > would make the names symmetric to the record writing/invoking
> > > > > > > > >> > methods (e.g. writeWatermark and invokeWatermark).
> > > > > > > > >> >
> > > > > > > > >> > As a follow-up PR, we should then migrate KafkaShuffle to the
> > > > > > > > >> > new API. But that's something I can do.
> > > > > > > > >> >
> > > > > > > > >> > [1]
> > > > > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > > > >> >
> > > > > > > > >> > On Wed, May 19, 2021 at 3:34 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Update: opened an issue and a PR.
> > > > > > > > >> > https://issues.apache.org/jira/browse/FLINK-22700
> > > > > > > > >> > https://github.com/apache/flink/pull/15950
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright <ewri...@streamnative.io> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks Arvid and David for sharing your ideas on this
> > > > > > > > >> > subject. I'm glad to hear that you're seeing use cases for
> > > > > > > > >> > watermark propagation via an enhanced sink interface.
> > > > > > > > >> >
> > > > > > > > >> > As you've guessed, my interest is in Pulsar, and I am
> > > > > > > > >> > exploring some options for brokering watermarks across
> > > > > > > > >> > stream processing pipelines. I think Arvid is speaking to a
> > > > > > > > >> > high-fidelity solution where the difference between
> > > > > > > > >> > intra- and inter-pipeline flow is eliminated. My goal is more
> > > > > > > > >> > limited; I want to write the watermark that arrives at the
> > > > > > > > >> > sink to Pulsar. Simply imagine that Pulsar has native support
> > > > > > > > >> > for watermarking in its producer/consumer API, and we'll
> > > > > > > > >> > leave the details to another forum.
> > > > > > > > >> >
> > > > > > > > >> > David, I like your invariant. I see lateness as stemming from
> > > > > > > > >> > the problem domain and from system dynamics (e.g. scheduling,
> > > > > > > > >> > batching, lag). When one depends on order-of-observation to
> > > > > > > > >> > generate watermarks, the app may become unduly sensitive to
> > > > > > > > >> > dynamics which bear on order-of-observation. My goal is to
> > > > > > > > >> > factor out the system dynamics from lateness determination.
> > > > > > > > >> >
> > > > > > > > >> > Arvid, to be most valuable (at least for my purposes), the
> > > > > > > > >> > enhancement is needed on SinkFunction. This will allow us to
> > > > > > > > >> > easily evolve the existing Pulsar connector.
> > > > > > > > >> >
> > > > > > > > >> > As a next step, I will open a PR to advance the conversation.
> > > > > > > > >> >
> > > > > > > > >> > Eron
> > > > > > > > >> >
> > > > > > > > >> > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > > > > > > > >> > <david.mora...@gmail.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > Thanks for starting this discussion. I've been thinking about
> > > > > > > > >> > this recently, as we've run into "watermark-related" issues when
> > > > > > > > >> > chaining multiple pipelines together. My two cents on the
> > > > > > > > >> > discussion:
> > > > > > > > >> >
> > > > > > > > >> > The way I like to think about the problem is that there should
> > > > > > > > >> > be an invariant that holds for any stream processing pipeline:
> > > > > > > > >> > "A NON_LATE element entering the system should never become
> > > > > > > > >> > LATE."
> > > > > > > > >> >
> > > > > > > > >> > Unfortunately, this is exactly what happens in downstream
> > > > > > > > >> > pipelines, because the upstream one can:
> > > > > > > > >> > - break ordering (especially with higher parallelism)
> > > > > > > > >> > - emit elements that are ahead of the output watermark
> > > > > > > > >> >
> > > > > > > > >> > There is not enough information to reconstruct the upstream
> > > > > > > > >> > watermark in later stages (it's always just an estimate based
> > > > > > > > >> > on the previous pipeline's output).
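A minimal, self-contained Java sketch of the invariant (no Flink dependency; `LatenessDemo` and its methods are hypothetical names). It assumes, following the usual Flink convention, that an element is late when its timestamp is at or below the watermark in effect when it arrives. It compares two downstream strategies: a heuristic watermark derived from the order of observation (maximum timestamp seen minus a fixed bound) and a watermark persisted by the upstream pipeline. Three elements that were non-late upstream (upstream watermark 4) become late downstream only under the heuristic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink API. An element is treated as late
// when its timestamp is at or below the watermark in effect on arrival.
class LatenessDemo {

    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    // Downstream heuristic: watermark = max timestamp seen so far minus a fixed bound.
    // Lateness therefore depends on the order in which elements are observed.
    static List<Long> lateUnderHeuristic(long[] arrivalOrder, long bound) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            if (isLate(ts, watermark)) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = Math.max(watermark, maxSeen - bound);
        }
        return late;
    }

    // Alternative: reuse the watermark the upstream pipeline persisted
    // alongside the data, so arrival order no longer affects lateness.
    static List<Long> lateUnderPersisted(long[] arrivalOrder, long upstreamWatermark) {
        List<Long> late = new ArrayList<>();
        for (long ts : arrivalOrder) {
            if (isLate(ts, upstreamWatermark)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Upstream emitted 10, 5, 7 (out of order) while its output watermark was 4,
        // so none of these elements was late upstream.
        long[] arrivals = {10, 5, 7};
        System.out.println("heuristic: " + lateUnderHeuristic(arrivals, 2)); // [5, 7]
        System.out.println("persisted: " + lateUnderPersisted(arrivals, 4)); // []
    }
}
```

Because the heuristic watermark jumps to 8 as soon as the element with timestamp 10 is observed, the outcome depends on arrival order, which is exactly the sensitivity to system dynamics described in this thread.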
> > > > > > > > >> >
> > > > > > > > >> > It would be great if we could have a general abstraction that
> > > > > > > > >> > is reusable for various sources / sinks (not just Kafka /
> > > > > > > > >> > Pulsar, though this would probably cover most of the use cases)
> > > > > > > > >> > and systems.
> > > > > > > > >> >
> > > > > > > > >> > Is there any other use case than sharing watermarks between
> > > > > > > > >> > pipelines that you're trying to solve?
> > > > > > > > >> >
> > > > > > > > >> > Arvid:
> > > > > > > > >> >
> > > > > > > > >> > 1. Watermarks are closely coupled to the used system (=Flink).
> > > > > > > > >> > I have a hard time imagining that it's useful to use a different
> > > > > > > > >> > stream processor downstream. So for now, I'm assuming that both
> > > > > > > > >> > upstream and downstream are Flink applications. In that case,
> > > > > > > > >> > we probably define both parts of the pipeline in the same Flink
> > > > > > > > >> > job similar to KafkaStream's #through.
> > > > > > > > >> >
> > > > > > > > >> > I'd slightly disagree here. For example, we're "materializing"
> > > > > > > > >> > change-logs produced by a Flink pipeline into a serving layer
> > > > > > > > >> > (random access db / in-memory view / ..), and we need to know
> > > > > > > > >> > whether the responses we serve meet the "freshness"
> > > > > > > > >> > requirements (e.g. you may want to respond differently when the
> > > > > > > > >> > watermark is lagging way too far behind processing time). Also,
> > > > > > > > >> > not every stream processor in the pipeline needs to be Flink.
> > > > > > > > >> > It can just as well be a simple element-wise transformation
> > > > > > > > >> > that reads from Kafka and writes back into a separate topic
> > > > > > > > >> > (that's what we do, for example, with ML models that have
> > > > > > > > >> > special hardware requirements).
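The freshness check described above can be sketched as follows, assuming the serving layer can read the latest watermark written by the upstream pipeline. `FreshnessCheck`, the `Freshness` enum, and the `maxLag` threshold are hypothetical names chosen for illustration; only `java.time` is used.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: classify a served response by how far the upstream
// watermark lags behind processing (wall-clock) time.
class FreshnessCheck {

    enum Freshness { FRESH, STALE }

    static Freshness classify(Instant watermark, Instant now, Duration maxLag) {
        // Lag = processing time minus the event time covered by the watermark.
        Duration lag = Duration.between(watermark, now);
        return lag.compareTo(maxLag) > 0 ? Freshness.STALE : Freshness.FRESH;
    }

    public static void main(String[] args) {
        Instant now = Instant.ofEpochSecond(1_000);
        Duration maxLag = Duration.ofSeconds(60);
        System.out.println(classify(Instant.ofEpochSecond(970), now, maxLag)); // FRESH
        System.out.println(classify(Instant.ofEpochSecond(800), now, maxLag)); // STALE
    }
}
```

A serving endpoint could attach this flag to its response or choose a different answer when the result is STALE; the point is that this decision needs the upstream watermark to be available outside Flink.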
> > > > > > > > >> >
> > > > > > > > >> > Best,
> > > > > > > > >> > D.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > Hi Eron,
> > > > > > > > >> >
> > > > > > > > >> > I think this is a useful addition for storage systems that act
> > > > > > > > >> > as a pass-through for Flink to reduce recovery time. It is only
> > > > > > > > >> > useful if you combine it with regional fail-over, as only a
> > > > > > > > >> > small part of the pipeline is restarted.
> > > > > > > > >> >
> > > > > > > > >> > A couple of thoughts on the implications:
> > > > > > > > >> > 1. Watermarks are closely coupled to the used system (=Flink).
> > > > > > > > >> > I have a hard time imagining that it's useful to use a different
> > > > > > > > >> > stream processor downstream. So for now, I'm assuming that both
> > > > > > > > >> > upstream and downstream are Flink applications. In that case,
> > > > > > > > >> > we probably define both parts of the pipeline in the same Flink
> > > > > > > > >> > job similar to KafkaStream's #through.
> > > > > > > > >> >
> > > > > > > > >> > 2. The schema of the respective intermediate stream/topic would
> > > > > > > > >> > need to be managed by Flink to encode both records and
> > > > > > > > >> > watermarks. This reduces the usability quite a bit and needs to
> > > > > > > > >> > be carefully crafted.
> > > > > > > > >> >
> > > > > > > > >> > 3. It's not clear to me if constructs like SchemaRegistry can be
> > > > > > > > >> > properly supported (and also if they should be supported) in
> > > > > > > > >> > terms of schema evolution.
> > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker would also need
> > > > > > > > >> > to be encoded.
> > > > > > > > >> > 5. It's important to have some way to transport backpressure
> > > > > > > > >> > from the downstream to the upstream. Otherwise, you would have
> > > > > > > > >> > the same issue as KafkaStreams, where two separate pipelines
> > > > > > > > >> > can drift so far apart that you experience data loss if the
> > > > > > > > >> > data retention period is shorter than the drift.
> > > > > > > > >> > 6. It's clear that you trade a huge chunk of throughput for
> > > > > > > > >> > lower overall latency in case of failure. So it's an
> > > > > > > > >> > interesting feature for use cases with SLAs.
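Point 2 implies that the intermediate topic carries two kinds of entries, data records and watermarks. A minimal sketch of such an envelope, assuming a hypothetical layout of a one-byte tag followed by either a UTF-8 payload or an 8-byte watermark timestamp (the `Envelope` class and its wire format are illustrative, not an existing Flink, Kafka, or Pulsar format):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical envelope for an intermediate topic that interleaves data records
// and watermarks. Layout: 1-byte tag, then a UTF-8 payload or an 8-byte watermark.
final class Envelope {
    static final byte RECORD = 0;
    static final byte WATERMARK = 1;

    final byte tag;
    final String payload;  // set when tag == RECORD
    final long watermark;  // set when tag == WATERMARK

    private Envelope(byte tag, String payload, long watermark) {
        this.tag = tag;
        this.payload = payload;
        this.watermark = watermark;
    }

    static byte[] encodeRecord(String value) {
        byte[] body = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + body.length).put(RECORD).put(body).array();
    }

    static byte[] encodeWatermark(long timestamp) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(WATERMARK).putLong(timestamp).array();
    }

    static Envelope decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte tag = buf.get();
        if (tag == WATERMARK) {
            return new Envelope(tag, null, buf.getLong());
        }
        if (tag == RECORD) {
            byte[] body = new byte[buf.remaining()];
            buf.get(body);
            return new Envelope(tag, new String(body, StandardCharsets.UTF_8), Long.MIN_VALUE);
        }
        throw new IllegalArgumentException("unknown tag " + tag);
    }

    public static void main(String[] args) {
        Envelope r = decode(encodeRecord("order-42"));
        Envelope w = decode(encodeWatermark(1_000L));
        System.out.println(r.tag + " " + r.payload);    // 0 order-42
        System.out.println(w.tag + " " + w.watermark);  // 1 1000
    }
}
```

Every consumer of the topic must understand this envelope, which is the usability cost noted in point 2; the schema-evolution concern in point 3 would then apply to the payload inside it, and point 4 would add further tags.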
> > > > > > > > >> >
> > > > > > > > >> > Since we are phasing out SinkFunction, I'd prefer to only
> > > > > > > > >> > support SinkWriter. Having a no-op default sounds good to me.
> > > > > > > > >> >
> > > > > > > > >> > We have an experimental feature for Kafka [1] that pretty much
> > > > > > > > >> > reflects your idea. There, we use an ugly workaround to be able
> > > > > > > > >> > to process the watermark with a custom StreamSink task. We
> > > > > > > > >> > could also try to create a FLIP that abstracts the actual
> > > > > > > > >> > system away, and then we could use the approach for both
> > > > > > > > >> > Pulsar and Kafka.
> > > > > > > > >> >
> > > > > > > > >> > [1]
> > > > > > > > >> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103
> > > > > > > > >> >
> > > > > > > > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I would like to propose an enhancement to the Sink API: the
> > > > > > > > >> > ability to receive upstream watermarks. I'm aware that the sink
> > > > > > > > >> > context provides the current watermark for a given record. I'd
> > > > > > > > >> > like to be able to write a sink function that is invoked
> > > > > > > > >> > whenever the watermark changes. Event-time timers would be out
> > > > > > > > >> > of scope (since sinks aren't keyed).
> > > > > > > > >> >
> > > > > > > > >> > For context, imagine that a stream storage system had the
> > > > > > > > >> > ability to persist watermarks in addition to ordinary elements,
> > > > > > > > >> > e.g. to serve as source watermarks in a downstream processor.
> > > > > > > > >> > Ideally, one could compose a multi-stage, event-driven
> > > > > > > > >> > application with watermarks flowing end-to-end, without the
> > > > > > > > >> > need for a heuristics-based watermark at each stage.
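On the consuming side, a downstream source could derive its watermark from the persisted ones instead of from a heuristic. A common way to combine per-partition watermarks, and the way Flink combines watermarks arriving on multiple input channels, is to take the minimum. The sketch below assumes each partition of the storage system carries its own persisted watermarks; `PersistedWatermarkTracker` and `onWatermark` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a downstream source reads watermarks that the upstream
// pipeline persisted per partition and combines them by taking the minimum.
class PersistedWatermarkTracker {
    private final Map<Integer, Long> perPartition = new HashMap<>();
    private final int partitionCount;
    private long combined = Long.MIN_VALUE;

    PersistedWatermarkTracker(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Called when a persisted watermark is read from the given partition.
    // Returns the combined watermark, which never moves backwards.
    long onWatermark(int partition, long watermark) {
        perPartition.merge(partition, watermark, Long::max);
        if (perPartition.size() < partitionCount) {
            return combined; // some partition has not reported a watermark yet
        }
        long min = Long.MAX_VALUE;
        for (long w : perPartition.values()) {
            min = Math.min(min, w);
        }
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        PersistedWatermarkTracker t = new PersistedWatermarkTracker(2);
        System.out.println(t.onWatermark(0, 100)); // Long.MIN_VALUE: partition 1 not seen yet
        System.out.println(t.onWatermark(1, 80));  // 80
        System.out.println(t.onWatermark(0, 150)); // 80: held back by partition 1
        System.out.println(t.onWatermark(1, 120)); // 120
    }
}
```

Note that a partition that stops reporting holds the combined watermark back indefinitely, which is why the semantics of idleness matter for this design.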
> > > > > > > > >> >
> > > > > > > > >> > The specific proposal would be a new method on `SinkFunction`
> > > > > > > > >> > and/or on `SinkWriter`, called 'processWatermark' or
> > > > > > > > >> > 'writeWatermark', with a default implementation that does
> > > > > > > > >> > nothing.
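A minimal sketch of the proposed shape, written as a standalone interface so that it compiles without Flink. `WatermarkAwareSinkWriter` is a hypothetical stand-in for `SinkWriter`/`SinkFunction`, and `LoggingSink` is a hypothetical implementation; the proposal only names the method (`processWatermark` or `writeWatermark`) and its no-op default, so the `long` parameter type is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink writer; not the actual Flink interface.
interface WatermarkAwareSinkWriter<T> {
    void write(T element);

    // Proposed hook: invoked when a new watermark reaches the sink.
    // The default does nothing, so existing sinks are unaffected.
    default void writeWatermark(long watermark) {}
}

// Example sink that forwards both elements and watermarks to an external log,
// modeled here as an in-memory list.
class LoggingSink implements WatermarkAwareSinkWriter<String> {
    final List<String> log = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void write(String element) {
        log.add("record:" + element);
    }

    @Override
    public void writeWatermark(long watermark) {
        // Only persist watermarks that advance event time.
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            log.add("watermark:" + watermark);
        }
    }
}

class SinkDemo {
    public static void main(String[] args) {
        LoggingSink sink = new LoggingSink();
        sink.write("a");
        sink.writeWatermark(10);
        sink.write("b");
        sink.writeWatermark(10); // not advancing, ignored
        // A sink that does not override the hook still compiles and ignores watermarks.
        WatermarkAwareSinkWriter<String> plain = element -> {};
        plain.writeWatermark(20);
        System.out.println(sink.log); // [record:a, watermark:10, record:b]
    }
}
```

The default method is what keeps the change backward compatible: existing implementations keep compiling, and only sinks that can store watermarks (such as the Pulsar case discussed here) need to override it.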
> > > > > > > > >> >
> > > > > > > > >> > Thoughts?
> > > > > > > > >> >
> > > > > > > > >> > Thanks!
> > > > > > > > >> > Eron Wright
> > > > > > > > >> > StreamNative
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > --
> > > > > > > > >> >
> > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > >> >
> > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > >> > <https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > >> > <https://github.com/streamnative>
> > > > > > > > >> > <https://www.linkedin.com/company/streamnative/>
> > > > > > > > >> > <https://twitter.com/streamnativeio/>
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
