Re: [DISCUSS] FLIP-319: Integrating with Kafka’s proper support for 2PC participation (KIP-939).

2023-08-31 Thread Alexander Sorokoumov
Hi Gordon!

Thank you for publishing this FLIP! I would like to ask several questions
and confirm my understanding of a few aspects of this proposal. Although
this discussion is focused on FLIP-319, since it builds on top of KIP-939,
my questions will also cover the KIP.



   1. Why does KafkaProducer have both commitTransaction and
      completeTransaction methods? Why can't we fuse them?
      1. Is it correct that commitTransaction and abortTransaction bring
         the transaction to its final state (committed/aborted), while
         completeTransaction is a recovery method that rolls a txn back or
         forward, depending on the prepared state? (I sketch my current
         reading of both paths right after this list.)
      2. In what situations is the PreparedTransactionState in the state
         store different from the PreparedTransactionState in Kafka?
   2. Why does the pre-commit phase initialize the producer with
      currentProducer.initTransaction(false)?
      1. Is it because "false" means that we do not expect any prepared txn
         to be there, and if there is one, we should either roll it forward
         or abort it? In the pre-commit phase with a new producer, there
         shouldn't be any dangling txns.
   3. Shouldn't we call completeTransaction on restore instead of
      commitTransaction? In what situations would the Flink Kafka connector
      abort the transaction?
   4. Do we need to keep the current KafkaInternalProducer for a while to
      remain compatible with older Kafka versions that do not support KIP-939?
   5. How will the connector handle
      transaction.two.phase.commit.enable=false at the broker (not client)
      level?
   6. Does it make sense for connector users to override
      transaction.two.phase.commit.enable? If it does not make sense, would
      the connector ignore the config or throw an exception when it is passed?
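
To make questions 1-3 concrete, here is how I currently read the intended
split between the two calls. This is only a sketch of my understanding: the
method and type names follow the FLIP/KIP drafts referenced above, and the
exact signatures are my assumption, not the final API.

// Sketch only -- names taken from the FLIP-319 / KIP-939 drafts;
// signatures and types are assumed, not the released API.

// Normal checkpoint-aligned two-phase commit:
producer.beginTransaction();
// ... write the records belonging to this checkpoint ...
PreparedTransactionState prepared = producer.prepareTransaction(); // phase 1: flush + prepare
// Flink snapshots `prepared` into the checkpoint; once the checkpoint is
// confirmed, phase 2 finalizes the transaction:
producer.commitTransaction();

// Recovery after a failover, with state restored from the last checkpoint:
PreparedTransactionState restored = /* from the restored checkpoint */;
restoredProducer.initTransaction(true);        // true = keep a prepared txn alive (assumed meaning)
restoredProducer.completeTransaction(restored); // roll forward if the broker's prepared txn matches
                                                // the restored state, roll back otherwise

Is that roughly the intended usage?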


Best regards,
Alex

On Wed, Aug 23, 2023 at 6:09 AM Gyula Fóra  wrote:

> Hi Gordon!
>
> Thank you for preparing the detailed FLIP, I think this is a huge
> improvement that enables the exactly-once Kafka sink in many environments /
> use-cases where this was previously unfeasible due to the limitations
> highlighted in the FLIP.
>
> Big +1
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 7:54 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi Flink devs,
> >
> > I’d like to officially start a discussion for FLIP-319: Integrating with
> > Kafka’s proper support for 2PC participation (KIP-939) [1].
> >
> > This is the “sister” joint FLIP for KIP-939 [2] [3]. It has been a
> > long-standing issue that Flink’s Kafka connector doesn’t work fully
> > correctly under exactly-once mode due to lack of distributed transaction
> > support in the Kafka transaction protocol. This has led to subpar hacks
> > in the connector such as Java reflections to work around the protocol's
> > limitations (which causes a bunch of problems on its own, e.g. long
> > recovery times for the connector), while still having corner case
> > scenarios that can lead to data loss.
> >
> > This joint effort with the Kafka community attempts to address this so
> > that the Flink Kafka connector can finally work against public Kafka
> > APIs, which should result in a much more robust integration between the
> > two systems, and for Flink developers, easier maintainability of the code.
> >
> > Obviously, actually implementing this FLIP relies on the joint KIP being
> > implemented and released first. Nevertheless, I'd like to start the
> > discussion for the design as early as possible so we can benefit from the
> > new Kafka changes as soon as they are available.
> >
> > Looking forward to feedback and comments on the proposal!
> >
> > Thanks,
> > Gordon
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > [3] https://lists.apache.org/thread/wbs9sqs3z1tdm7ptw5j4o9osmx9s41nf
> >
>


Re: [VOTE] Release flink-connector-kafka 3.0.0 for Flink 1.17, release candidate #2

2023-04-13 Thread Alexander Sorokoumov
+1 (nb).

Checked:

   - checksums are correct
   - source code builds (JDK 8+11)
   - release notes are correct


Best,
Alex


On Wed, Apr 12, 2023 at 5:07 PM Tzu-Li (Gordon) Tai 
wrote:

> A few important remarks about this release candidate:
>
> - As mentioned in the previous voting thread of RC1 [1], we've decided to
> skip releasing a version of the externalized Flink Kafka Connector matching
> with Flink 1.16.x since the original vote thread stalled, and meanwhile
> we've already completed externalizing all Kafka connector code as of Flink
> 1.17.0.
>
> - As such, this RC is basically identical to the Kafka connector code
> bundled with the Flink 1.17.0 release, PLUS a few critical fixes for
> exactly-once violations, namely FLINK-31305, FLINK-31363, and FLINK-31620
> (please see release notes [2]).
>
> - As part of preparing this RC, I've also deleted the original v3.0 branch
> and re-named the v4.0 branch to replace it instead. Effectively, this
> resets the versioning numbers for the externalized Flink Kafka Connector
> code repository, so that this first release of the repo starts from v3.0.0.
>
> Thanks,
> Gordon
>
> [1] https://lists.apache.org/thread/r97y5qt8x0c72460vs5cjm5c729ljmh6
> [2]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577
>
> On Wed, Apr 12, 2023 at 4:55 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi everyone,
> >
> > Please review and vote on release candidate #2 for version 3.0.0 of the
> > Apache Flink Kafka Connector, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release to be deployed to dist.apache.org
> > [2], which are signed with the key with fingerprint
> > 1C1E2394D3194E1944613488F320986D35C33D6A [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag v3.0.0-rc2 [5],
> > * website pull request listing the new release [6].
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Gordon
> >
> > [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc2/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4] https://repository.apache.org/content/repositories/orgapacheflink-1607
> > [5] https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc2
> > [6] https://github.com/apache/flink-web/pull/632
> >
>


Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-12 Thread Alexander Sorokoumov
Hi Jark, John,

Thank you for the discussion! I will proceed with completing the patch that
adds exactly-once to upsert-kafka connector.

Best,
Alexander

On Wed, Apr 12, 2023 at 12:21 AM Jark Wu  wrote:

> Hi John,
>
> Thank you for your valuable input. It sounds reasonable to me.
>
> From this point of view, exactly-once is used to guarantee transactional
> semantics rather than to avoid duplication/upserts.
> This is similar to the JDBC connectors that already support eventual
> consistency with idempotent updates, but we still add support for
> 2PC [1].
>
> Best,
> Jark
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Wed, 12 Apr 2023 at 10:36, John Roesler  wrote:
>
> > Hi Jark,
> >
> > I hope you don’t mind if I chime in.
> >
> > You have a good point that the sequence of upserts will eventually
> > converge to the correct value under the at-least-once delivery guarantee,
> > but it can still be important to avoid passing on uncommitted results.
> > Some thoughts, numbered for reference:
> >
> > 1. Most generally, if some result R is written to the sink topic, but
> > then the job fails before a checkpoint, rolls back, and reprocesses, producing
> > R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> > system is concerned, R never happened at all (because it was part of a
> > rolled-back batch of processing).
> >
> > 2. Readers may reasonably wish to impose some meaning on the sequence of
> > upserts itself, so including aborted results can lead to wrong semantics
> > downstream. Eg: “how many times has ‘x’ been updated today”?
> >
> > 3. Note that processing may not be deterministic over failures, and,
> > building on (2), readers may have an expectation that every record in the
> > topic corresponds to a real value that was associated with that key at
> > some point. Eg, if we start with x=1, checkpoint, then produce x=99,
> > crash, restart and produce x=2. Under at-least-once, the history of x is
> > [1,99,2], while exactly-once would give the correct history of [1,2]. If
> > we set up an alert if the value of x is ever greater than 10, then
> > at-least-once will erroneously alert us, while exactly-once does not.
> >
> > 4. Sending results for failed processing can also cause operational
> > problems: if you’re processing a high volume of data, and you get into a
> > crash loop, you can create a flood of repeated results. I’ve seen this
> > case cause real world pain for people, and it’s nice to have a way to
> > avoid it.
> >
> > I hope some of these examples show why a user might reasonably want to
> > configure the connector with the exactly-once guarantee.
> >
> > Thanks!
> > -John
> >
> > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > > Hi Alexander,
> > >
> > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> > > records in case of producer retries or failovers. But as I explained
> > > above, it can’t avoid intentionally duplicated records. Actually, I
> > > would like to call them "upsert records" instead of "duplicates", that's
> > > why the connector is named "upsert-kafka", to make Kafka work like a
> > > database that supports updating and deleting by key.
> > >
> > > For example, there is a SQL query:
> > >
> > > SELECT URL, COUNT(*) page_views
> > > FROM access_logs
> > > GROUP BY URL;
> > >
> > > This is a continuous query[1] that continuously emits a new
> > > <URL, page_views> record once a new URL access entry is received. The
> > > same URLs in the log may be far away and be processed in different
> > > checkpoints.
> > >
> > > It's easy to make upsert-kafka support the exactly-once delivery
> > > guarantee, but as we discussed above, it's unnecessary to support it and
> > > we intend to expose as few configurations to users as possible.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> > >
> > >
> > >
> > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > >  wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> To my knowledge, Kafka's EXACTLY

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-07 Thread Alexander Sorokoumov
Hi Jark,

To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
producers prevent duplicated records[1], at least in the cases when
upstream does not produce them intentionally and across checkpoints.

Could you please elaborate or point me to the docs that explain the reason
for duplicated records upstream and across checkpoints? I am relatively new
to Flink and not aware of it. According to the Kafka connector
documentation, it does support exactly-once semantics when configured with
'sink.delivery-guarantee' = 'exactly-once' [2]. It is not clear to me why we
can't make upsert-kafka configurable in the same way to support this
delivery guarantee.
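
For reference, this is roughly how the exactly-once guarantee is enabled
today on the DataStream KafkaSink (a minimal sketch with illustrative topic
and prefix names; the table connector exposes the same choice through
'sink.delivery-guarantee' [2]):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSinkExample {
    public static void main(String[] args) {
        KafkaSink<String> sink =
            KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Transactional, exactly-once delivery; needs a transactional id
                // prefix and a checkpoint interval shorter than the broker's
                // transaction timeout.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-sink")
                .build();
        // In a real job the sink would be attached with stream.sinkTo(sink).
    }
}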

Thank you,
Alexander

1.
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
2.
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees


On Fri, Apr 7, 2023 at 3:44 AM Jark Wu  wrote:

> Hi Alexander,
>
> I’m not sure I fully understand the reasons. I left my comments inline.
>
> > 1. There might be other non-Flink topic consumers that would rather not
> > have duplicated records.
>
> Exactly once can’t avoid producing duplicated records, because the upstream
> produces duplicated records intentionally and across checkpoints. Exactly
> once can’t recognize duplicated records and drop duplications. That means
> duplicated records are written into topics even if exactly-once mode is
> enabled.
>
>
> > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > previous values.
>
> Sorry, I don’t understand how exactly once can prevent this rollback
> behavior.
> Even in your example with EXACTLY_ONCE enabled, the x will go to a5, and
> b5, then back to a5 if jobs perform checkpoints after producing records.
>
>
> Best,
> Jark
>
>
> > On Apr 5, 2023, at 09:39, Alexander Sorokoumov wrote:
> >
> > Hello Flink community,
> >
> > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> > semantics to upsert-kafka connector. According to upsert-kafka docs[1] and
> > ReducingUpsertSink javadoc[2], the connector is correct even with duplicate
> > records under AT_LEAST_ONCE because the records are idempotent, and the
> > read path de-duplicates them. However, there are at least 2 reasons to
> > configure the connector with EXACTLY_ONCE:
> >
> > 1. There might be other non-Flink topic consumers that would rather not
> > have duplicated records.
> > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > previous values. Consider a scenario where 2 producing jobs A and B write
> > to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
> > topic. Both producers write unique, monotonically increasing sequences to
> > the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> > x=b1,b2,b3,b4,b5… With this setup, we can have the following sequence:
> >
> >   1. Job A produces x=a5.
> >   2. Job B produces x=b5.
> >   3. Job A produces the duplicate write x=a5.
> >
> > The consuming job would observe x going to a5, then to b5, then back to a5.
> > EXACTLY_ONCE would prevent this behavior.
> >
> > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> > patch to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
> > community thinks about it before moving forward with it.
> >
> > Thanks,
> > Alexander
> >
> > 1. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > 2. https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
>
>


[DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

2023-04-04 Thread Alexander Sorokoumov
Hello Flink community,

I would like to discuss if it is worth adding EXACTLY_ONCE delivery
semantics to upsert-kafka connector. According to upsert-kafka docs[1] and
ReducingUpsertSink javadoc[2], the connector is correct even with duplicate
records under AT_LEAST_ONCE because the records are idempotent, and the
read path de-duplicates them. However, there are at least 2 reasons to
configure the connector with EXACTLY_ONCE:

1. There might be other non-Flink topic consumers that would rather not
have duplicated records.
2. Multiple upsert-kafka producers might cause keys to roll back to
previous values. Consider a scenario where 2 producing jobs A and B write
to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
topic. Both producers write unique, monotonically increasing sequences to
the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
x=b1,b2,b3,b4,b5… With this setup, we can have the following sequence:

   1. Job A produces x=a5.
   2. Job B produces x=b5.
   3. Job A produces the duplicate write x=a5.

The consuming job would observe x going to a5, then to b5, then back to a5.
EXACTLY_ONCE would prevent this behavior.
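
For illustration, the kind of configuration the WIP patch mentioned below
would enable might look roughly like this. This is only a sketch: the
'sink.delivery-guarantee' and 'sink.transactional-id-prefix' options are
hypothetical for upsert-kafka and simply mirror the regular kafka connector,
and the schema/topic names are made up.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaExactlyOnceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical upsert-kafka sink with an exactly-once delivery guarantee.
        tEnv.executeSql(
            "CREATE TABLE word_count_sink (\n"
                + "  word STRING PRIMARY KEY NOT ENFORCED,\n"
                + "  cnt BIGINT\n"
                + ") WITH (\n"
                + "  'connector' = 'upsert-kafka',\n"
                + "  'topic' = 'word_count',\n"
                + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                + "  'key.format' = 'json',\n"
                + "  'value.format' = 'json',\n"
                // The next two options are the proposed additions, not existing ones.
                + "  'sink.delivery-guarantee' = 'exactly-once',\n"
                + "  'sink.transactional-id-prefix' = 'word-count'\n"
                + ")");
    }
}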

I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP patch
to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
community thinks about it before moving forward with it.

Thanks,
Alexander

1.
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
2.
https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37


Re: Streaming queries in FTS using Kafka log

2022-12-21 Thread Alexander Sorokoumov
Hello everyone,

Answering my own question, it turns out that Flink Table Store removes the
normalization node on read from an external log system only if
log.changelog-mode='all' and log.consistency = 'transactional' [1].

1.
https://github.com/apache/flink-table-store/blob/7e0d55ff3dc9fd48455b17d9a439647b0554d020/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java#L136-L141
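
To spell the condition out, here is an illustrative paraphrase of the linked
check. The class and enum names below are made up for the example and are not
the actual Flink Table Store classes:

// Illustrative paraphrase of the check in TableStoreSource [1]; not the real classes.
public class NormalizationCheck {

    enum LogChangelogMode { AUTO, ALL, UPSERT }
    enum LogConsistency { TRANSACTIONAL, EVENTUAL }

    // The normalization node is needed unless the Kafka log already carries
    // full before/after images ('all') AND is written transactionally.
    static boolean needsNormalization(LogChangelogMode mode, LogConsistency consistency) {
        return !(mode == LogChangelogMode.ALL && consistency == LogConsistency.TRANSACTIONAL);
    }

    public static void main(String[] args) {
        // The table from the original question used 'all' + 'eventual',
        // so the ChangelogNormalize node is still added:
        System.out.println(needsNormalization(LogChangelogMode.ALL, LogConsistency.EVENTUAL));      // true
        // Switching to 'log.consistency' = 'transactional' removes it:
        System.out.println(needsNormalization(LogChangelogMode.ALL, LogConsistency.TRANSACTIONAL)); // false
    }
}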

Best,
Alex


On Fri, Dec 16, 2022 at 5:28 PM Alexander Sorokoumov <
asorokou...@confluent.io> wrote:

> Hello community,
>
> I want to ask about streaming queries with Flink Table Store. After
> reading the documentation on Streaming Queries [1], I was under the
> impression that only tables with LogStore-over-TableStore and No Changelog
> Producer need the normalization step since the Kafka log has the `before`
> values.
>
> However, when I created the following table:
>
> CREATE TABLE word_count (
>  word STRING PRIMARY KEY NOT ENFORCED,
>  cnt BIGINT
> ) WITH (
>  'connector' = 'table-store',
>  'path' = 's3://my-bucket/table-store',
>  'log.system' = 'kafka',
>  'kafka.bootstrap.servers' = 'broker:9092',
>  'kafka.topic' = 'word_count_log',
>  'auto-create' = 'true',
>  'log.changelog-mode' = 'all',
>  'log.consistency' = 'eventual'
> );
>
> And ran a streaming query against it:
>
> SELECT * FROM word_count;
>
> The topology for this query had the normalization task
> (ChangelogNormalize).
>
> Is this a bug or expected behavior? If it is the latter, can you please
> clarify why this is the case?
>
> 1.
> https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/streaming-query/
>
> Thank you,
> Alex
>


Streaming queries in FTS using Kafka log

2022-12-16 Thread Alexander Sorokoumov
Hello community,

I want to ask about streaming queries with Flink Table Store. After reading
the documentation on Streaming Queries [1], I was under the impression that
only tables with LogStore-over-TableStore and No Changelog Producer need
the normalization step since the Kafka log has the `before` values.

However, when I created the following table:

CREATE TABLE word_count (
 word STRING PRIMARY KEY NOT ENFORCED,
 cnt BIGINT
) WITH (
 'connector' = 'table-store',
 'path' = 's3://my-bucket/table-store',
 'log.system' = 'kafka',
 'kafka.bootstrap.servers' = 'broker:9092',
 'kafka.topic' = 'word_count_log',
 'auto-create' = 'true',
 'log.changelog-mode' = 'all',
 'log.consistency' = 'eventual'
);

And ran a streaming query against it:

SELECT * FROM word_count;

The topology for this query had the normalization task (ChangelogNormalize).
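
For reference, this is roughly how I looked at the topology (a minimal
sketch, assuming a TableEnvironment in streaming mode with the word_count
table registered via the DDL above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainWordCount {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // First register the table, e.g. tEnv.executeSql("CREATE TABLE word_count ( ... )").

        // Print the optimized plan; for this table it contains a
        // ChangelogNormalize node, which is the observation above.
        System.out.println(tEnv.explainSql("SELECT * FROM word_count"));
    }
}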

Is this a bug or expected behavior? If it is the latter, can you please
clarify why this is the case?

1.
https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/streaming-query/

Thank you,
Alex


Questions about Flink Table Store

2022-11-07 Thread Alexander Sorokoumov
I’m Alexander from Confluent. I am new to Flink and its community. I would
like to contribute to the Flink Table Store, but am missing certain
details. Can someone please clarify the points mentioned below to me?

   - Given that there is always a single writer to a stream, in what
   situations can concurrent writes ever happen to Flink Table Store? FLIP-188
   mentions reprocessing and snapshot generation, but I do not understand how
   these cases can lead to more than a single writer.
   - If there are concurrent INSERTs into a table backed by Flink Table
   Store, how and by what component are they serialized?
   - Is Flink Table Store going to support ACID transactions?
   - Do Flink Table Store snapshots correspond 1:1 to Flink checkpoints?
   - Does Flink Table Store (plan to) support secondary indexes?
   - Is there an open roadmap for the Flink Table Store?

Thank you,
Alexander