Hi everyone,

I created a FLIP and started a discussion around that topic [1].

Best,

Arvid

[1]
https://lists.apache.org/thread.html/r8357d64b9cfdf5a233c53a20d9ac62b75c07c925ce2c43e162f1e39c%40%3Cdev.flink.apache.org%3E

On Tue, Jun 8, 2021 at 11:46 PM Eron Wright <ewri...@streamnative.io.invalid>
wrote:

> Thanks, the narrowed FLIP-167 is fine for now.  I'll re-activate the vote
> process.  Thanks!
>
> On Tue, Jun 8, 2021 at 3:01 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi everyone,
> >
> > I do agree that Flink's definition of idleness is not fully thought
> through
> > yet. Consequently, I would feel a bit uneasy about making it part of Flink's
> API
> > right now. Instead, defining the proper semantics first and then exposing
> > it sounds like a good approach forward. Hence, +1 for option number 1,
> > which will also allow FLIP-167 to make progress.
> >
> > Concerning subtasks with no partitions assigned, would it make sense to
> > terminate these tasks at some point? That way, the stream would be closed
> > and there is no need to maintain a stream status. Of course, this also
> > requires at some point that Flink can start new sources when new
> partitions
> > appear.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jun 8, 2021 at 9:26 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Eron,
> > >
> > > FLIP-167 is narrow, but we recently discovered some problems with the
> > > current idleness semantics, as Arvid explained. We are planning to
> > present a
> > > new proposal to redefine them. Probably as a part of it, we would need
> to
> > > rename them. Given that, I think it doesn't make sense to expose
> idleness
> > > to the sinks before we rename and define it properly. In other words:
> > >
> > > > 2. When the sink operator is idled, tell the sink function.
> > >
> > > We shouldn't expose stream status as a part of public API until it's
> > > properly defined.
> > >
> > > I would propose one of the two things:
> > > 1. Proceed with FLIP-167, without exposing idleness in the sinks YET.
> > > Exposing idleness could be part of this next/future FLIP that would
> > define
> > > idleness in the first place.
> > > 2. Block FLIP-167, until the idleness is fixed.
> > >
> > > I would vote for option number 1.
> > >
> > > Piotrek
> > >
> > > On Mon, Jun 7, 2021 at 6:08 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > wrote:
> > >
> > > > Piotr, David, and Arvid, we've had an expansive discussion but
> > ultimately
> > > > the proposal is narrow.  It is:
> > > > 1. When a watermark arrives at the sink operator, tell the sink
> > function.
> > > > 2. When the sink operator is idled, tell the sink function.
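A rough sketch of what the two callbacks listed above could look like, written as plain Java with no Flink dependency. The interface name `WatermarkAwareSink` and the method names `writeWatermark` and `markIdle` are hypothetical and chosen only for illustration; they are not taken from the FLIP-167 pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback interface; the names are illustrative, not Flink's API. */
interface WatermarkAwareSink<T> {
    /** Called for every record, as with any sink function. */
    void invoke(T record);

    /** Proposal item 1: called when a watermark arrives at the sink operator. */
    default void writeWatermark(long timestamp) {}

    /** Proposal item 2: called when the sink operator becomes idle. */
    default void markIdle() {}
}

/** Example sink that appends records and time signals to an in-memory log. */
final class LoggingSink implements WatermarkAwareSink<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void invoke(String record) {
        log.add("record:" + record);
    }

    @Override
    public void writeWatermark(long timestamp) {
        log.add("watermark:" + timestamp);
    }

    @Override
    public void markIdle() {
        log.add("idle");
    }
}

final class SinkCallbackDemo {
    public static void main(String[] args) {
        LoggingSink sink = new LoggingSink();
        sink.invoke("a");           // a normal record
        sink.writeWatermark(100L);  // the operator saw a watermark
        sink.markIdle();            // the operator went idle
        System.out.println(sink.log);
    }
}
```

Both callbacks are default methods with empty bodies, so in this sketch an existing sink that ignores event time would not need to change.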
> > > >
> > > > With these enhancements, we will significantly improve correctness in
> > > > multi-stage flows, and facilitate an exciting project in the Pulsar
> > > > community.  Would you please lend your support to FLIP-167 so that we
> > can
> > > > land this enhancement for 1.14?  My deepest thanks!
> > > >
> > > > -Eron
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise <ar...@apache.org> wrote:
> > > >
> > > > > Hi Eron,
> > > > >
> > > > > you either have very specific use cases in mind or have a
> > misconception
> > > > > about idleness in Flink with the new sources. The basic idea is
> that
> > > you
> > > > > have watermark generators only at the sources and the user supplies
> > > them.
> > > > > As a source author, you have no option to limit that. Here is a bit of
> > > > > background:
> > > > >
> > > > > We observed that many users that read from Kafka were confused
> about
> > no
> > > > > visible progress in their Flink applications because of some idle
> > > > partition
> > > > > and we introduced idleness subsequently. Idleness was always
> > considered
> > > > as
> > > > > a means to achieve progress at the risk of losing a bit of
> > correctness.
> > > > > So especially in the case that you describe with a Pulsar partition
> > > that
> > > > is
> > > > > empty but indefinitely active, the user needs to be able to use
> > > idleness
> > > > > such that downstream window operators progress.
> > > > >
> > > > > I hope to have clarified that "I wouldn't recommend using
> > > withIdleness()
> > > > > with source-based watermarks." would pretty much make the intended
> > use
> > > > case
> > > > > not work anymore.
> > > > >
> > > > > ---
> > > > >
> > > > > Nevertheless, from the discussion with you and some offline
> > discussion
> > > > with
> > > > > Piotr and Dawid, we actually found quite a few drawbacks in
> the
> > > > > current definition of idleness:
> > > > > - We currently only use idleness to exclude respective upstream
> tasks
> > > > from
> > > > > participating in watermark generation (as you have eloquently put
> > > further
> > > > > up in the thread).
> > > > > - However, the definition is bound to records. So while a partition
> > is
> > > > > idle, no records should be produced.
> > > > > - That brings us into quite a few edge cases, where operators emit
> > > > records,
> > > > > while they are actually idling: Think of timers, asyncIO operators,
> > > > window
> > > > > operators based on timeouts, etc.
> > > > > - The solution would be to turn the operator active while emitting
> > and
> > > > > returning to being idle afterwards (but when?). However, this has
> > some
> > > > > unintended side-effects depending on when you switch back.
> > > > >
> > > > > We are currently thinking that we should rephrase the definition to
> > > what
> > > > > you described:
> > > > > - A channel that is active is providing watermarks.
> > > > > - An idle channel is not providing any watermarks but can deliver
> > > > records.
> > > > > - Then we are not talking about idle partitions anymore but
> explicit
> > > and
> > > > > implicit watermark generation and should probably rename the
> > concepts.
> > > > > - This would probably mean that we also need an explicit markActive
> > in
> > > > > source/sink to express that the respective entity now needs to wait
> > for
> > > > > explicit watermarks.
> > > > >
> > > > > I'll open a proper discussion thread tomorrow.
> > > > >
> > > > > Note that we probably shouldn't rush this FLIP until we have
> > clarified
> > > > the
> > > > > semantics of idleness. We could also cut the scope of the FLIP to
> > > exclude
> > > > > idleness and go ahead without it (there should be enough binding
> > votes
> > > > > already).
> > > > >
> > > > > On Sat, Jun 5, 2021 at 12:09 AM Eron Wright <
> ewri...@streamnative.io
> > > > > .invalid>
> > > > > wrote:
> > > > >
> > > > > > I understand your scenario but I disagree with its assumptions:
> > > > > >
> > > > > > "However, the partition of A is empty and thus A is temporarily
> > > idle."
> > > > -
> > > > > > you're assuming that the behavior of the source is to mark itself
> > > idle
> > > > if
> > > > > > data isn't available, but that's clearly source-specific and not
> > > > behavior
> > > > > > we expect to have in Pulsar source.  A partition may be empty
> > > > > indefinitely
> > > > > > while still being active.  Imagine that the producer is
> defending a
> > > > > lease -
> > > > > > "I'm here, there's no data, please don't advance the clock".
> > > > > >
> > > > > > "we bind idleness to wall clock time" - you're characterizing a
> > > > specific
> > > > > > strategy (WatermarkStrategy.withIdleness()), not the inherent
> > > behavior
> > > > of
> > > > > > the pipeline.  I wouldn't recommend using withIdleness() with
> > > > > source-based
> > > > > > watermarks.
> > > > > >
> > > > > > I do agree that dynamism in partition assignment can wreak havoc
> on
> > > > > > watermark correctness.  We have some ideas on the Pulsar side
> about
> > > > that
> > > > > > too.  I would ask that we focus on the Flink framework and
> pipeline
> > > > > > behavior.  By offering a more powerful framework, we encourage
> > stream
> > > > > > storage systems to "rise to the occasion" - treat event time in a
> > > > > > first-class way, optimize for correctness, etc.  In this case,
> > > FLIP-167
> > > > > is
> > > > > > setting the stage for evolution in Pulsar.
> > > > > >
> > > > > > Thanks again Arvid for the great discussion.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise <ar...@apache.org>
> > wrote:
> > > > > >
> > > > > > > At least one big motivation is having (temporary) empty
> > partitions.
> > > > Let
> > > > > > me
> > > > > > > give you an example, why imho idleness is only approximate in
> > this
> > > > > case:
> > > > > > > Assume you have source subtask A, B, C that correspond to 3
> > source
> > > > > > > partitions and a downstream keyed window operator W.
> > > > > > >
> > > > > > > W would usually trigger on min_watermark(A, B, C). However, the
> > > > > partition
> > > > > > > of A is empty and thus A is temporarily idle. So W triggers on
> > > > > > > min_watermark(B, C). When A is now active again, the watermark
> > > > > implicitly
> > > > > > > is min_watermark(B, C) for A!
> > > > > > >
> > > > > > > Let's further assume that the source is filled by another
> > pipeline
> > > > > > before.
> > > > > > > This pipeline experiences technical difficulties for X minutes
> > and
> > > > > could
> > > > > > > not produce into the partition of A, hence the idleness. When
> the
> > > > > > upstream
> > > > > > > pipeline resumes it fills A with some records that are before
> > > > > > > min_watermark(B, C). Any watermark generated from these records
> > is
> > > > > > > discarded as the watermark is monotonic. Therefore, these
> > records
> > > > will
> > > > > > be
> > > > > > > considered late by W and discarded.
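The A/B/C scenario above can be replayed with a toy model. This is a minimal sketch under stated assumptions, not Flink's actual implementation: the class `WatermarkValve` and its methods are hypothetical, and "late" is simplified to "event time at or below the current combined watermark".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of a downstream operator W combining watermarks from its inputs. */
final class WatermarkValve {
    private final Map<String, Long> watermarks = new LinkedHashMap<>();
    private final Map<String, Boolean> idle = new LinkedHashMap<>();
    private long current = Long.MIN_VALUE;

    void addInput(String name) {
        watermarks.put(name, Long.MIN_VALUE);
        idle.put(name, false);
    }

    /** A watermark from an input makes that input active again. */
    void onWatermark(String input, long timestamp) {
        idle.put(input, false);
        watermarks.merge(input, timestamp, Long::max); // per-input watermark never regresses
        advance();
    }

    /** An idle input no longer takes part in the minimum. */
    void markIdle(String input) {
        idle.put(input, true);
        advance();
    }

    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (!idle.get(e.getKey())) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        if (anyActive) {
            current = Math.max(current, min); // the combined watermark only moves forward
        }
    }

    long currentWatermark() {
        return current;
    }

    /** Simplification: a record is late if its event time is not after the watermark. */
    boolean isLate(long eventTime) {
        return eventTime <= current;
    }
}

final class IdleInputDemo {
    public static void main(String[] args) {
        WatermarkValve w = new WatermarkValve();
        w.addInput("A");
        w.addInput("B");
        w.addInput("C");
        w.onWatermark("B", 100L);
        w.onWatermark("C", 120L);
        w.markIdle("A");                       // W now triggers on min_watermark(B, C)
        System.out.println(w.currentWatermark());
        System.out.println(w.isLate(50L));     // a record A produces after resuming
        w.onWatermark("A", 50L);               // A's older watermark cannot pull W back
        System.out.println(w.currentWatermark());
    }
}
```

Leaving out the `markIdle("A")` call keeps W's watermark at its initial value, which corresponds to the "blocked until the upstream pipeline recovers" behaviour with no late records.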
> > > > > > >
> > > > > > > Without idleness, we would have simply blocked W until the
> > upstream
> > > > > > pipeline
> > > > > > > fully recovers and we would not have had any late records. The
> > same
> > > > > holds
> > > > > > > for any reprocessing where the data of partition A is
> continuous.
> > > > > > >
> > > > > > > If you look deeper, the issue is that we bind idleness to wall
> > > clock
> > > > > time
> > > > > > > (e.g. advance watermark after X seconds without data). Then we
> > > assume
> > > > > the
> > > > > > > watermark of the idle partition to be in sync with the slowest
> > > > > partition.
> > > > > > > However, in the case of hiccups, this assumption does not hold
> at
> > > > all.
> > > > > > > I don't see any fix for that (easy or not easy) and imho it's
> > > > inherent
> > > > > to
> > > > > > > the design of idleness.
> > > > > > > We lack information (why is no data coming) and have a
> heuristic
> > to
> > > > fix
> > > > > > it.
> > > > > > >
> > > > > > > In the case of partition assignment where one subtask has no
> > > > partition,
> > > > > > we
> > > > > > > are probably somewhat safe. We know why no data is coming (no
> > > > > partition)
> > > > > > > and as long as we do not have dynamic partition assignment,
> there
> > > > will
> > > > > > > never be a switch to active without restart (for the
> foreseeable
> > > > > future).
> > > > > > >
> > > > > > > On Fri, Jun 4, 2021 at 10:34 PM Eron Wright <
> > > ewri...@streamnative.io
> > > > > > > .invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Yes I'm talking about an implementation of idleness that is
> > > > unrelated
> > > > > > to
> > > > > > > > processing time.  The clear example is partition assignment
> to
> > > > > > subtasks,
> > > > > > > > which probably motivated Flink's idleness functionality in
> the
> > > > first
> > > > > > > place.
> > > > > > > >
> > > > > > > > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise <
> ar...@apache.org>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Eron,
> > > > > > > > >
> > > > > > > > > Are you referring to an implementation of idleness that
> does
> > > not
> > > > > rely
> > > > > > > on
> > > > > > > > a
> > > > > > > > > wall clock but on some clock baked into the partition
> > > information
> > > > > of
> > > > > > > the
> > > > > > > > > source system?
> > > > > > > > > If so, you are right that it invalidates my points.
> > > > > > > > > Do you have an example on where this is used?
> > > > > > > > >
> > > > > > > > > With a wall clock, you always run into the issues that I
> > > describe
> > > > > > since
> > > > > > > > you
> > > > > > > > > are effectively mixing event time and processing time...
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright <
> > > > > ewri...@streamnative.io
> > > > > > > > > .invalid>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Dawid, I think you're mischaracterizing the idleness
> signal
> > > as
> > > > > > > > > inherently a
> > > > > > > > > > heuristic, but Flink does not impose that.  A
> source-based
> > > > > > watermark
> > > > > > > > (and
> > > > > > > > > > corresponding idleness signal) may well be entirely
> > > > data-driven,
> > > > > > > > entirely
> > > > > > > > > > deterministic.  Basically you're underselling what the
> > > pipeline
> > > > > is
> > > > > > > > > capable
> > > > > > > > > > of, based on painful experiences with using the generic,
> > > > > > > > heuristics-based
> > > > > > > > > > watermark assigner.  Please don't let those experiences
> > > > > overshadow
> > > > > > > > what's
> > > > > > > > > > possible with source-based watermarking.
> > > > > > > > > >
> > > > > > > > > > The idleness signal does have a strict definition, it
> > > indicates
> > > > > > > whether
> > > > > > > > > the
> > > > > > > > > > stream is actively participating in advancing the event
> > time
> > > > > clock.
> > > > > > > > The
> > > > > > > > > > status of all participants is considered when aggregating
> > > > > > watermarks.
> > > > > > > > A
> > > > > > > > > > source subtask generally makes the determination based on
> > > data,
> > > > > > e.g.
> > > > > > > > > > whether a topic is assigned to that subtask.
> > > > > > > > > >
> > > > > > > > > > We have here a modest proposal to add callbacks to the
> sink
> > > > > > function
> > > > > > > > for
> > > > > > > > > > information that the sink operator already receives.  The
> > > > > practical
> > > > > > > > > result
> > > > > > > > > > is improved correctness when used with streaming systems
> > that
> > > > > have
> > > > > > > > > > first-class support for event time.  The specific changes
> > may
> > > > be
> > > > > > > > > previewed
> > > > > > > > > > here:
> > > > > > > > > > https://github.com/apache/flink/pull/15950
> > > > > > > > > > https://github.com/streamnative/flink/pull/2
> > > > > > > > > >
> > > > > > > > > > Thank you all for the robust discussion. Do I have your
> > > support
> > > > > to
> > > > > > > > > proceed
> > > > > > > > > > to enhance FLIP-167 with idleness callbacks and to
> proceed
> > > to a
> > > > > > vote?
> > > > > > > > > >
> > > > > > > > > > Eron
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise <
> > ar...@apache.org
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > While everything I wrote before is still valid, upon
> > > further
> > > > > > > > > rethinking,
> > > > > > > > > > I
> > > > > > > > > > > think that the conclusion is not necessarily correct:
> > > > > > > > > > > - If the user wants to have pipeline A and B behaving
> as
> > if
> > > > A+B
> > > > > > was
> > > > > > > > > > jointly
> > > > > > > > > > > executed in the same pipeline without the intermediate
> > > Pulsar
> > > > > > > topic,
> > > > > > > > > > having
> > > > > > > > > > > the idleness in that topic is the only way to guarantee
> > > > > > consistency.
> > > > > > > > > > > - We could support the following in the respective
> > sources:
> > > > If
> > > > > > the
> > > > > > > > user
> > > > > > > > > > > wants to use a different definition of idleness in
> > B,
> > > > they
> > > > > > can
> > > > > > > > > just
> > > > > > > > > > > provide a new idleness definition. At that point, we
> > should
> > > > > > discard
> > > > > > > > the
> > > > > > > > > > > idleness in the intermediate topic while reading.
> > > > > > > > > > >
> > > > > > > > > > > If we would agree on the latter way, I think having the
> > > > > idleness
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > topic is of great use because it's a piece of
> information
> > > > that
> > > > > > > cannot
> > > > > > > > > be
> > > > > > > > > > > inferred as stated by others. Consequently, we would be
> > > able
> > > > to
> > > > > > > > support
> > > > > > > > > > all
> > > > > > > > > > > use cases and can give the user the freedom to express
> > his
> > > > > > intent.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise <
> > > ar...@apache.org
> > > > >
> > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I think the core issue in this discussion is that we
> > kind
> > > > of
> > > > > > > assume
> > > > > > > > > > that
> > > > > > > > > > > > idleness is something universally well-defined. But
> > it's
> > > > not.
> > > > > > > It's
> > > > > > > > a
> > > > > > > > > > > > heuristic to advance data processing in event time
> > where
> > > we
> > > > > > would
> > > > > > > > > lack
> > > > > > > > > > > data
> > > > > > > > > > > > to do so otherwise.
> > > > > > > > > > > > Keep in mind that idleness has no real definition in
> > > terms
> > > > of
> > > > > > > event
> > > > > > > > > > time
> > > > > > > > > > > > and leads to severe unexpected results: If you
> > reprocess
> > > a
> > > > > data
> > > > > > > > > stream
> > > > > > > > > > > with
> > > > > > > > > > > > temporarily idle partitions, these partitions would
> not
> > > be
> > > > > > deemed
> > > > > > > > > idle
> > > > > > > > > > on
> > > > > > > > > > > > reprocessing and there is a realistic chance that
> > records
> > > > > that
> > > > > > > were
> > > > > > > > > > > deemed
> > > > > > > > > > > > late in the live processing case are now perfectly
> fine
> > > > > records
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > > reprocessing case. (I can expand on that if that was
> > too
> > > > > short)
> > > > > > > > > > > >
> > > > > > > > > > > > With that in mind, why would a downstream process
> even
> > > try
> > > > to
> > > > > > > > > calculate
> > > > > > > > > > > > the same idleness state as the upstream process? I
> > don't
> > > > see
> > > > > a
> > > > > > > > point;
> > > > > > > > > > we
> > > > > > > > > > > > would just further any imprecision in the
> calculation.
> > > > > > > > > > > >
> > > > > > > > > > > > Let's have a concrete example. Assume that we have
> > > upstream
> > > > > > > > pipeline
> > > > > > > > > A
> > > > > > > > > > > and
> > > > > > > > > > > > downstream pipeline B. A has plenty of resources and
> is
> > > > live
> > > > > > > > > processing
> > > > > > > > > > > > data. Some partitions are idle and that is propagated
> > to
> > > > the
> > > > > > > sinks.
> > > > > > > > > > Now B
> > > > > > > > > > > > is heavily backpressured and consumes very slowly. B
> > > > doesn't
> > > > > > see
> > > > > > > > any
> > > > > > > > > > > > idleness directly. B can calculate exact watermarks
> and
> > > use
> > > > > all
> > > > > > > > > records
> > > > > > > > > > > for
> > > > > > > > > > > > its calculation. Reprocessing would yield the same
> > > result
> > > > > for
> > > > > > B.
> > > > > > > > If
> > > > > > > > > we
> > > > > > > > > > > now
> > > > > > > > > > > > forward idleness, we can easily find cases where we
> > would
> > > > > > advance
> > > > > > > > the
> > > > > > > > > > > > watermark prematurely while there is data directly
> > > > available
> > > > > to
> > > > > > > > > > calculate
> > > > > > > > > > > > the exact watermark.
> > > > > > > > > > > >
> > > > > > > > > > > > For me, idleness is just a pipeline-specific
> heuristic
> > > and
> > > > > > should
> > > > > > > > be
> > > > > > > > > > > > viewed as such.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > >
> > > > > > > > > > > > Arvid
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski <
> > > > > > > > pnowoj...@apache.org>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hi,
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Imagine you're starting consuming from the result
> > > > channel
> > > > > > in a
> > > > > > > > > > > situation
> > > > > > > > > > > > >> where you have:
> > > > > > > > > > > >> > record4, record3, StreamStatus.ACTIVE,
> > > StreamStatus.IDLE,
> > > > > > > > record2,
> > > > > > > > > > > >> record1, record0
> > > > > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is
> > > > unnecessary,
> > > > > > and
> > > > > > > > > might
> > > > > > > > > > > >> cause the record3 and record4 to be late depending
> on
> > > how
> > > > > the
> > > > > > > > > > watermark
> > > > > > > > > > > >> progressed in other partitions.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Yes, I understand this point. But it can also be the
> > > other
> > > > > way
> > > > > > > > > around.
> > > > > > > > > > > >> There might be a large gap between record2 and
> > record3,
> > > > and
> > > > > > > users
> > > > > > > > > > might
> > > > > > > > > > > > >> prefer not to, or might not be able to, duplicate idleness
> > > > detection
> > > > > > > logic.
> > > > > > > > > The
> > > > > > > > > > > >> downstream system might be lacking some kind of
> > > > information
> > > > > > > (that
> > > > > > > > is
> > > > > > > > > > > only
> > > > > > > > > > > >> available in the top level/ingesting system) to
> > > correctly
> > > > > set
> > > > > > > the
> > > > > > > > > idle
> > > > > > > > > > > >> status.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Piotrek
> > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Jun 4, 2021 at 12:30 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Same as Eron I don't follow this point. Any
> > streaming
> > > > sink
> > > > > > can
> > > > > > > > be
> > > > > > > > > > used
> > > > > > > > > > > >> as
> > > > > > > > > > > >> > this kind of transient channel. Streaming sinks,
> > like
> > > > > Kafka,
> > > > > > > are
> > > > > > > > > > also
> > > > > > > > > > > >> used
> > > > > > > > > > > >> > to connect one streaming system with another one,
> > also
> > > > for
> > > > > > an
> > > > > > > > > > > immediate
> > > > > > > > > > > >> > consumption.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Sure it can, but imo it is rarely the primary use
> > case
> > > > why
> > > > > > you
> > > > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > >> > offload the channels to an external persistent
> > system.
> > > > > Again
> > > > > > > in
> > > > > > > > my
> > > > > > > > > > > >> > understanding StreamStatus is something transient,
> > > e.g.
> > > > > part
> > > > > > > of
> > > > > > > > > our
> > > > > > > > > > > > >> > external system went offline. I think those kinds
> of
> > > > events
> > > > > > > > should
> > > > > > > > > > not
> > > > > > > > > > > be
> > > > > > > > > > > >> > persisted.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Both watermarks and idleness status can be some
> > > > > > > > > > > >> > inherent property of the underlying data stream.
> If
> > an
> > > > > > > > > > > >> upstream/ingesting
> > > > > > > > > > > >> > system knows that this particular stream/partition
> > of
> > > a
> > > > > > stream
> > > > > > > > is
> > > > > > > > > > > going
> > > > > > > > > > > >> > idle (for example for a couple of hours), why does
> > > this
> > > > > > > > > information
> > > > > > > > > > > >> have to
> > > > > > > > > > > >> > be re-created in the downstream system using some
> > > > > heuristic?
> > > > > > > It
> > > > > > > > > > could
> > > > > > > > > > > be
> > > > > > > > > > > >> > explicitly encoded.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Because it's most certainly not true in the
> > > downstream.
> > > > > The
> > > > > > > > > idleness
> > > > > > > > > > > >> works
> > > > > > > > > > > >> > usually according to a heuristic: "We have not
> seen
> > > > > records
> > > > > > > for
> > > > > > > > 5
> > > > > > > > > > > >> minutes,
> > > > > > > > > > > >> > so there is a fair chance we won't see records for
> > the
> > > > > next
> > > > > > 5
> > > > > > > > > > minutes,
> > > > > > > > > > > >> so
> > > > > > > > > > > >> > let's not wait for watermarks for now." That
> > heuristic
> > > > > most
> > > > > > > > > > certainly
> > > > > > > > > > > >> won't
> > > > > > > > > > > >> > hold for a downstream persistent storage.
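To make the point about the wall-clock heuristic concrete, here is a minimal sketch in plain Java. The class `IdleTimeoutDetector` is hypothetical and only mimics the "no records for 5 minutes" rule described above; it is not how Flink implements `withIdleness()`.

```java
/** Toy detector: an input counts as idle once no record arrived for timeoutMillis. */
final class IdleTimeoutDetector {
    private final long timeoutMillis;
    private long lastRecordAtMillis;

    IdleTimeoutDetector(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastRecordAtMillis = startMillis;
    }

    /** Remember the wall-clock time at which the latest record was read. */
    void onRecord(long nowMillis) {
        lastRecordAtMillis = nowMillis;
    }

    /** The decision depends only on wall-clock time, never on event time. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastRecordAtMillis >= timeoutMillis;
    }
}

final class IdleTimeoutDemo {
    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;

        // Live ingestion: the producer pauses for six minutes of wall-clock time.
        IdleTimeoutDetector live = new IdleTimeoutDetector(fiveMinutes, 0L);
        live.onRecord(0L);
        System.out.println("live idle: " + live.isIdle(6 * 60 * 1000L));

        // Reading the same data back from persistent storage: the records are
        // already there, so they arrive back to back and the gap never shows up.
        IdleTimeoutDetector replay = new IdleTimeoutDetector(fiveMinutes, 0L);
        replay.onRecord(0L);
        replay.onRecord(1L);
        System.out.println("replay idle: " + replay.isIdle(2L));
    }
}
```

The same two records lead to different idleness decisions depending on when they are read, which is why a status computed at ingestion time may not hold for a reader of the persisted stream.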
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Imagine you're starting consuming from the result
> > > > channel
> > > > > > in a
> > > > > > > > > > > situation
> > > > > > > > > > > > >> > where you have:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > record4, record3, StreamStatus.ACTIVE,
> > > StreamStatus.IDLE,
> > > > > > > > record2,
> > > > > > > > > > > >> record1,
> > > > > > > > > > > >> > record0
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Switching to the encoded StreamStatus.IDLE is
> > > > unnecessary,
> > > > > > and
> > > > > > > > > might
> > > > > > > > > > > >> cause
> > > > > > > > > > > >> > the record3 and record4 to be late depending on
> how
> > > the
> > > > > > > > watermark
> > > > > > > > > > > >> > progressed in other partitions.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I understand Eron's use case, which is not about
> > > storing
> > > > > the
> > > > > > > > > > > >> StreamStatus,
> > > > > > > > > > > >> > but performing an immediate aggregation or said
> > > > > differently
> > > > > > > > > changing
> > > > > > > > > > > the
> > > > > > > > > > > >> > partitioning/granularity of records and watermarks
> > > > > > externally
> > > > > > > to
> > > > > > > > > > > Flink.
> > > > > > > > > > > >> The
> > > > > > > > > > > > >> > partitioning produced by Flink is actually never
> > > > persisted
> > > > > > in
> > > > > > > > that
> > > > > > > > > > > >> case. In
> > > > > > > > > > > >> > this case I agree exposing the StreamStatus makes
> > > > sense. I
> > > > > > am
> > > > > > > > > still
> > > > > > > > > > > >> > concerned it will lead to storing the StreamStatus
> > > which
> > > > > can
> > > > > > > > lead
> > > > > > > > > to
> > > > > > > > > > > >> many
> > > > > > > > > > > >> > subtle problems.
> > > > > > > > > > > >> > On 04/06/2021 11:53, Piotr Nowojski wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for picking up this discussion. For the
> > > record, I
> > > > > > also
> > > > > > > > > think
> > > > > > > > > > we
> > > > > > > > > > > >> > shouldn't expose latency markers.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > About the stream status
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >  Persisting the StreamStatus
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I don't agree with the view that sinks are
> "storing"
> > > the
> > > > > > > > > > data/idleness
> > > > > > > > > > > > >> > status. This nomenclature only makes sense if we
> are
> > > > > talking
> > > > > > > > about
> > > > > > > > > > > >> > streaming jobs producing batch data.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > In my understanding a StreamStatus makes sense
> only
> > > when
> > > > > > > talking
> > > > > > > > > > about
> > > > > > > > > > > >> > immediately consumed transient channels such as
> > > between
> > > > > > > > operators
> > > > > > > > > > > within
> > > > > > > > > > > >> > a single job.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Same as Eron I don't follow this point. Any
> > streaming
> > > > sink
> > > > > > can
> > > > > > > > be
> > > > > > > > > > used
> > > > > > > > > > > >> as
> > > > > > > > > > > >> > this kind of transient channel. Streaming sinks,
> > like
> > > > > Kafka,
> > > > > > > are
> > > > > > > > > > also
> > > > > > > > > > > >> used
> > > > > > > > > > > >> > to connect one streaming system with another one,
> > also
> > > > for
> > > > > > an
> > > > > > > > > > > immediate
> > > > > > > > > > > >> > consumption.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > You could say the same thing about watermarks
> (note
> > > they
> > > > > are
> > > > > > > > > usually
> > > > > > > > > > > >> > generated in Flink based on the incoming events)
> > and I
> > > > > would
> > > > > > > not
> > > > > > > > > > agree
> > > > > > > > > > > >> with
> > > > > > > > > > > >> > it in the same way. Both watermarks and idleness
> > > status
> > > > > can
> > > > > > be
> > > > > > > > > some
> > > > > > > > > > > >> > inherent property of the underlying data stream.
> If
> > an
> > > > > > > > > > > >> upstream/ingesting
> > > > > > > > > > > >> > system knows that this particular stream/partition
> > of
> > > a
> > > > > > stream
> > > > > > > > is
> > > > > > > > > > > going
> > > > > > > > > > > >> > idle (for example for a couple of hours), why does
> > > this
> > > > > > > > > information
> > > > > > > > > > > >> have to
> > > > > > > > > > > >> > be re-created in the downstream system using some
> > > > > heuristic?
> > > > > > > It
> > > > > > > > > > could
> > > > > > > > > > > be
> > > > > > > > > > > >> > explicitly encoded.  If you want to pass
> watermarks
> > > > > > explicitly
> > > > > > > > to
> > > > > > > > > a
> > > > > > > > > > > next
> > > > > > > > > > > >> > downstream streaming system, because you do not
> want
> > > to
> > > > > > > recreate
> > > > > > > > > > them
> > > > > > > > > > > >> from
> > > > > > > > > > > >> > the events using a duplicated logic, why wouldn't
> > you
> > > > like
> > > > > > to
> > > > > > > do
> > > > > > > > > the
> > > > > > > > > > > >> same
> > > > > > > > > > > >> > thing with the idleness?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Also keep in mind that I would expect that a user
> > can
> > > > > decide
> > > > > > > > > whether
> > > > > > > > > > > he
> > > > > > > > > > > >> > wants to persist the watermarks/stream status on
> his
> > > > own.
> > > > > > This
> > > > > > > > > > > >> shouldn't be
> > > > > > > > > > > >> > obligatory.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > For me there is one good reason to not expose
> stream
> > > > > status
> > > > > > > YET.
> > > > > > > > > > That
> > > > > > > > > > > >> is,
> > > > > > > > > > > >> > if we are sure that we do not need this just yet,
> > > while
> > > > at
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > > >> time we
> > > > > > > > > > > >> > don't want to expand the Public/PublicEvolving
> API,
> > as
> > > > > this
> > > > > > > > always
> > > > > > > > > > > >> > increases the maintenance cost.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> > Piotrek
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, Jun 4, 2021 at 10:57 AM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I believe that the correctness of watermarks and stream status
> > > > > > > > > > > > >> > markers is determined entirely by the source (ignoring the generic
> > > > > > > > > > > > >> > assigner). Such stream elements are known not to overtake records,
> > > > > > > > > > > > >> > and aren't transient from a pipeline perspective. I do agree that
> > > > > > > > > > > > >> > recoveries may be lossy if some operator state is transient (e.g.
> > > > > > > > > > > > >> > valve state).
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Consider that status markers already affect the flow of watermarks
> > > > > > > > > > > > >> > (e.g. suppression), and thus affect operator behavior. Seems to me
> > > > > > > > > > > > >> > that exposing the idleness state is no different than exposing a
> > > > > > > > > > > > >> > watermark.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The high-level story is, there is a need for the Flink job to be
> > > > > > > > > > > > >> > transparent or neutral with respect to the event time clock. I
> > > > > > > > > > > > >> > believe this is possible if time flows with high fidelity from
> > > > > > > > > > > > >> > source to sink. Of course, one always has the choice as to whether
> > > > > > > > > > > > >> > to use source-based watermarks; as you mentioned, requirements vary.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Regarding the Pulsar specifics, we're working on a community
> > > > > > > > > > > > >> > proposal that I'm anxious to share. To answer your question, the
> > > > > > > > > > > > >> > broker aggregates watermarks from multiple producers who are writing
> > > > > > > > > > > > >> > to a single topic. Each sink subtask is a producer. The broker
> > > > > > > > > > > > >> > considers each producer's assertions (watermarks, idleness) to be
> > > > > > > > > > > > >> > independent inputs, much like the case with the watermark valve.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On your concern about idleness causing false late events, I
> > > > > > > > > > > > >> > understand your point but don't think it applies if the keyspace
> > > > > > > > > > > > >> > assignments are stable.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I hope this explains to your satisfaction.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > - Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz <dwysakow...@apache.org>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I might be missing some background on Pulsar partitioning but
> > > > > > > > > > > > >> > something seems off to me. What is the chunk/batch/partition that
> > > > > > > > > > > > >> > Pulsar brokers will additionally combine watermarks for? Isn't it
> > > > > > > > > > > > >> > the case that only a single Flink sub-task would write to such a
> > > > > > > > > > > > >> > chunk and thus will produce an aggregated watermark already via the
> > > > > > > > > > > > >> > writeWatermark method?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Personally I am really skeptical about exposing the StreamStatus in
> > > > > > > > > > > > >> > any Producer API. In my understanding the StreamStatus is a
> > > > > > > > > > > > >> > transient setting of a consumer of data. StreamStatus is a mechanism
> > > > > > > > > > > > >> > for making a tradeoff between correctness (how many late elements
> > > > > > > > > > > > >> > behind the watermark we have) vs making progress. IMO one has to be
> > > > > > > > > > > > >> > extra cautious when it comes to persistent systems. Again I might be
> > > > > > > > > > > > >> > missing the exact use case you are trying to solve here, but I can
> > > > > > > > > > > > >> > imagine multiple jobs reading from such a stream which might have
> > > > > > > > > > > > >> > different correctness requirements. Just quickly throwing out an
> > > > > > > > > > > > >> > idea off the top of my head: you might want to have entirely correct
> > > > > > > > > > > > >> > results which can be delayed for minutes, and a separate task that
> > > > > > > > > > > > >> > produces quick insights within seconds. Another thing to consider is
> > > > > > > > > > > > >> > that by the time the downstream job starts consuming, the upstream
> > > > > > > > > > > > >> > one might have produced records to the previously idle chunk.
> > > > > > > > > > > > >> > Persisting the StreamStatus in such a scenario would add unnecessary
> > > > > > > > > > > > >> > false late events.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > In my understanding a StreamStatus makes sense only when talking
> > > > > > > > > > > > >> > about immediately consumed transient channels such as between
> > > > > > > > > > > > >> > operators within a single job.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Dawid
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > On 03/06/2021 23:31, Eron Wright wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I think the rationale for end-to-end idleness (i.e. between
> > > > > > > > > > > > >> > pipelines) is the same as the rationale for idleness between
> > > > > > > > > > > > >> > operators within a pipeline. On the 'main issue' you mentioned, we
> > > > > > > > > > > > >> > entrust the source with adapting to Flink's notion of idleness (e.g.
> > > > > > > > > > > > >> > in Pulsar source, it means that no topics/partitions are assigned to
> > > > > > > > > > > > >> > a given sub-task); a similar adaption would occur in the sink. In
> > > > > > > > > > > > >> > other words, I think it reasonable that a sink for a watermark-aware
> > > > > > > > > > > > >> > storage system has need for the idleness signal.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Let me explain how I would use it in Pulsar's sink. Each sub-task is
> > > > > > > > > > > > >> > a Pulsar producer, and is writing watermarks to a configured topic
> > > > > > > > > > > > >> > via the Producer API. The Pulsar broker aggregates the watermarks
> > > > > > > > > > > > >> > that are written by each producer into a global minimum (similar to
> > > > > > > > > > > > >> > StatusWatermarkValve). The broker keeps track of which producers are
> > > > > > > > > > > > >> > actively producing watermarks, and a producer may mark itself as
> > > > > > > > > > > > >> > idle to tell the broker not to wait for watermarks from it, e.g.
> > > > > > > > > > > > >> > when a producer is going offline. I had intended to mark the
> > > > > > > > > > > > >> > producer as idle when the sub-task is closing, but now I see that it
> > > > > > > > > > > > >> > would be insufficient; the producer should also be idled if the
> > > > > > > > > > > > >> > sub-task is idled. Otherwise, the broker would wait indefinitely
> > > > > > > > > > > > >> > for the idled sub-task to produce a watermark.
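
For readers following along, the aggregation Eron describes above can be sketched roughly as follows. This is only an illustration under stated assumptions: `WatermarkAggregator`, `onWatermark`, `onIdle` and `globalWatermark` are hypothetical names, not Pulsar or Flink API, and the sketch ignores producers that have registered but not yet reported anything.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a broker-side aggregator; not Pulsar or Flink API. */
class WatermarkAggregator {
    // Latest watermark asserted by each producer that is currently active.
    private final Map<String, Long> active = new HashMap<>();
    // Last value handed out, kept so the combined watermark never moves backwards.
    private long lastEmitted = Long.MIN_VALUE;

    /** A producer asserts a watermark; this also marks it as active again. */
    void onWatermark(String producerId, long watermark) {
        active.merge(producerId, watermark, Long::max);
    }

    /** A producer marks itself idle, so the minimum no longer waits for it. */
    void onIdle(String producerId) {
        active.remove(producerId);
    }

    /** Minimum over active producers; unchanged if every producer is idle. */
    long globalWatermark() {
        long min = active.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(lastEmitted);
        lastEmitted = Math.max(lastEmitted, min);
        return lastEmitted;
    }
}

class WatermarkAggregatorDemo {
    public static void main(String[] args) {
        WatermarkAggregator agg = new WatermarkAggregator();
        agg.onWatermark("sink-0", 100L);
        agg.onWatermark("sink-1", 40L);
        System.out.println(agg.globalWatermark()); // held back by sink-1
        agg.onIdle("sink-1");
        System.out.println(agg.globalWatermark()); // sink-1 is no longer counted
    }
}
```

Without the `onIdle` call, the combined value would stay pinned at the last watermark of the silent producer, which is the stall described in the message.
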
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Arvid, I think your original instincts were correct about idleness
> > > > > > > > > > > > >> > propagation, and I hope I've demonstrated a practical use case.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > When I was rethinking the idleness issue, I came to the conclusion
> > > > > > > > > > > > >> > that it should be inferred at the source of the respective
> > > > > > > > > > > > >> > downstream pipeline again.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The main issue on propagating idleness is that you would force the
> > > > > > > > > > > > >> > same definition across all downstream pipelines, which may not be
> > > > > > > > > > > > >> > what the user intended.
> > > > > > > > > > > > >> > On the other hand, I don't immediately see a technical reason why
> > > > > > > > > > > > >> > the downstream source wouldn't be able to infer that.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Thu, Jun 3, 2021 at 9:14 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks Piotr for bringing this up. I reflected on this and I agree
> > > > > > > > > > > > >> > we should expose idleness, otherwise a multi-stage flow could stall.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Regarding the latency markers, I don't see an immediate need for
> > > > > > > > > > > > >> > propagating them, because they serve to estimate latency within a
> > > > > > > > > > > > >> > pipeline, not across pipelines. One would probably need to enhance
> > > > > > > > > > > > >> > the source interface also to do e2e latency. Seems we agree this
> > > > > > > > > > > > >> > aspect is out of scope.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I took a look at the code to get a sense of how to accomplish this.
> > > > > > > > > > > > >> > The gist is a new `markIdle` method on the `StreamOperator`
> > > > > > > > > > > > >> > interface, that is called when the stream status maintainer (the
> > > > > > > > > > > > >> > `OperatorChain`) transitions to idle state. Then, a new `markIdle`
> > > > > > > > > > > > >> > method on the `SinkFunction` and `SinkWriter` that is called by the
> > > > > > > > > > > > >> > respective operators. Note that StreamStatus is an internal class.
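
To make the shape of that change easier to picture, here is a rough Java sketch of the idea. It is an assumption-laden illustration, not the code from the draft PR: `IdleAwareSink`, `SinkOperatorSketch` and `RecordingSink` are hypothetical stand-ins, and the real Flink interfaces have different signatures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a sink interface; not the Flink API. */
interface IdleAwareSink<T> {
    void write(T element);

    // No-op defaults mean existing implementations compile unchanged.
    default void writeWatermark(long timestamp) {}

    default void markIdle() {}
}

/** Hypothetical stand-in for the operator that wraps the sink. */
class SinkOperatorSketch<T> {
    private final IdleAwareSink<T> sink;
    private boolean idle;

    SinkOperatorSketch(IdleAwareSink<T> sink) {
        this.sink = sink;
    }

    void processElement(T element) {
        idle = false; // any traffic makes the operator active again
        sink.write(element);
    }

    void processWatermark(long timestamp) {
        idle = false;
        sink.writeWatermark(timestamp);
    }

    /** Called when the (assumed) status maintainer transitions to idle. */
    void onIdle() {
        if (!idle) { // forward only the transition, not repeated notifications
            idle = true;
            sink.markIdle();
        }
    }
}

/** Test double that records what the operator forwarded. */
class RecordingSink implements IdleAwareSink<String> {
    final List<String> log = new ArrayList<>();

    @Override public void write(String element) { log.add("element:" + element); }
    @Override public void writeWatermark(long timestamp) { log.add("watermark:" + timestamp); }
    @Override public void markIdle() { log.add("idle"); }
}

class SinkOperatorSketchDemo {
    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        SinkOperatorSketch<String> op = new SinkOperatorSketch<>(sink);
        op.processWatermark(5L);
        op.onIdle();
        System.out.println(sink.log);
    }
}
```

The no-op defaults are a design choice in this sketch: a sink that does not care about idleness simply never overrides `markIdle`.
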
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Here's a draft PR (based on the existing PR of FLINK-22700) to
> > > > > > > > > > > > >> > highlight this new aspect:
> > > > > > > > > > > > >> > https://github.com/streamnative/flink/pull/2/files
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Please let me know if you'd like me to proceed to update the FLIP
> > > > > > > > > > > > >> > with these details.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks again,
> > > > > > > > > > > >> > Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski <pnowoj...@apache.org>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Sorry for chipping in late in the discussion, but I would second
> > > > > > > > > > > > >> > this point from Arvid:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker would also need to be
> > > > > > > > > > > > >> > encoded.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > It seems like this point was raised, but not followed up? Or did I
> > > > > > > > > > > > >> > miss it?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Especially the StreamStatus part. For me it sounds like exposing
> > > > > > > > > > > > >> > watermarks without letting the sink know that the stream can be idle
> > > > > > > > > > > > >> > is an incomplete feature and can be very problematic/confusing for
> > > > > > > > > > > > >> > potential users.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> > Piotrek
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Mon, May 31, 2021 at 08:34 Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Afaik everyone can start a [VOTE] thread [1]. For example, here a
> > > > > > > > > > > > >> > non-committer started a successful thread [2].
> > > > > > > > > > > > >> > If you start it, I can already cast a binding vote and we just need
> > > > > > > > > > > > >> > 2 more for the FLIP to be accepted.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > [1]
> > > > > > > > > > > > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Voting
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > [2]
> > > > > > > > > > > > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Deprecating-Mesos-support-td50142.html
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, May 28, 2021 at 8:17 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Arvid,
> > > > > > > > > > > > >> > Thanks for the feedback. I investigated the japicmp configuration,
> > > > > > > > > > > > >> > and I see that SinkWriter is marked Experimental (not Public or
> > > > > > > > > > > > >> > PublicEvolving). I think this means that SinkWriter need not be
> > > > > > > > > > > > >> > excluded. As you mentioned, SinkFunction is already excluded. I've
> > > > > > > > > > > > >> > updated the FLIP with an explanation.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I believe all issues are resolved. May we proceed to a vote now?
> > > > > > > > > > > > >> > And are you able to drive the vote process?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > >> > Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, May 28, 2021 at 4:40 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 1. Fair point. It still feels odd to have writeWatermark in the
> > > > > > > > > > > > >> > SinkFunction (it's supposed to be functional as you mentioned), but
> > > > > > > > > > > > >> > I agree that invokeWatermark is not better. So unless someone has a
> > > > > > > > > > > > >> > better idea, I'm fine with it.
> > > > > > > > > > > > >> > 2.+3. I tried to come up with scenarios for quite some time. In
> > > > > > > > > > > > >> > general, it seems as if the new SinkWriter interface encourages more
> > > > > > > > > > > > >> > injection (see processing time service in InitContext), such that
> > > > > > > > > > > > >> > the need for the context is really just context information of that
> > > > > > > > > > > > >> > particular record and I don't see any use beyond timestamp and
> > > > > > > > > > > > >> > watermark. For SinkFunction, I'd not over-engineer as it's going to
> > > > > > > > > > > > >> > be deprecated soonish. So +1 to leave it out.
> > > > > > > > > > > > >> > 4. Okay so I double-checked: from an execution perspective, it
> > > > > > > > > > > > >> > works. However, japicmp would definitely complain. I propose to add
> > > > > > > > > > > > >> > it to the compatibility section like this. We need to add an
> > > > > > > > > > > > >> > exception to SinkWriter then. (SinkFunction is already on the
> > > > > > > > > > > > >> > exception list)
> > > > > > > > > > > > >> > 5.+6. Awesome, I was also sure but wanted to double check.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Arvid
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Wed, May 26, 2021 at 7:29 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Arvid,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 1. I assume that the method name `invoke` stems from considering the
> > > > > > > > > > > > >> > SinkFunction to be a functional interface, but is otherwise
> > > > > > > > > > > > >> > meaningless. Keeping it as `writeWatermark` does keep it symmetric
> > > > > > > > > > > > >> > with SinkWriter. My vote is to leave it. You decide.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 2+3. I too considered adding a `WatermarkContext`, but it would
> > > > > > > > > > > > >> > merely be a placeholder. I don't anticipate any context info in
> > > > > > > > > > > > >> > future. As we see with invoke, it is possible to add a context later
> > > > > > > > > > > > >> > in a backwards-compatible way. My vote is to not introduce a
> > > > > > > > > > > > >> > context. You decide.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 4. No anticipated compatibility issues.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 5. Short answer, it works as expected. The new methods are invoked
> > > > > > > > > > > > >> > whenever the underlying operator receives a watermark. I do believe
> > > > > > > > > > > > >> > that batch and ingestion time applications receive watermarks.
> > > > > > > > > > > > >> > Seems the programming model is more unified in that respect since
> > > > > > > > > > > > >> > 1.12 (FLIP-134).
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 6. The failure behavior is the same as for elements.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > >> > Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, May 25, 2021 at 12:42 PM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I think the FLIP is crisp and mostly good to go. Some smaller
> > > > > > > > > > > > >> > things/questions:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >    1. SinkFunction#writeWatermark could be named
> > > > > > > > > > > > >> >    SinkFunction#invokeWatermark or invokeOnWatermark to keep it
> > > > > > > > > > > > >> >    symmetric.
> > > > > > > > > > > > >> >    2. We could add the context parameter to both. For
> > > > > > > > > > > > >> >    SinkWriter#Context, we currently do not gain much.
> > > > > > > > > > > > >> >    SinkFunction#Context also exposes processing time, which may or
> > > > > > > > > > > > >> >    may not be handy and is currently mostly used for
> > > > > > > > > > > > >> >    StreamingFileSink bucket policies. We may add that processing
> > > > > > > > > > > > >> >    time flag also to SinkWriter#Context in the future.
> > > > > > > > > > > > >> >    3. Alternatively, we could also add a different context parameter
> > > > > > > > > > > > >> >    just to keep the API stable while allowing additional information
> > > > > > > > > > > > >> >    to be passed in the future.
> > > > > > > > > > > > >> >    4. Would we run into any compatibility issue if we use Flink 1.13
> > > > > > > > > > > > >> >    source in Flink 1.14 (with this FLIP) or vice versa?
> > > > > > > > > > > > >> >    5. What happens with sinks that use the new methods in
> > > > > > > > > > > > >> >    applications that do not have watermarks (batch mode, processing
> > > > > > > > > > > > >> >    time)? Does this also work with ingestion time sufficiently?
> > > > > > > > > > > > >> >    6. How do exactly once sinks deal with written watermarks in case
> > > > > > > > > > > > >> >    of failure? I guess it's the same as normal records. (Either
> > > > > > > > > > > > >> >    rollback of transaction or deduplication on resumption)
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Arvid
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, May 25, 2021 at 6:44 PM Eron Wright <ewri...@streamnative.io.invalid>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Does anyone have further comment on FLIP-167?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > >> > Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Thu, May 20, 2021 at 5:02 PM Eron Wright <ewri...@streamnative.io>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Filed FLIP-167: Watermarks for Sink API:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I'd like to call a vote next week, is that
> > reasonable?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Wed, May 19, 2021 at 6:28 PM Zhou, Brian <b.z...@dell.com> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Arvid and Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for the discussion. I read through Eron's pull request, and I
> > > > > > > > > > > > >> > think this can benefit the Pravega Flink connector as well.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Here is some background. Pravega has had a watermark concept on the
> > > > > > > > > > > > >> > event stream for two years, and here is a blog introduction [1] to
> > > > > > > > > > > > >> > Pravega watermarks.
> > > > > > > > > > > > >> > The Pravega Flink connector also got a watermark integration last year:
> > > > > > > > > > > > >> > we wanted to propagate the Flink watermark to Pravega in the
> > > > > > > > > > > > >> > SinkFunction, and at that time we just used the existing Flink API,
> > > > > > > > > > > > >> > keeping the last watermark in memory and checking on each event whether
> > > > > > > > > > > > >> > the watermark has changed [2], which is not efficient. With such a new
> > > > > > > > > > > > >> > interface, we can also manage the watermark propagation much more easily.
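
The contrast described above, between checking the watermark on every event and being told when it changes, can be sketched roughly as follows. This is only an illustration under stated assumptions: `PollingSink`, `CallbackSink` and their methods are hypothetical stand-ins, not the Flink or Pravega connector API, and watermarks are reduced to plain `long` timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Flink or Pravega types.
class WatermarkPropagationSketch {

    /** Polling style: remember the last watermark and compare it on every record. */
    static final class PollingSink {
        private long lastWatermark = Long.MIN_VALUE;
        int comparisons = 0;
        final List<Long> forwarded = new ArrayList<>();

        void invoke(String record, long currentWatermark) {
            comparisons++; // one comparison per record, even when nothing changed
            if (currentWatermark > lastWatermark) {
                lastWatermark = currentWatermark;
                forwarded.add(currentWatermark);
            }
        }
    }

    /** Callback style: the runtime notifies the sink only when the watermark advances. */
    static final class CallbackSink {
        final List<Long> forwarded = new ArrayList<>();

        void invoke(String record) {
            // write the record; no watermark bookkeeping is needed here
        }

        void writeWatermark(long watermark) {
            forwarded.add(watermark);
        }
    }

    public static void main(String[] args) {
        PollingSink polling = new PollingSink();
        polling.invoke("a", 1L);
        polling.invoke("b", 1L);
        polling.invoke("c", 2L);

        CallbackSink callback = new CallbackSink();
        callback.invoke("a");
        callback.writeWatermark(1L);
        callback.invoke("b");
        callback.invoke("c");
        callback.writeWatermark(2L);

        System.out.println("polling comparisons: " + polling.comparisons);
        System.out.println("polling forwarded:   " + polling.forwarded);
        System.out.println("callback forwarded:  " + callback.forwarded);
    }
}
```

Both variants forward the same watermarks. The polling variant pays one comparison per record, while the callback variant does watermark work only when the runtime reports a change.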
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > >
> > https://pravega.io/blog/2019/11/08/pravega-watermarking-support/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [2]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L465
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > -----Original Message-----
> > > > > > > > > > > > >> > From: Arvid Heise <ar...@apache.org>
> > > > > > > > > > > > >> > Sent: Wednesday, May 19, 2021 16:06
> > > > > > > > > > > > >> > To: dev
> > > > > > > > > > > > >> > Subject: Re: [DISCUSS] Watermark propagation with Sink API
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [EXTERNAL EMAIL]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for pushing that topic. I can now see that the benefit is even
> > > > > > > > > > > > >> > bigger than I initially thought. So it's worthwhile to include that
> > > > > > > > > > > > >> > anyway.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I also briefly thought about exposing watermarks to all UDFs, but here I
> > > > > > > > > > > > >> > really have trouble seeing specific use cases. Could you maybe take a
> > > > > > > > > > > > >> > few minutes to think about it as well? I could only see someone misusing
> > > > > > > > > > > > >> > Async IO as a sink where a real sink would be more appropriate. In
> > > > > > > > > > > > >> > general, if there is not a clear use case, we shouldn't add the
> > > > > > > > > > > > >> > functionality as it's just increased maintenance for no value.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > If we stick to the plan, I think your PR is already in good shape. We
> > > > > > > > > > > > >> > need to create a FLIP for it though, since it changes Public interfaces
> > > > > > > > > > > > >> > [1]. I was initially not convinced that we should also change the old
> > > > > > > > > > > > >> > SinkFunction interface, but seeing how little the change is, I wouldn't
> > > > > > > > > > > > >> > mind at all to increase consistency. Only once we have written the FLIP
> > > > > > > > > > > > >> > and approved it (which should be minimal and fast) should we actually
> > > > > > > > > > > > >> > look at the PR ;).
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The only thing which I would improve is the name of the function.
> > > > > > > > > > > > >> > processWatermark sounds as if the sink implementer really needs to
> > > > > > > > > > > > >> > implement it (as you would need to do it on a custom operator). I would
> > > > > > > > > > > > >> > make them symmetric to the record writing/invoking method (e.g.
> > > > > > > > > > > > >> > writeWatermark and invokeWatermark).
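
A rough sketch of the naming idea above, assuming the watermark hook is an optional method with a no-op default that sits next to the record method. `SketchSink` and the two implementations are hypothetical and only mirror the shape of the suggestion; they are not the actual Flink `SinkFunction` or `SinkWriter` interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface for illustration; not the actual Flink SinkFunction.
interface SketchSink<T> {
    void invoke(T value);

    /** No-op by default, so existing sinks compile and behave as before. */
    default void invokeWatermark(long watermarkTimestamp) {
    }
}

class WatermarkNamingSketch {

    /** An existing sink that knows nothing about watermarks. */
    static final class LegacySink implements SketchSink<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            written.add("record:" + value);
        }
    }

    /** A sink that opts in by overriding the watermark hook. */
    static final class WatermarkAwareSink implements SketchSink<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            written.add("record:" + value);
        }

        @Override
        public void invokeWatermark(long watermarkTimestamp) {
            written.add("watermark:" + watermarkTimestamp);
        }
    }

    public static void main(String[] args) {
        LegacySink legacy = new LegacySink();
        legacy.invoke("a");
        legacy.invokeWatermark(10L); // silently ignored by the default

        WatermarkAwareSink aware = new WatermarkAwareSink();
        aware.invoke("a");
        aware.invokeWatermark(10L);

        System.out.println(legacy.written);
        System.out.println(aware.written);
    }
}
```

With this shape the name reads as a counterpart of `invoke` rather than as an operator callback that every implementer must provide.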
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > As a follow-up PR, we should then migrate KafkaShuffle to the new API.
> > > > > > > > > > > > >> > But that's something I can do.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/Flink*Improvement*Proposals__;Kys!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnp6nc7o$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [cwiki[.]apache[.]org]
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Wed, May 19, 2021 at 3:34 AM Eron Wright
> > > > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Update: opened an issue and a PR.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLINK-22700__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMplbgRO4$
> > > > > > > > > > > > >> > [issues[.]apache[.]org]
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > https://urldefense.com/v3/__https://github.com/apache/flink/pull/15950__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtScmG7a$
> > > > > > > > > > > > >> > [github[.]com]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, May 18, 2021 at 10:03 AM Eron Wright
> > > > > > > > > > > > >> > <ewri...@streamnative.io> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks Arvid and David for sharing your ideas on this subject. I'm glad
> > > > > > > > > > > > >> > to hear that you're seeing use cases for watermark propagation via an
> > > > > > > > > > > > >> > enhanced sink interface.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > As you've guessed, my interest is in Pulsar, and I am exploring some
> > > > > > > > > > > > >> > options for brokering watermarks across stream processing pipelines. I
> > > > > > > > > > > > >> > think Arvid is speaking to a high-fidelity solution where the difference
> > > > > > > > > > > > >> > between intra- and inter-pipeline flow is eliminated. My goal is more
> > > > > > > > > > > > >> > limited; I want to write the watermark that arrives at the sink to
> > > > > > > > > > > > >> > Pulsar. Simply imagine that Pulsar has native support for watermarking
> > > > > > > > > > > > >> > in its producer/consumer API, and we'll leave the details to another
> > > > > > > > > > > > >> > forum.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > David, I like your invariant. I see lateness as stemming from the
> > > > > > > > > > > > >> > problem domain and from system dynamics (e.g. scheduling, batching,
> > > > > > > > > > > > >> > lag). When one depends on order-of-observation to generate watermarks,
> > > > > > > > > > > > >> > the app may become unduly sensitive to dynamics which bear on
> > > > > > > > > > > > >> > order-of-observation. My goal is to factor out the system dynamics from
> > > > > > > > > > > > >> > lateness determination.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Arvid, to be most valuable (at least for my purposes) the enhancement is
> > > > > > > > > > > > >> > needed on SinkFunction. This will allow us to easily evolve the existing
> > > > > > > > > > > > >> > Pulsar connector.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Next step, I will open a PR to advance the conversation.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, May 18, 2021 at 5:06 AM David Morávek
> > > > > > > > > > > > >> > <david.mora...@gmail.com> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thanks for starting this discussion. I've been thinking about this
> > > > > > > > > > > > >> > recently as we've run into "watermark related" issues when chaining
> > > > > > > > > > > > >> > multiple pipelines together. My two cents to the discussion:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > How I like to think about the problem is that there should be an
> > > > > > > > > > > > >> > invariant that holds for any stream processing pipeline: "a NON_LATE
> > > > > > > > > > > > >> > element entering the system should never become LATE"
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Unfortunately this is exactly what happens in downstream pipelines,
> > > > > > > > > > > > >> > because the upstream one can:
> > > > > > > > > > > > >> > - break ordering (especially with higher parallelism)
> > > > > > > > > > > > >> > - emit elements that are ahead of the output watermark
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > There is not enough information to reconstruct the upstream watermark in
> > > > > > > > > > > > >> > later stages (it's always just an estimate based on the previous
> > > > > > > > > > > > >> > pipeline's output).
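
To make the broken invariant concrete, here is a small sketch with made-up numbers. It assumes the downstream pipeline estimates its watermark as "largest timestamp seen so far minus a fixed bound", which is a simplification chosen for illustration and not a claim about any particular Flink watermark strategy. The class and method names are hypothetical.

```java
// Hypothetical illustration with made-up numbers; not Flink code.
class LatenessInvariantSketch {

    /** An element is treated as late once the watermark has reached its timestamp. */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    /** Downstream estimates the watermark as (max timestamp seen so far) - bound. */
    static int lateWithEstimate(long[] timestamps, long bound) {
        long maxSeen = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            long estimate = (maxSeen == Long.MIN_VALUE) ? Long.MIN_VALUE : maxSeen - bound;
            if (isLate(ts, estimate)) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    /** Downstream uses the watermark the upstream pipeline wrote alongside each element. */
    static int lateWithPropagated(long[] timestamps, long[] upstreamWatermarks) {
        int late = 0;
        for (int i = 0; i < timestamps.length; i++) {
            if (isLate(timestamps[i], upstreamWatermarks[i])) {
                late++;
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Arrival order at the downstream pipeline: the upstream broke ordering,
        // so the element with timestamp 100 arrives before the one with 60.
        long[] timestamps = {100L, 60L};
        // Upstream output watermark in effect when each element was written.
        long[] upstreamWatermarks = {50L, 50L};

        System.out.println("late with estimate:   " + lateWithEstimate(timestamps, 10L));
        System.out.println("late with propagated: " + lateWithPropagated(timestamps, upstreamWatermarks));
    }
}
```

In this example the element with timestamp 60 was not late upstream (the watermark there was 50), yet the downstream estimate has already advanced to 90 when it arrives, so it is classified as late. With the upstream watermark carried along, neither element is late.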
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > It would be great if we could have a general abstraction that is
> > > > > > > > > > > > >> > reusable for various sources / sinks (not just Kafka / Pulsar, though
> > > > > > > > > > > > >> > this would probably cover most of the use-cases) and systems.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Is there any other use-case than sharing watermarks between pipelines
> > > > > > > > > > > > >> > that you're trying to solve?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Arvid:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > 1. Watermarks are closely coupled to the used system (=Flink). I have a
> > > > > > > > > > > > >> > hard time imagining that it's useful to use a different stream processor
> > > > > > > > > > > > >> > downstream. So for now, I'm assuming that both upstream and downstream
> > > > > > > > > > > > >> > are Flink applications. In that case, we probably define both parts of
> > > > > > > > > > > > >> > the pipeline in the same Flink job similar to KafkaStream's #through.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I'd slightly disagree here. For example, we're "materializing"
> > > > > > > > > > > > >> > change-logs produced by a Flink pipeline into a serving layer (random
> > > > > > > > > > > > >> > access db / in memory view / ..) and we need to know whether the
> > > > > > > > > > > > >> > responses we serve meet the "freshness" requirements (e.g. you may want
> > > > > > > > > > > > >> > to respond differently when the watermark is lagging way too much behind
> > > > > > > > > > > > >> > processing time). Also, not every stream processor in the pipeline needs
> > > > > > > > > > > > >> > to be Flink. It can as well be a simple element-wise transformation that
> > > > > > > > > > > > >> > reads from Kafka and writes back into a separate topic (that's what we
> > > > > > > > > > > > >> > do, for example, with ML models that have special hardware requirements).
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> > D.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, May 18, 2021 at 8:30 AM Arvid Heise <ar...@apache.org> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Eron,
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I think this is a useful addition for storage systems that act as
> > > > > > > > > > > > >> > pass-through for Flink to reduce recovery time. It is only useful if you
> > > > > > > > > > > > >> > combine it with regional fail-over as only a small part of the pipeline
> > > > > > > > > > > > >> > is restarted.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > A couple of thoughts on the implications:
> > > > > > > > > > > >> > 1. Watermarks are closely coupled to the used
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > system
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > (=Flink).
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > have
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > a
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > hard time imagining that it's useful to use a
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > different
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > stream
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > processor
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > downstream. So for now, I'm assuming that both
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > upstream
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > and
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > downstream
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > are
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Flink applications. In that case, we probably
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > define
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > both
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > parts
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > of the pipeline in the same Flink job similar to
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > KafkaStream's
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > #through.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 2. The schema of the respective intermediate
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > stream/topic
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > would
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > need
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > to
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > be
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > managed by Flink to encode both records and
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > watermarks.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > This
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > reduces
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > the
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > usability quite a bit and needs to be carefully
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > crafted.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 3. It's not clear to me if constructs like
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > SchemaRegistry
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > can
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > be
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > properly
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > supported (and also if they should be supported)
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > in
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > terms
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > of
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > schema evolution.
> > > > > > > > > > > >> > 4. Potentially, StreamStatus and LatencyMarker
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > would
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > also
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > need
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > to
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > be encoded.
> > > > > > > > > > > > >> > 5. It's important to have some way to transport backpressure
> > > > > > > > > > > > >> > from the downstream to the upstream. Or else you would have the
> > > > > > > > > > > > >> > same issue as KafkaStreams where two separate pipelines can
> > > > > > > > > > > > >> > drift so far away that you experience data loss if the data
> > > > > > > > > > > > >> > retention period is smaller than the drift.
> > > > > > > > > > > > >> > 6. It's clear that you trade a huge chunk of throughput for
> > > > > > > > > > > > >> > lower overall latency in case of failure. So it's an
> > > > > > > > > > > > >> > interesting feature for use cases with SLAs.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Since we are phasing out SinkFunction, I'd prefer to only
> > > > > > > > > > > > >> > support SinkWriter. Having a no-op default sounds good to me.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > We have an experimental feature for Kafka [1], which pretty
> > > > > > > > > > > > >> > much reflects your idea. Here we have an ugly workaround to be
> > > > > > > > > > > > >> > able to process the watermark by using a custom StreamSink
> > > > > > > > > > > > >> > task. We could also try to create a FLIP that abstracts the
> > > > > > > > > > > > >> > actual system away and then we could use the approach for both
> > > > > > > > > > > > >> > Pulsar and Kafka.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1]
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > >
> > > > > > > >
> > > > >
> > https://urldefense.com/v3/__https://github.com/apache/flink/blob/maste
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> r/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flin
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> k/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java*L103__;Iw!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMvmemHrt$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [github[.]com]
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Mon, May 17, 2021 at 10:44 PM Eron Wright
> > > > > > > > > > > > >> > <ewri...@streamnative.io.invalid> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I would like to propose an enhancement to the Sink API, the
> > > > > > > > > > > > >> > ability to receive upstream watermarks. I'm aware that the sink
> > > > > > > > > > > > >> > context provides the current watermark for a given record. I'd
> > > > > > > > > > > > >> > like to be able to write a sink function that is invoked
> > > > > > > > > > > > >> > whenever the watermark changes. Out of scope would be
> > > > > > > > > > > > >> > event-time timers (since sinks aren't keyed).
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > For context, imagine that a stream storage system had the
> > > > > > > > > > > > >> > ability to persist watermarks in addition to ordinary elements,
> > > > > > > > > > > > >> > e.g. to serve as source watermarks in a downstream processor.
> > > > > > > > > > > > >> > Ideally one could compose a multi-stage, event-driven
> > > > > > > > > > > > >> > application, with watermarks flowing end-to-end without need
> > > > > > > > > > > > >> > for a heuristics-based watermark at each stage.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The specific proposal would be a new method on `SinkFunction`
> > > > > > > > > > > > >> > and/or on `SinkWriter`, called 'processWatermark' or
> > > > > > > > > > > > >> > 'writeWatermark', with a default implementation that does
> > > > > > > > > > > > >> > nothing.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thoughts?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks!
> > > > > > > > > > > >> > Eron Wright
> > > > > > > > > > > >> > StreamNative
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://calendly.com/eronwright/regular
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > >
> -1-hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > dMtQrD25c$ [calendly[.]com]>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://github.com/streamnative__;!!LpK
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > I!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [github[.]com]>
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > >
> https://urldefense.com/v3/__https://www.linkedin.com/company/stream
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > >
> native/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > dMqO4UZJa$ [linkedin[.]com]>
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > >
> > > https://urldefense.com/v3/__https://twitter.com/streamnativeio/__
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > ;!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > >
> > > > !LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [twitter[.]com]>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> https://urldefense.com/v3/__https://calendly.com/eronwright/regular-1
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> -hour__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMtQ
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > rD25c$ [calendly[.]com]>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > >
> > https://urldefense.com/v3/__https://github.com/streamnative__;!!LpKI
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > !
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > 2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMnQskrSQ$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [github[.]com]>
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> https://urldefense.com/v3/__https://www.linkedin.com/company/streamna
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> tive/__;!!LpKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMqO
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 4UZJa$ [linkedin[.]com]>
> > > > > > > > > > > >> > <
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > >
> > > > > > >
> > > >
> https://urldefense.com/v3/__https://twitter.com/streamnativeio/__;!!L
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > >
> > pKI!2IQYKfnjRuBgkNRxnPbJeFvTdhWjpwN0urN3m0yz_6W11H74kY5dMpbyC_rP$
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [twitter[.]com]>
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > > > https://github.com/streamnative
> > > > > > > > > > ><
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > > > https://github.com/streamnative
> > > > > > > > > > ><
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > > > https://github.com/streamnative
> > > > > > > > > > ><
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > > > https://github.com/streamnative
> > > > > > > > > > ><
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > --
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Eron Wright   Cloud Engineering Lead
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > p: +1 425 922 8617 <18163542939>
> > > > > > > > > > > >> > streamnative.io |  Meet with me<
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour> <
> > > > > > > > > > > >> https://calendly.com/eronwright/regular-1-hour>
> > > > > > > > > > > >> > <https://github.com/streamnative> <
> > > > > > > > > https://github.com/streamnative
> > > > > > > > > > ><
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/> <
> > > > > > > > > > > >> https://www.linkedin.com/company/streamnative/><
> > > > > > > > > > > >> https://twitter.com/streamnativeio/> <
> > > > > > > > > > > https://twitter.com/streamnativeio/
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
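
For readers skimming the thread, the API shape proposed in the quoted message (a 'writeWatermark' method with a default implementation that does nothing) can be sketched roughly as below. This is only an illustration under stated assumptions: `SketchSinkWriter`, `Watermark`, `ElementOnlyWriter` and `WatermarkAwareWriter` are hypothetical stand-ins invented for this sketch, not Flink's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSinkSketch {

    /** Hypothetical stand-in for a watermark; not Flink's Watermark class. */
    static final class Watermark {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical, simplified writer interface; not the real SinkWriter. */
    interface SketchSinkWriter<T> {
        void write(T element);

        /** No-op default, so writers that ignore watermarks need no changes. */
        default void writeWatermark(Watermark watermark) {}
    }

    /** A writer that only handles elements and inherits the no-op default. */
    static final class ElementOnlyWriter implements SketchSinkWriter<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void write(String element) {
            log.add("element:" + element);
        }
    }

    /** A writer that records watermarks alongside ordinary elements. */
    static final class WatermarkAwareWriter implements SketchSinkWriter<String> {
        final List<String> log = new ArrayList<>();

        @Override
        public void write(String element) {
            log.add("element:" + element);
        }

        @Override
        public void writeWatermark(Watermark watermark) {
            log.add("watermark:" + watermark.timestamp);
        }
    }

    public static void main(String[] args) {
        ElementOnlyWriter plain = new ElementOnlyWriter();
        WatermarkAwareWriter aware = new WatermarkAwareWriter();

        // Feed the same element and watermark to both writers.
        plain.write("a");
        plain.writeWatermark(new Watermark(42L));
        aware.write("a");
        aware.writeWatermark(new Watermark(42L));

        System.out.println(plain.log);
        System.out.println(aware.log);
    }
}
```

The default method is what keeps the change backward compatible in this sketch: only sinks that care about watermarks override it.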
