Hi Jark, John,

Thank you for the discussion! I will proceed with completing the patch that
adds exactly-once to upsert-kafka connector.

Best,
Alexander

On Wed, Apr 12, 2023 at 12:21 AM Jark Wu <imj...@gmail.com> wrote:

> Hi John,
>
> Thank you for your valuable input. It sounds reasonable to me.
>
> From this point of view, the exactly-once is used to guarantee transaction
> semantics other than avoid duplication/upserts.
> This is similar to the JDBC connectors that already support eventual
> consistency with idempotent updates, but we still add the support of
> 2PC[1].
>
> Best,
> Jark
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Wed, 12 Apr 2023 at 10:36, John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Jark,
> >
> > I hope you don’t mind if I chime in.
> >
> > You have a good point that the sequence of upserts will eventually
> > converge to the correct value under the at-least-once delivery guarantee,
> > but it can still be important to avoid passing on uncommitted results.
> Some
> > thoughts, numbered for reference:
> >
> > 1. Most generally, if some result R is written to the sink topic, but
> then
> > the job fails before a checkpoint, rolls back, and reprocesses, producing
> > R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> > system is concerned, R never happened at all (because it was part of a
> > rolled-back batch of processing).
> >
> > 2. Readers may reasonably wish to impose some meaning on the sequence of
> > upserts itself, so including aborted results can lead to wrong semantics
> > downstream. Eg: “how many times has ‘x’ been updated today”?
> >
> > 3. Note that processing may not be deterministic over failures, and,
> > building on (2), readers may have an expectation that every record in the
> > topic corresponds to a real value that was associated with that key at
> some
> > point. Eg, if we start with x=1, checkpoint, then produce x=99, crash,
> > restart and produce x=2. Under at-least-once, the history of x
> is[1,99,2],
> > while exactly-once would give the correct history of [1,2]. If we set up
> an
> > alert if the value of x is ever greater over 10, then at-least-once will
> > erroneously alert us, while exactly-once does not.
> >
> > 4. Sending results for failed processing can also cause operational
> > problems: if you’re processing a high volume of data, and you get into a
> > crash loop, you can create a flood of repeated results. I’ve seen this
> case
> > cause real world pain for people, and it’s nice to have a way to avoid
> it.
> >
> > I hope some of these examples show why a user might reasonably want to
> > configure the connector with the exactly-once guarantee.
> >
> > Thanks!
> > -John
> >
> > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > > Hi Alexander,
> > >
> > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> records
> > in
> > > case of producer retries
> > > or failovers. But as I explained above, it can’t avoid intentionally
> > > duplicated records. Actually, I would
> > > like to call them "upsert records" instead of "duplicates", that's why
> > the
> > > connector is named "upsert-kafka",
> > > to make Kafka work like a database that supports updating and deleting
> by
> > > key.
> > >
> > > For example, there is a SQL query:
> > >
> > > SELECT URL, COUNT(*) page_views
> > > FROM access_logs
> > > GROUP BY URL;
> > >
> > > This is a continuous query[1] that continuously emits a new <url,
> > > page_views> record once a new URL
> > > access entry is received. The same URLs in the log may be far away and
> be
> > > processed in different checkpoints.
> > >
> > > It's easy to make upsert-kafka to support exactly-once delivery
> > guarantee,
> > > but as we discussed above,
> > > it's unnecessary to support it and we intend to expose as few
> > > configurations to users as possible.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > [1]
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> > >
> > >
> > >
> > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > > <asorokou...@confluent.io.invalid> wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> > idempotent
> > >> producers prevent duplicated records[1], at least in the cases when
> > >> upstream does not produce them intentionally and across checkpoints.
> > >>
> > >> Could you please elaborate or point me to the docs that explain the
> > reason
> > >> for duplicated records upstream and across checkpoints? I am
> relatively
> > new
> > >> to Flink and not aware of it. According to the kafka connector
> > >> documentation, it does support exactly once semantics by configuring '
> > >> sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why
> we
> > >> can't make upsert-kafka configurable in the same way to support this
> > >> delivery guarantee.
> > >>
> > >> Thank you,
> > >> Alexander
> > >>
> > >> 1.
> > >>
> > >>
> >
> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > >> 2.
> > >>
> > >>
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> > >>
> > >>
> > >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >> > Hi Alexander,
> > >> >
> > >> > I’m not sure I fully understand the reasons. I left my comments
> > inline.
> > >> >
> > >> > > 1. There might be other non-Flink topic consumers that would
> rather
> > not
> > >> > have duplicated records.
> > >> >
> > >> > Exactly once can’t avoid producing duplicated records. Because the
> > >> upstream
> > >> > produces duplicated records intentionally and across checkpoints.
> > Exactly
> > >> > once
> > >> > can’t recognize duplicated records and drop duplications.  That
> means
> > >> > duplicated
> > >> > records are written into topics even if exactly-once mode is
> enabled.
> > >> >
> > >> >
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> to
> > >> > previous values.
> > >> >
> > >> > Sorry, I don’t understand how exactly once can prevent this rollback
> > >> > behavior.
> > >> > Even in your example with EXACTLY_ONCE enabled, the x will go to a5,
> > and
> > >> > b5,
> > >> > then back a5 if jobs perform checkpoints after producing records.
> > >> >
> > >> >
> > >> > Best,
> > >> > Jark
> > >> >
> > >> >
> > >> > > 2023年4月5日 09:39,Alexander Sorokoumov <asorokou...@confluent.io
> > >> .INVALID>
> > >> > 写道:
> > >> > >
> > >> > > Hello Flink community,
> > >> > >
> > >> > > I would like to discuss if it is worth adding EXACTLY_ONCE
> delivery
> > >> > > semantics to upsert-kafka connector. According to upsert-kafka
> > docs[1]
> > >> > and
> > >> > > ReducingUpsertSink javadoc[2], the connector is correct even with
> > >> > duplicate
> > >> > > records under AT_LEAST_ONCE because the records are idempotent,
> and
> > the
> > >> > > read path de-duplicates them. However, there are at least 2
> reasons
> > to
> > >> > > configure the connector with EXACTLY_ONCE:
> > >> > >
> > >> > > 1. There might be other non-Flink topic consumers that would
> rather
> > not
> > >> > > have duplicated records.
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> to
> > >> > > previous values. Consider a scenario where 2 producing jobs A and
> B
> > >> write
> > >> > > to the same topic with AT_LEAST_ONCE, and a consuming job reads
> from
> > >> the
> > >> > > topic. Both producers write unique, monotonically increasing
> > sequences
> > >> to
> > >> > > the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> > >> > > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following
> > >> > sequence:
> > >> > >
> > >> > >   1. Job A produces x=a5.
> > >> > >   2. Job B produces x=b5.
> > >> > >   3. Job A produces the duplicate write x=5.
> > >> > >
> > >> > > The consuming job would observe x going to a5, then to b5, then
> back
> > >> a5.
> > >> > > EXACTLY_ONCE would prevent this behavior.
> > >> > >
> > >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a
> > WIP
> > >> > patch
> > >> > > to add EXACTLY_ONCE to upsert-kafka, but would like to know what
> the
> > >> > > community thinks about it before moving forward with it.
> > >> > >
> > >> > > Thanks,
> > >> > > Alexander
> > >> > >
> > >> > > 1.
> > >> > >
> > >> >
> > >>
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > >> > > 2.
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> > >> >
> > >> >
> > >>
> >
>

Reply via email to