Hi Jark, John,

Thank you for the discussion! I will proceed with completing the patch that
adds exactly-once to the upsert-kafka connector.
Best,
Alexander

On Wed, Apr 12, 2023 at 12:21 AM Jark Wu <imj...@gmail.com> wrote:

> Hi John,
>
> Thank you for your valuable input. It sounds reasonable to me.
>
> From this point of view, exactly-once is used to guarantee transaction
> semantics rather than to avoid duplication/upserts. This is similar to the
> JDBC connectors that already support eventual consistency with idempotent
> updates, but we still add the support of 2PC [1].
>
> Best,
> Jark
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Wed, 12 Apr 2023 at 10:36, John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Jark,
> >
> > I hope you don’t mind if I chime in.
> >
> > You have a good point that the sequence of upserts will eventually
> > converge to the correct value under the at-least-once delivery
> > guarantee, but it can still be important to avoid passing on uncommitted
> > results. Some thoughts, numbered for reference:
> >
> > 1. Most generally, if some result R is written to the sink topic, but
> > then the job fails before a checkpoint, rolls back, and reprocesses,
> > producing R’, then it is incorrect to call R an “upsert”. In fact, as
> > far as the system is concerned, R never happened at all (because it was
> > part of a rolled-back batch of processing).
> >
> > 2. Readers may reasonably wish to impose some meaning on the sequence of
> > upserts itself, so including aborted results can lead to wrong semantics
> > downstream. E.g.: “How many times has ‘x’ been updated today?”
> >
> > 3. Note that processing may not be deterministic over failures, and,
> > building on (2), readers may have an expectation that every record in
> > the topic corresponds to a real value that was associated with that key
> > at some point. E.g., if we start with x=1, checkpoint, then produce
> > x=99, crash, restart, and produce x=2.
> > Under at-least-once, the history of x is [1, 99, 2], while exactly-once
> > would give the correct history of [1, 2]. If we set up an alert for when
> > the value of x is ever greater than 10, then at-least-once will
> > erroneously alert us, while exactly-once does not.
> >
> > 4. Sending results for failed processing can also cause operational
> > problems: if you’re processing a high volume of data and you get into a
> > crash loop, you can create a flood of repeated results. I’ve seen this
> > case cause real-world pain for people, and it’s nice to have a way to
> > avoid it.
> >
> > I hope some of these examples show why a user might reasonably want to
> > configure the connector with the exactly-once guarantee.
> >
> > Thanks!
> > -John
> >
> > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > > Hi Alexander,
> > >
> > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> > > records in case of producer retries or failovers. But as I explained
> > > above, it can’t avoid intentionally duplicated records. Actually, I
> > > would like to call them "upsert records" instead of "duplicates";
> > > that's why the connector is named "upsert-kafka": to make Kafka work
> > > like a database that supports updating and deleting by key.
> > >
> > > For example, there is a SQL query:
> > >
> > > SELECT URL, COUNT(*) page_views
> > > FROM access_logs
> > > GROUP BY URL;
> > >
> > > This is a continuous query [1] that continuously emits a new <url,
> > > page_views> record once a new URL access entry is received. The same
> > > URLs in the log may be far away from each other and be processed in
> > > different checkpoints.
> > >
> > > It's easy to make upsert-kafka support the exactly-once delivery
> > > guarantee, but as we discussed above, it's unnecessary to support it,
> > > and we intend to expose as few configurations to users as possible.
> > > Best,
> > > Jark
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> > >
> > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > > <asorokou...@confluent.io.invalid> wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> > >> idempotent producers prevent duplicated records [1], at least in the
> > >> cases when upstream does not produce them intentionally and across
> > >> checkpoints.
> > >>
> > >> Could you please elaborate or point me to the docs that explain the
> > >> reason for duplicated records upstream and across checkpoints? I am
> > >> relatively new to Flink and not aware of it. According to the kafka
> > >> connector documentation, it does support exactly-once semantics by
> > >> configuring 'sink.delivery-guarantee'='exactly-once' [2]. It is not
> > >> clear to me why we can't make upsert-kafka configurable in the same
> > >> way to support this delivery guarantee.
> > >>
> > >> Thank you,
> > >> Alexander
> > >>
> > >> 1.
> > >> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > >> 2.
> > >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> > >>
> > >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <imj...@gmail.com> wrote:
> > >>
> > >> > Hi Alexander,
> > >> >
> > >> > I’m not sure I fully understand the reasons. I left my comments
> > >> > inline.
> > >> >
> > >> > > 1. There might be other non-Flink topic consumers that would
> > >> > > rather not have duplicated records.
> > >> >
> > >> > Exactly-once can’t avoid producing duplicated records, because the
> > >> > upstream produces duplicated records intentionally and across
> > >> > checkpoints. Exactly-once can’t recognize duplicated records and
> > >> > drop the duplications.
> > >> > That means duplicated records are written into topics even if
> > >> > exactly-once mode is enabled.
> > >> >
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> > >> > > to previous values.
> > >> >
> > >> > Sorry, I don’t understand how exactly-once can prevent this
> > >> > rollback behavior. Even in your example with EXACTLY_ONCE enabled,
> > >> > x will go to a5, then b5, then back to a5 if the jobs perform
> > >> > checkpoints after producing the records.
> > >> >
> > >> > Best,
> > >> > Jark
> > >> >
> > >> > > On Apr 5, 2023, at 09:39, Alexander Sorokoumov
> > >> > > <asorokou...@confluent.io.INVALID> wrote:
> > >> > >
> > >> > > Hello Flink community,
> > >> > >
> > >> > > I would like to discuss whether it is worth adding EXACTLY_ONCE
> > >> > > delivery semantics to the upsert-kafka connector. According to
> > >> > > the upsert-kafka docs [1] and the ReducingUpsertSink javadoc [2],
> > >> > > the connector is correct even with duplicate records under
> > >> > > AT_LEAST_ONCE because the records are idempotent, and the read
> > >> > > path de-duplicates them. However, there are at least 2 reasons to
> > >> > > configure the connector with EXACTLY_ONCE:
> > >> > >
> > >> > > 1. There might be other non-Flink topic consumers that would
> > >> > > rather not have duplicated records.
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> > >> > > to previous values. Consider a scenario where 2 producing jobs A
> > >> > > and B write to the same topic with AT_LEAST_ONCE, and a consuming
> > >> > > job reads from the topic. Both producers write unique,
> > >> > > monotonically increasing sequences to the same key. Job A writes
> > >> > > x=a1,a2,a3,a4,a5… Job B writes x=b1,b2,b3,b4,b5,… With this
> > >> > > setup, we can have the following sequence:
> > >> > >
> > >> > > 1. Job A produces x=a5.
> > >> > > 2. Job B produces x=b5.
> > >> > > 3. Job A produces the duplicate write x=a5.
> > >> > >
> > >> > > The consuming job would observe x going to a5, then to b5, then
> > >> > > back to a5. EXACTLY_ONCE would prevent this behavior.
> > >> > >
> > >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a
> > >> > > WIP patch to add EXACTLY_ONCE to upsert-kafka, but I would like
> > >> > > to know what the community thinks about it before moving forward
> > >> > > with it.
> > >> > >
> > >> > > Thanks,
> > >> > > Alexander
> > >> > >
> > >> > > 1.
> > >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > >> > > 2.
> > >> > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
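John's point (3) in the thread above can be reduced to a toy model. This is a sketch in plain Python, not Flink or Kafka client code: the `read_committed` flag stands in for Kafka's transactional reads, and all names are illustrative.

```python
# Toy model of John's point (3): the job commits x=1 at a checkpoint,
# writes x=99, crashes before the next checkpoint (so that write belongs
# to an aborted transaction), restarts, and writes x=2 (committed).
COMMITTED, ABORTED = "committed", "aborted"

log = [(1, COMMITTED), (99, ABORTED), (2, COMMITTED)]

def history(log, read_committed):
    """Values a consumer observes for key x from the sink topic."""
    return [value for value, status in log
            if not read_committed or status == COMMITTED]

at_least_once = history(log, read_committed=False)  # [1, 99, 2]
exactly_once = history(log, read_committed=True)    # [1, 2]

# The alert "x ever greater than 10" fires spuriously only under
# at-least-once: 99 was never a committed value of x.
assert any(v > 10 for v in at_least_once)
assert not any(v > 10 for v in exactly_once)
```

Under exactly-once, the aborted x=99 never becomes visible to a read-committed consumer, matching the [1, 2] history John describes.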