Hi Artem,

Thanks for publishing this KIP!

Can you please clarify the purpose of having the broker-level transaction.two.phase.commit.enable config in addition to the new ACL? If the brokers are configured with transaction.two.phase.commit.enable=false, at what point will a client configured with transaction.two.phase.commit.enable=true fail? Will it happen at KafkaProducer#initTransactions?

WDYT about adding an AdminClient method that returns the state of transaction.two.phase.commit.enable? This way, clients would know in advance if 2PC is enabled on the brokers.

Best,
Alex

On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <roger.hoo...@gmail.com> wrote:

> Other than supporting multiplexing transactional streams on a single producer, I don't see how to improve it.
>
> On Thu, Aug 24, 2023 at 12:12 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Roger,
> >
> > Thank you for summarizing the cons. I agree and I'm curious what would be the alternatives to solve these problems better and if they can be incorporated into this proposal (or built independently in addition to or on top of this proposal). E.g. one potential extension we discussed earlier in the thread could be multiplexing logical transactional "streams" with a single producer.
> >
> > -Artem
> >
> > On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
> >
> > > Thanks. I like that you're moving Kafka toward supporting this dual-write pattern. Each use case needs to consider the tradeoffs. You already summarized the pros very well in the KIP. I would summarize the cons as follows:
> > >
> > > - you sacrifice availability - each write requires both DB and Kafka to be available so I think your overall application availability is (1 - p(DB is unavailable)) * (1 - p(Kafka is unavailable)).
> > > - latency will be higher and throughput lower - each write requires both writes to DB and Kafka while holding an exclusive lock in DB.
> > > - you need to create a producer per unit of concurrency in your app, which has some overhead on the app and Kafka sides (number of connections, poor batching). I assume the producers would need to be configured for low latency (linger.ms=0)
> > > - there's some complexity in managing stable transactional ids for each producer/concurrency unit in your application. With a k8s deployment, you may need to switch to something like a StatefulSet that gives each pod a stable identity across restarts. On top of that pod identity, which you can use as a prefix, you then assign unique transactional ids to each concurrency unit (thread/goroutine).
> > >
> > > On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Roger,
> > > >
> > > > Thank you for the feedback. You make a very good point that we also discussed internally. Adding support for multiple concurrent transactions in one producer could be valuable, but it seems to be a fairly large and independent change that would deserve a separate KIP. If such support is added, we could modify 2PC functionality to incorporate that.
> > > >
> > > > > Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
> > > >
> > > > I'm not sure if supporting multiple transactions in one producer would make id management simpler: we'd need to store a piece of data per transaction, so whether it's N producers with a single transaction or N transactions with a single producer, it's still roughly the same amount of data to manage.
> > > > In fact, managing transactional ids (current proposal) might be easier, because the id is controlled by the application and it knows how to complete the transaction after crash / restart; while a TID would be generated by Kafka and that would create a question of starting a Kafka transaction, but not saving its TID and then crashing, then figuring out which transactions to abort, and so on.
> > > >
> > > > > 2) creating a separate producer for each concurrency slot in the application
> > > >
> > > > This is a very valid concern. Maybe we'd need to have some multiplexing of transactional logical "streams" over the same connection. Seems like a separate KIP, though.
> > > >
> > > > > Otherwise, it seems you're left with single-threaded model per application process?
> > > >
> > > > That's a fair assessment. Not necessarily exactly single-threaded per application, but a single producer per thread model (i.e. an application could have a pool of threads + producers to increase concurrency).
> > > >
> > > > -Artem
> > > >
> > > > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > >
> > > > > Artem,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > If I understand correctly, Kafka does not support concurrent transactions from the same producer (transactional id). I think this means that applications that want to support in-process concurrency (say thread-level concurrency with row-level DB locking) would need to manage separate transactional ids and producers per thread and then store txn state accordingly. The potential usability downsides I see are
> > > > > 1) managing a set of transactional ids for each application process that scales up to its max concurrency.
> > > > > Maybe not too bad but a bit of pain to manage these ids inside each process and across all application processes.
> > > > > 2) creating a separate producer for each concurrency slot in the application - this could create a lot more producers and resultant connections to Kafka than the typical model of a single producer per process.
> > > > >
> > > > > Otherwise, it seems you're left with single-threaded model per application process?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Roger
> > > > >
> > > > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Roger, Arjun,
> > > > > >
> > > > > > Thank you for the questions.
> > > > > >
> > > > > > > It looks like the application must have stable transactional ids over time?
> > > > > >
> > > > > > The transactional id should uniquely identify a producer instance and needs to be stable across the restarts. If the transactional id is not stable across restarts, then zombie messages from a previous incarnation of the producer may violate atomicity. If there are 2 producer instances concurrently producing data with the same transactional id, they are going to constantly fence each other and most likely make little or no progress.
> > > > > >
> > > > > > The name might be a little bit confusing as it may be mistaken for a transaction id / TID that uniquely identifies every transaction.
> > > > > > The name and the semantics were defined in the original exactly-once-semantics (EoS) proposal (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging) and KIP-939 just builds on top of that.
> > > > > >
> > > > > > > I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.
> > > > > >
> > > > > > If the producer / application never comes back, the transaction will remain in prepared (a.k.a. "in-doubt") state until an operator forcefully terminates the transaction. That's why a new ACL is defined in this proposal -- this functionality should only be provided to applications that implement proper recovery logic.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Artem,
> > > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > >
> > > > > > > I have the same question as Roger on concurrent writes, and an additional one on consumer behavior. Typically, transactions will time out if not committed within some time interval. With the proposed changes in this KIP, consumers cannot consume past the ongoing transaction. I'm curious to understand what happens if the producer dies, and does not come up and recover the pending transaction within the transaction timeout interval.
> > > > > > > Or are we saying that when used in this 2PC context, we should configure these transaction timeouts to very large durations?
> > > > > > >
> > > > > > > Thanks in advance!
> > > > > > >
> > > > > > > Best,
> > > > > > > Arjun
> > > > > > >
> > > > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Artem,
> > > > > > > >
> > > > > > > > Thanks for writing this KIP. Can you clarify the requirements a bit more for managing transaction state? It looks like the application must have stable transactional ids over time? What is the granularity of those ids and producers? Say the application is a multi-threaded Java web server, can/should all the concurrent threads share a transactional id and producer? That doesn't seem right to me unless the application is using global DB locks that serialize all requests. Instead, if the application uses row-level DB locks, there could be multiple, concurrent, independent txns happening in the same JVM so it seems like the granularity of managing transactional ids and txn state needs to line up with the granularity of the DB locking.
> > > > > > > >
> > > > > > > > Does that make sense or am I misunderstanding?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Roger
> > > > > > > >
> > > > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hello,
> > > > > > > > >
> > > > > > > > > This is a discussion thread for https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC .
> > > > > > > > >
> > > > > > > > > The KIP proposes extending Kafka transaction support (that already uses 2PC under the hood) to enable atomicity of dual writes to Kafka and an external database, and helps to fix a long-standing Flink issue.
> > > > > > > > >
> > > > > > > > > An example of code that uses the dual write recipe with JDBC and should work for most SQL databases is here: https://github.com/apache/kafka/pull/14231.
> > > > > > > > >
> > > > > > > > > The FLIP for the sister fix in Flink is here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > > > >
> > > > > > > > > -Artem
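
For reference, the id scheme Roger describes earlier in the thread (a stable pod identity used as a prefix, plus one transactional id per concurrency unit) could be sketched roughly as below. This is only an illustration under stated assumptions: the class TransactionalIds, its two methods, and the "-txn-" separator are hypothetical names and are not part of Kafka or KIP-939, and the pod name is assumed to stay the same across restarts (for example, a StatefulSet pod name).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Kafka or KIP-939): derives one stable
 * transactional id per concurrency slot from a stable pod identity.
 */
final class TransactionalIds {
    private TransactionalIds() {}

    /** Builds the id for a single slot, e.g. pod "orders-0" and slot 2. */
    static String forSlot(String podName, int slot) {
        if (podName == null || podName.isEmpty()) {
            throw new IllegalArgumentException("podName must be non-empty");
        }
        if (slot < 0) {
            throw new IllegalArgumentException("slot must be >= 0");
        }
        // Same pod name + same slot always yields the same id, so the id
        // survives restarts as long as the pod identity does.
        return podName + "-txn-" + slot;
    }

    /** Lists the ids for every slot of a thread/producer pool. */
    static List<String> forPool(String podName, int poolSize) {
        List<String> ids = new ArrayList<>(poolSize);
        for (int slot = 0; slot < poolSize; slot++) {
            ids.add(forSlot(podName, slot));
        }
        return ids;
    }
}
```

Each resulting id would then be used as the transactional.id of the one producer owned by that slot, so a restarted pod comes back with the same ids and can resolve whatever its previous incarnation left in the prepared state. The pool size caps in-process concurrency, which is the single-producer-per-thread model discussed above.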