Hi Jun,

> 20A.  This only takes care of the abort case. The application still
> needs to be changed to handle the commit case properly

My point here is that, looking at the initTransactions() call alone,
it's not clear what the semantics are.  Say I'm doing a code review: I
cannot tell whether the code is correct, because if the config (which
is theoretically not known at code-review time) is going to enable
2PC, the correct code should look one way, otherwise it would need to
look differently.  Also, if code is written with initTransactions()
without an explicit abort and then for whatever reason gets used with
2PC enabled (e.g. as a library in a bigger product), it'll start
breaking in a non-intuitive way.
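
To make this concrete, here is a minimal sketch (hypothetical
application code, assuming the initTransactions(boolean
keepPreparedTxn) overload proposed in KIP-939) of how the same "start
from a clean state" intent has to be written depending on the config:

  // With 2PC disabled: initTransactions() aborts any ongoing
  // transaction, so this is all that's needed.
  producer.initTransactions();

  // With transaction.two.phase.commit.enable=true: the prepared
  // transaction is kept, so the same intent needs an explicit abort.
  producer.initTransactions(true);
  producer.abortTransaction();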

> 20B. Hmm, if the admin disables 2PC, there is likely a reason behind that

That's true, but reality may be more complicated.  Say a user wants to
run self-managed Flink with Confluent Cloud.  The Confluent Cloud
admin may not be comfortable enabling 2PC for general user accounts
that use services not managed by Confluent (the same way Confluent
doesn't allow increasing the max transaction timeout for general user
accounts).  Right now, self-managed Flink works because it uses
reflection; if it moves to the public APIs provided by KIP-939, it'll
break.
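
(For context, the reflection-based approach looks roughly like the
hypothetical sketch below; the helpers and field / method names are
illustrative, not the actual Flink code, which differs by client
version.  The idea is to resume a prepared transaction by poking the
producer's internal TransactionManager instead of aborting it:

  // getField / setField / invoke are hypothetical reflection helpers.
  Object txnManager = getField(producer, "transactionManager");
  setField(txnManager, "producerIdAndEpoch",
      new ProducerIdAndEpoch(producerId, epoch));
  invoke(txnManager, "transitionTo", State.IN_TRANSACTION);

Exactly this kind of dependency on client internals is what a public
keepPreparedTxn option would remove.)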

> 32. Ok. That's the kafka metric. In that case, the metric name has a
> group and a name. There is no type and no package name.

Is this a suggestion to change it, or a confirmation that the current
logic is ok?  I just copied an existing metric, but I can change it if
needed.
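
For reference, the metric is registered roughly like this (a sketch
using the org.apache.kafka.common.metrics API, following the
TransactionStateManager example linked earlier in the thread; the
surrounding code is illustrative):

  Metrics metrics = new Metrics();
  Sensor sensor = metrics.sensor("ActiveTransactionOpenTime");
  // A Kafka metric name carries only a name and a group (no type and
  // no package); the JMX rendering is left to the reporters.
  sensor.add(metrics.metricName("active-transaction-open-time-max",
                                "transaction-coordinator-metrics"),
             new Max());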

-Artem

On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 20. "Say if an application
> currently uses initTransactions() to achieve the current semantics, it
> would need to be rewritten to use initTransactions() + abort to achieve the
> same semantics if the config is changed. "
>
> This only takes care of the abort case. The application still needs to be
> changed to handle the commit case properly
> if transaction.two.phase.commit.enable is set to true.
>
> "Even when KIP-939 is implemented,
> there would be situations when 2PC is disabled by the admin (e.g. Kafka
> service providers may be reluctant to enable 2PC for Flink services that
> users host themselves), so we either have to perpetuate the
> reflection-based implementation in Flink or enable keepPreparedTxn=true
> without 2PC."
>
> Hmm, if the admin disables 2PC, there is likely a reason behind that. I am
> not sure that we should provide an API to encourage the application to
> circumvent that.
>
> 32. Ok. That's the kafka metric. In that case, the metric name has a group
> and a name. There is no type and no package name.
>
> Jun
>
>
> On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > Thank you for your questions.
> >
> > > 20. So to abort a prepared transaction after the producer start, we
> could
> > use ...
> >
> > I agree, initTransaction(true) + abort would accomplish the behavior of
> > initTransactions(false), so we could technically have fewer ways to
> achieve
> > the same thing, which is generally valuable.  I wonder, though, if that
> > would be intuitive from the application perspective.  Say if an
> application
> > currently uses initTransactions() to achieve the current semantics, it
> > would need to be rewritten to use initTransactions() + abort to achieve
> the
> > same semantics if the config is changed.  I think this could create
> > subtle confusion, as the config change is generally decoupled from
> changing
> > application implementation.
> >
> > >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't
> seem
> > very important
> >
> > I agree, it's not a strict requirement.  It is, however, a missing option
> > in the public API, so currently Flink has to use reflection to emulate
> this
> > functionality without 2PC support.   Even when KIP-939 is implemented,
> > there would be situations when 2PC is disabled by the admin (e.g. Kafka
> > service providers may be reluctant to enable 2PC for Flink services that
> > users host themselves), so we either have to perpetuate the
> > reflection-based implementation in Flink or enable keepPreparedTxn=true
> > without 2PC.
> >
> > > 32.
> >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> >
> > I just followed the existing metric implementation example
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> > ,
> > which maps to
> >
> >
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
> >
> > > 33. "If the value is 'true' then the corresponding field is set
> >
> > That's correct.  Updated the KIP.
> >
> > -Artem
> >
> > On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply.
> > >
> > > 20. So to abort a prepared transaction after producer start, we could
> use
> > > either
> > >   producer.initTransactions(false)
> > > or
> > >   producer.initTransactions(true)
> > >   producer.abortTransaction
> > > Could we just always use the latter API? If we do this, we could
> > > potentially eliminate the keepPreparedTxn flag in initTransactions().
> > After
> > > the initTransactions() call, the outstanding txn is always preserved if
> > 2pc
> > > is enabled and aborted if 2pc is disabled. The use case mentioned for
> > > keepPreparedTxn=true without 2PC doesn't seem very important. If we
> could
> > > do that, it seems that we have (1) less redundant and simpler APIs; (2)
> > > more symmetric syntax for aborting/committing a prepared txn after
> > producer
> > > restart.
> > >
> > > 32.
> > >
> > >
> >
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > > Is this a Yammer or kafka metric? The former uses the camel case for
> name
> > > and type. The latter uses the hyphen notation, but doesn't have the
> type
> > > attribute.
> > >
> > > 33. "If the value is 'true' then the corresponding field is set in the
> > > InitProducerIdRequest and the KafkaProducer object is set into a state
> > > which only allows calling .commitTransaction or .abortTransaction."
> > > We should also allow .completeTransaction, right?
> > >
> > > Jun
> > >
> > >
> > > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > > 20. For Flink usage, it seems that the APIs used to abort and
> commit
> > a
> > > > prepared txn are not symmetric.
> > > >
> > > > For Flink it is expected that Flink would call .commitTransaction or
> > > > .abortTransaction directly, it wouldn't need to deal with
> > > PreparedTxnState,
> > > > the outcome is actually determined by the Flink's job manager, not by
> > > > comparison of PreparedTxnState.  So for Flink, if the Kafka sink
> > crashes
> > > > and restarts there are 2 cases:
> > > >
> > > > 1. Transaction is not prepared.  In that case just call
> > > > producer.initTransactions(false) and then can start transactions as
> > > needed.
> > > > 2. Transaction is prepared.  In that case call
> > > > producer.initTransactions(true) and wait for the decision from the
> job
> > > > manager.  Note that it's not given that the transaction will get
> > > committed,
> > > > the decision could also be an abort.
> > > >
> > > >  > 21. transaction.max.timeout.ms could in theory be MAX_INT.
> Perhaps
> > we
> > > > could use a negative timeout in the record to indicate 2PC?
> > > >
> > > > -1 sounds good, updated.
> > > >
> > > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we
> > need
> > > > both?
> > > >
> > > > I think of producer.initTransactions() to be an implementation for
> > > > adminClient.forceTerminateTransaction(transactionalId).
> > > >
> > > > > 31. "This would flush all the pending messages and transition the
> > > > producer
> > > >
> > > > Updated the KIP to clarify that IllegalStateException will be thrown.
> > > >
> > > > -Artem
> > > >
> > > >
> > > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 20. For Flink usage, it seems that the APIs used to abort and
> commit
> > a
> > > > > prepared txn are not symmetric.
> > > > > To abort, the app will just call
> > > > >   producer.initTransactions(false)
> > > > >
> > > > > To commit, the app needs to call
> > > > >   producer.initTransactions(true)
> > > > >   producer.completeTransaction(preparedTxnState)
> > > > >
> > > > > Will this be a concern? For the dual-writer usage, both
> abort/commit
> > > use
> > > > > the same API.
> > > > >
> > > > > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps
> > we
> > > > > could
> > > > > use a negative timeout in the record to indicate 2PC?
> > > > >
> > > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we
> > need
> > > > > both?
> > > > >   producer.initTransactions(false)
> > > > >   adminClient.forceTerminateTransaction(transactionalId)
> > > > >
> > > > > 31. "This would flush all the pending messages and transition the
> > > > producer
> > > > > into a mode where only .commitTransaction, .abortTransaction, or
> > > > > .completeTransaction could be called.  If the call is successful
> (all
> > > > > messages successfully got flushed to all partitions) the
> transaction
> > is
> > > > > prepared."
> > > > >  If the producer calls send() in that state, what exception will
> the
> > > > caller
> > > > > receive?
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > >  Then, should we change the following in the example to use
> > > > > > InitProducerId(true) instead?
> > > > > >
> > > > > > We could. I just thought that it's good to make the example
> > > > > self-contained
> > > > > > by starting from a clean state.
> > > > > >
> > > > > > > Also, could Flink just follow the dual-write recipe?
> > > > > >
> > > > > > I think it would bring some unnecessary logic to Flink (or any
> > other
> > > > > system
> > > > > > that already has a transaction coordinator and just wants to
> drive
> > > > Kafka
> > > > > to
> > > > > > the desired state).  We could discuss it with Flink folks, the
> > > current
> > > > > > proposal was developed in collaboration with them.
> > > > > >
> > > > > > > 21. Could a non 2pc user explicitly set the
> TransactionTimeoutMs
> > to
> > > > > > Integer.MAX_VALUE?
> > > > > >
> > > > > > The server would reject this for regular transactions, it only
> > > accepts
> > > > > > values that are <= *transaction.max.timeout.ms
> > > > > > <http://transaction.max.timeout.ms> *(a broker config).
> > > > > >
> > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the
> > > endTxn
> > > > > > request to use the ongoing pid. ...
> > > > > >
> > > > > > Without 2PC there is no case where the pid could change between
> > > > starting
> > > > > a
> > > > > > transaction and endTxn (InitProducerId would abort any ongoing
> > > > > > transaction).  With 2PC there is now a case where there could be
> > > > > > InitProducerId that can change the pid without aborting the
> > > > transaction,
> > > > > so
> > > > > > we need to handle that.  I wouldn't say that the flow is
> different,
> > > but
> > > > > > it's rather extended to handle new cases.  The main principle is
> > > still
> > > > > the
> > > > > > same -- for all operations we use the latest "operational" pid
> and
> > > > epoch
> > > > > > known to the client, this way we guarantee that we can fence
> > zombie /
> > > > > split
> > > > > > brain clients by disrupting the "latest known" pid + epoch
> > > progression.
> > > > > >
> > > > > > > 25. "We send out markers using the original ongoing transaction
> > > > > > ProducerId and ProducerEpoch" ...
> > > > > >
> > > > > > Updated.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid
> >
> > > > > wrote:
> > > > > >
> > > > > > > Hi, Artem,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > 20. So for the dual-write recipe, we should always call
> > > > > > > InitProducerId(keepPreparedTxn=true) from the producer? Then,
> > > should
> > > > we
> > > > > > > change the following in the example to use InitProducerId(true)
> > > > > instead?
> > > > > > > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > > > > > > Also, could Flink just follow the dual-write recipe? It's
> simpler
> > > if
> > > > > > there
> > > > > > > is one way to solve the 2pc issue.
> > > > > > >
> > > > > > > 21. Could a non 2pc user explicitly set the
> TransactionTimeoutMs
> > to
> > > > > > > Integer.MAX_VALUE?
> > > > > > >
> > > > > > > 24. Hmm, In KIP-890, without 2pc, the coordinator expects the
> > > endTxn
> > > > > > > request to use the ongoing pid. With 2pc, the coordinator now
> > > expects
> > > > > the
> > > > > > > endTxn request to use the next pid. So, the flow is different,
> > > right?
> > > > > > >
> > > > > > > 25. "We send out markers using the original ongoing transaction
> > > > > > ProducerId
> > > > > > > and ProducerEpoch"
> > > > > > > We should use ProducerEpoch + 1 in the marker, right?
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > > 20.  I am a bit confused by how we set keepPreparedTxn.
> ...
> > > > > > > >
> > > > > > > > keepPreparedTxn=true informs the transaction coordinator that
> > it
> > > > > should
> > > > > > > > keep the ongoing transaction, if any.  If the
> > > > keepPreparedTxn=false,
> > > > > > then
> > > > > > > > any ongoing transaction is aborted (this is exactly the
> current
> > > > > > > behavior).
> > > > > > > > enable2Pc is a separate argument that is controlled by the
> > > > > > > > *transaction.two.phase.commit.enable *setting on the client.
> > > > > > > >
> > > > > > > > To start 2PC, the client just needs to set
> > > > > > > > *transaction.two.phase.commit.enable*=true in the config.
> Then
> > > if
> > > > > the
> > > > > > > > client knows the status of the transaction upfront (in the
> case
> > > of
> > > > > > Flink,
> > > > > > > > Flink keeps the knowledge if the transaction is prepared in
> its
> > > own
> > > > > > > store,
> > > > > > > > so it always knows upfront), it can set keepPreparedTxn
> > > > accordingly,
> > > > > > then
> > > > > > > > if the transaction was prepared, it'll be ready for the
> client
> > to
> > > > > > > complete
> > > > > > > > the appropriate action; if the client doesn't have a
> knowledge
> > > that
> > > > > the
> > > > > > > > transaction is prepared, keepPreparedTxn is going to be
> false,
> > in
> > > > > which
> > > > > > > > case we'll get to a clean state (the same way we do today).
> > > > > > > >
> > > > > > > > For the dual-write recipe, the client doesn't know upfront if
> > the
> > > > > > > > transaction is prepared, this information is implicitly
> encoded
> > > > > > > > PreparedTxnState value that can be used to resolve the
> > > transaction
> > > > > > state.
> > > > > > > > In that case, keepPreparedTxn should always be true, because
> we
> > > > don't
> > > > > > > know
> > > > > > > > upfront and we don't want to accidentally abort a committed
> > > > > > transaction.
> > > > > > > >
> > > > > > > > The forceTerminateTransaction call can just use
> > > > > keepPreparedTxn=false,
> > > > > > it
> > > > > > > > actually doesn't matter if it sets Enable2Pc flag.
> > > > > > > >
> > > > > > > > > 21. TransactionLogValue: Do we need some field to identify
> > > > whether
> > > > > > this
> > > > > > > > is written for 2PC so that ongoing txn is never auto aborted?
> > > > > > > >
> > > > > > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if
> > 2PC
> > > > was
> > > > > > > > enabled.  I've added a note to the KIP about this.
> > > > > > > >
> > > > > > > > > 22
> > > > > > > >
> > > > > > > > You're right it's a typo.  I fixed it as well as step 9
> > (REQUEST:
> > > > > > > > ProducerId=73, ProducerEpoch=MAX).
> > > > > > > >
> > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > while
> > > > > > > > KeepPreparedTxn is from an API param ...
> > > > > > > >
> > > > > > > > The intent to use 2PC doesn't change from transaction to
> > > > transaction,
> > > > > > but
> > > > > > > > the intent to keep prepared txn may change from transaction
> to
> > > > > > > > transaction.  In dual-write recipes the distinction is not
> > clear,
> > > > but
> > > > > > for
> > > > > > > > use cases where keepPreparedTxn value is known upfront (e.g.
> > > Flink)
> > > > > > it's
> > > > > > > > more prominent.  E.g. a Flink's Kafka sink operator could be
> > > > deployed
> > > > > > > with
> > > > > > > > *transaction.two.phase.commit.enable*=true hardcoded in the
> > > image,
> > > > > but
> > > > > > > > keepPreparedTxn cannot be hardcoded in the image, because it
> > > > depends
> > > > > on
> > > > > > > the
> > > > > > > > job manager's state.
> > > > > > > >
> > > > > > > > > 24
> > > > > > > >
> > > > > > > > The flow is actually going to be the same way as it is now --
> > the
> > > > > > "main"
> > > > > > > > producer id + epoch needs to be used in all operations to
> > prevent
> > > > > > fencing
> > > > > > > > (it's sort of a common "header" in all RPC calls that follow
> > the
> > > > same
> > > > > > > > rules).  The ongoing txn info is just additional info for
> > making
> > > a
> > > > > > > commit /
> > > > > > > > abort decision based on the PreparedTxnState from the DB.
> > > > > > > >
> > > > > > > > --Artem
> > > > > > > >
> > > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao
> > > <j...@confluent.io.invalid
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Artem,
> > > > > > > > >
> > > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > > >
> > > > > > > > > 20. I am a bit confused by how we set keepPreparedTxn. From
> > the
> > > > > KIP,
> > > > > > I
> > > > > > > > got
> > > > > > > > > the following (1) to start 2pc, we call
> > > > > > > > > InitProducerId(keepPreparedTxn=false); (2) when the
> producer
> > > > fails
> > > > > > and
> > > > > > > > > needs to do recovery, it calls
> > > > > InitProducerId(keepPreparedTxn=true);
> > > > > > > (3)
> > > > > > > > > Admin.forceTerminateTransaction() calls
> > > > > > > > > InitProducerId(keepPreparedTxn=false).
> > > > > > > > > 20.1 In (1), when a producer calls InitProducerId(false)
> with
> > > 2pc
> > > > > > > > enabled,
> > > > > > > > > and there is an ongoing txn, should the server return an
> > error
> > > to
> > > > > the
> > > > > > > > > InitProducerId request? If so, what would be the error
> code?
> > > > > > > > > 20.2 How do we distinguish between (1) and (3)? It's the
> same
> > > API
> > > > > > call
> > > > > > > > but
> > > > > > > > > (1) doesn't abort ongoing txn and (2) does.
> > > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies
> keeping
> > > the
> > > > > > > ongoing
> > > > > > > > > txn. So, setting keepPreparedTxn to false to start 2pc
> seems
> > > > > counter
> > > > > > > > > intuitive.
> > > > > > > > >
> > > > > > > > > 21. TransactionLogValue: Do we need some field to identify
> > > > whether
> > > > > > this
> > > > > > > > is
> > > > > > > > > written for 2PC so that ongoing txn is never auto aborted?
> > > > > > > > >
> > > > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing,
> > ProducerId=42,
> > > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
> > > > > > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
> > > > > > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > > > > > > > It seems in the above example, Epoch in RESPONSE should be
> > MAX
> > > to
> > > > > > match
> > > > > > > > > NextProducerEpoch?
> > > > > > > > >
> > > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > > > > > > > while KeepPreparedTxn is from an API param. Should we make
> > them
> > > > > more
> > > > > > > > > consistent since they seem related?
> > > > > > > > >
> > > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73,
> ProducerEpoch=MAX-1;
> > TC
> > > > > > STATE:
> > > > > > > > > PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
> > > > PrevProducerId=73,
> > > > > > > > > NextProducerId=85, NextProducerEpoch=0; RESPONSE
> > ProducerId=85,
> > > > > > > Epoch=0,
> > > > > > > > > When a commit request is sent, it uses the latest
> ProducerId
> > > and
> > > > > > > > > ProducerEpoch."
> > > > > > > > > The step where we use the next produceId to commit an old
> txn
> > > > > works,
> > > > > > > but
> > > > > > > > > can be confusing. It's going to be hard for people
> > implementing
> > > > > this
> > > > > > > new
> > > > > > > > > client protocol to figure out when to use the current or
> the
> > > new
> > > > > > > > producerId
> > > > > > > > > in the EndTxnRequest. One potential way to improve this is
> to
> > > > > extend
> > > > > > > > > EndTxnRequest with a new field like expectedNextProducerId.
> > > Then
> > > > we
> > > > > > can
> > > > > > > > > always use the old produceId in the existing field, but set
> > > > > > > > > expectedNextProducerId to bypass the fencing logic when
> > needed.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > Thank you for the comments.
> > > > > > > > > >
> > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > KeepPreparedTxn
> > > > ...
> > > > > > > > > >
> > > > > > > > > > I added a note that all combinations are valid.
> > > > Enable2Pc=false
> > > > > &
> > > > > > > > > > KeepPreparedTxn=true could be potentially useful for
> > backward
> > > > > > > > > compatibility
> > > > > > > > > > with Flink, when the new version of Flink that implements
> > > > KIP-319
> > > > > > > tries
> > > > > > > > > to
> > > > > > > > > > work with a cluster that doesn't authorize 2PC.
> > > > > > > > > >
> > > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing
> txn,
> > > what
> > > > > > will
> > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > > > > > >
> > > > > > > > > > I added a note that they will be set to -1.  The client
> > then
> > > > will
> > > > > > > know
> > > > > > > > > that
> > > > > > > > > > there is no ongoing txn and .completeTransaction becomes
> a
> > > > no-op
> > > > > > (but
> > > > > > > > > still
> > > > > > > > > > required before .send is enabled).
> > > > > > > > > >
> > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems
> > those
> > > > are
> > > > > > > > already
> > > > > > > > > > covered by KIP-994?
> > > > > > > > > >
> > > > > > > > > > Removed from this KIP.
> > > > > > > > > >
> > > > > > > > > > > 13. TransactionalLogValue ...
> > > > > > > > > >
> > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > >
> > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that
> > > corresponds
> > > > to
> > > > > > the
> > > > > > > > > > ongoing transaction ...
> > > > > > > > > >
> > > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > > >
> > > > > > > > > > > 15. active-transaction-total-time-max : ...
> > > > > > > > > >
> > > > > > > > > > Updated.
> > > > > > > > > >
> > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default
> > would
> > > be
> > > > > > > > ‘false’.
> > > > > > > > > > If it’s ‘false’, 2PC functionality is disabled even if
> the
> > > ACL
> > > > is
> > > > > > set
> > > > > > > > ...
> > > > > > > > > >
> > > > > > > > > > Disabling 2PC effectively removes all authorization to
> use
> > > it,
> > > > > > hence
> > > > > > > I
> > > > > > > > > > thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be
> > > > > appropriate.
> > > > > > > > > >
> > > > > > > > > > Do you suggest using a different error code for 2PC
> > > > authorization
> > > > > > vs
> > > > > > > > some
> > > > > > > > > > other authorization (e.g.
> > > > > > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED)
> > > > > > > > or a
> > > > > > > > > > different code for disabled vs. unauthorised (e.g.
> > > > > > > > > > TWO_PHASE_COMMIT_DISABLED) or both?
> > > > > > > > > >
> > > > > > > > > > > 17. completeTransaction(). We expect this to be only
> used
> > > > > during
> > > > > > > > > > recovery.
> > > > > > > > > >
> > > > > > > > > > It can also be used if, say, a commit to the database
> fails
> > > and
> > > > > the
> > > > > > > > > result
> > > > > > > > > > is inconclusive, e.g.
> > > > > > > > > >
> > > > > > > > > > 1. Begin DB transaction
> > > > > > > > > > 2. Begin Kafka transaction
> > > > > > > > > > 3. Prepare Kafka transaction
> > > > > > > > > > 4. Commit DB transaction
> > > > > > > > > > 5. The DB commit fails, figure out the state of the
> > > transaction
> > > > > by
> > > > > > > > > reading
> > > > > > > > > > the PreparedTxnState from DB
> > > > > > > > > > 6. Complete Kafka transaction with the PreparedTxnState.
> > > > > > > > > >
> > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > initTransaction(true)
> > > > > > > > was
> > > > > > > > > > called": "either" should be "neither"?
> > > > > > > > > >
> > > > > > > > > > Updated.
> > > > > > > > > >
> > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it
> > > > creates
> > > > > a
> > > > > > > > > > situation ...
> > > > > > > > > >
> > > > > > > > > > InitProducerId only bumps the producer epoch, the ongoing
> > > > > > transaction
> > > > > > > > > epoch
> > > > > > > > > > stays the same, no matter how many times the
> InitProducerId
> > > is
> > > > > > called
> > > > > > > > > > before the transaction is completed.  Eventually the
> epoch
> > > may
> > > > > > > > overflow,
> > > > > > > > > > and then a new producer id would be allocated, but the
> > > ongoing
> > > > > > > > > transaction
> > > > > > > > > > producer id would stay the same.
> > > > > > > > > >
> > > > > > > > > > I've added a couple examples in the KIP (
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > > > > > > > )
> > > > > > > > > > that walk through some scenarios and show how the state
> is
> > > > > changed.
> > > > > > > > > >
> > > > > > > > > > -Artem
> > > > > > > > > >
> > > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao
> > > > <j...@confluent.io.invalid
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Artem,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > >
> > > > > > > > > > > 10. For the two new fields in Enable2Pc and
> > KeepPreparedTxn
> > > > in
> > > > > > > > > > > InitProducerId, it would be useful to document a bit
> more
> > > > > detail
> > > > > > on
> > > > > > > > > what
> > > > > > > > > > > values are set under what cases. For example, are all
> > four
> > > > > > > > combinations
> > > > > > > > > > > valid?
> > > > > > > > > > >
> > > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing
> txn,
> > > what
> > > > > > will
> > > > > > > > > > > OngoingTxnProducerId and OngoingTxnEpoch be set?
> > > > > > > > > > >
> > > > > > > > > > > 12. ListTransactionsRequest related changes: It seems
> > those
> > > > are
> > > > > > > > already
> > > > > > > > > > > covered by KIP-994?
> > > > > > > > > > >
> > > > > > > > > > > 13. TransactionalLogValue: Could we name
> > > > TransactionProducerId
> > > > > > and
> > > > > > > > > > > ProducerId better? It's not clear from the name which
> is
> > > for
> > > > > > which.
> > > > > > > > > > >
> > > > > > > > > > > 14. "Note that the (producerId, epoch) pair that
> > > corresponds
> > > > to
> > > > > > the
> > > > > > > > > > ongoing
> > > > > > > > > > > transaction is going to be written instead of the
> > existing
> > > > > > > ProducerId
> > > > > > > > > and
> > > > > > > > > > > ProducerEpoch fields (which are renamed to reflect the
> > > > > semantics)
> > > > > > > to
> > > > > > > > > > > support downgrade.": I am a bit confused on that. Are
> we
> > > > > writing
> > > > > > > > > > different
> > > > > > > > > > > values to the existing fields? Then, we can't
> downgrade,
> > > > right?
> > > > > > > > > > >
> > > > > > > > > > > 15. active-transaction-total-time-max : Would
> > > > > > > > > > > active-transaction-open-time-max be more intuitive?
> Also,
> > > > could
> > > > > > we
> > > > > > > > > > include
> > > > > > > > > > > the full name (group, tags, etc)?
> > > > > > > > > > >
> > > > > > > > > > > 16. "transaction.two.phase.commit.enable The default
> > would
> > > be
> > > > > > > > ‘false’.
> > > > > > > > > > If
> > > > > > > > > > > it’s ‘false’, 2PC functionality is disabled even if the
> > ACL
> > > > is
> > > > > > set,
> > > > > > > > > > clients
> > > > > > > > > > > that attempt to use this functionality would receive
> > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive
> > for
> > > > the
> > > > > > > > client
> > > > > > > > > to
> > > > > > > > > > > understand what the actual cause is.
> > > > > > > > > > >
> > > > > > > > > > > 17. completeTransaction(). We expect this to be only
> used
> > > > > during
> > > > > > > > > > recovery.
> > > > > > > > > > > Could we document this clearly? Could we prevent it
> from
> > > > being
> > > > > > used
> > > > > > > > > > > incorrectly (e.g. throw an exception if the producer
> has
> > > > called
> > > > > > > other
> > > > > > > > > > > methods like send())?
> > > > > > > > > > >
> > > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > initTransaction(true)
> > > > > > > > was
> > > > > > > > > > > called": "either" should be "neither"?
> > > > > > > > > > >
> > > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it
> > > > creates
> > > > > a
> > > > > > > > > > situation
> > > > > > > > > > > where there could be multiple outstanding txns. The
> > > following
> > > > > is
> > > > > > an
> > > > > > > > > > example
> > > > > > > > > > > of a potential problem during recovery.
> > > > > > > > > > >    The last txn epoch in the external store is 41 when
> > the
> > > > app
> > > > > > > dies.
> > > > > > > > > > >    Instance1 is created for recovery.
> > > > > > > > > > >      1. (instance1)
> InitProducerId(keepPreparedTxn=true),
> > > > > > epoch=42,
> > > > > > > > > > > ongoingEpoch=41
> > > > > > > > > > >      2. (instance1) dies before completeTxn(41) can be
> > > > called.
> > > > > > > > > > >    Instance2 is created for recovery.
> > > > > > > > > > >      3. (instance2)
> InitProducerId(keepPreparedTxn=true),
> > > > > > epoch=43,
> > > > > > > > > > > ongoingEpoch=42
> > > > > > > > > > >      4. (instance2) completeTxn(41) => abort
> > > > > > > > > > >    The first problem is that 41 now is aborted when it
> > > should
> > > > > be
> > > > > > > > > > committed.
> > > > > > > > > > > The second one is that it's not clear who could abort
> > epoch
> > > > 42,
> > > > > > > which
> > > > > > > > > is
> > > > > > > > > > > still open.
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > > > > > > <jols...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Artem,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the updates. I think what you say makes
> > > sense. I
> > > > > > just
> > > > > > > > > > updated
> > > > > > > > > > > my
> > > > > > > > > > > > KIP so I want to reconcile some of the changes we
> made
> > > > > > especially
> > > > > > > > > with
> > > > > > > > > > > > respect to the TransactionLogValue.
> > > > > > > > > > > >
> > > > > > > > > > > > Firstly, I believe tagged fields require a default
> > value
> > > so
> > > > > > that
> > > > > > > if
> > > > > > > > > > they
> > > > > > > > > > > > are not filled, we return the default (and know that
> > they
> > > > > were
> > > > > > > > > empty).
> > > > > > > > > > > For
> > > > > > > > > > > > my KIP, I proposed the default for producer ID tagged
> > > > fields
> > > > > > > should
> > > > > > > > > be
> > > > > > > > > > > -1.
> > > > > > > > > > > > I was wondering if we could update the KIP to include
> > the
> > > > > > default
> > > > > > > > > > values
> > > > > > > > > > > > for producer ID and epoch.
> > > > > > > > > > > >
> > > > > > > > > > > > Next, I noticed we decided to rename the fields. I
> > guess
> > > > that
> > > > > > the
> > > > > > > > > field
> > > > > > > > > > > > "NextProducerId" in my KIP correlates to "ProducerId"
> > in
> > > > this
> > > > > > > KIP.
> > > > > > > > Is
> > > > > > > > > > > that
> > > > > > > > > > > > correct? So we would have "TransactionProducerId" for
> > the
> > > > > > > > non-tagged
> > > > > > > > > > > field
> > > > > > > > > > > > and have "ProducerId" (NextProducerId) and
> > > "PrevProducerId"
> > > > > as
> > > > > > > > tagged
> > > > > > > > > > > > fields the final version after KIP-890 and KIP-936
> are
> > > > > > > implemented.
> > > > > > > > > Is
> > > > > > > > > > > this
> > > > > > > > > > > > correct? I think the tags will need updating, but
> that
> > is
> > > > > > > trivial.
> > > > > > > > > > > >
> > > > > > > > > > > > The final question I had was with respect to storing
> > the
> > > > new
> > > > > > > epoch.
> > > > > > > > > In
> > > > > > > > > > > > KIP-890 part 2 (epoch bumps) I think we concluded
> that
> > we
> > > > > don't
> > > > > > > > need
> > > > > > > > > to
> > > > > > > > > > > > store the epoch since we can interpret the previous
> > epoch
> > > > > based
> > > > > > > on
> > > > > > > > > the
> > > > > > > > > > > > producer ID. But here we could call the
> InitProducerId
> > > > > multiple
> > > > > > > > times
> > > > > > > > > > and
> > > > > > > > > > > > we only want the producer with the correct epoch to
> be
> > > able
> > > > > to
> > > > > > > > commit
> > > > > > > > > > the
> > > > > > > > > > > > transaction. Is that the correct reasoning for why we
> > > need
> > > > > > epoch
> > > > > > > > here
> > > > > > > > > > but
> > > > > > > > > > > > not the Prepare/Commit state.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Justine
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > > >
> > > > > > > > > > > > > After thinking a bit about supporting atomic dual
> > > writes
> > > > > for
> > > > > > > > Kafka
> > > > > > > > > +
> > > > > > > > > > > > NoSQL
> > > > > > > > > > > > > database, I came to a conclusion that we do need to
> > > bump
> > > > > the
> > > > > > > > epoch
> > > > > > > > > > even
> > > > > > > > > > > > > with InitProducerId(keepPreparedTxn=true).  As I
> > > > described
> > > > > in
> > > > > > > my
> > > > > > > > > > > previous
> > > > > > > > > > > > > email, we wouldn't need to bump the epoch to
> protect
> > > from
> > > > > > > zombies
> > > > > > > > > so
> > > > > > > > > > > that
> > > > > > > > > > > > > reasoning is still true.  But we cannot protect
> from
> > > > > > > split-brain
> > > > > > > > > > > > scenarios
> > > > > > > > > > > > > when two or more instances of a producer with the
> > same
> > > > > > > > > transactional
> > > > > > > > > > id
> > > > > > > > > > > > try
> > > > > > > > > > > > > to produce at the same time.  The dual-write
> example
> > > for
> > > > > SQL
> > > > > > > > > > databases
> > > > > > > > > > > (
> > > > > > > > > > > > > https://github.com/apache/kafka/pull/14231/files)
> > > > doesn't
> > > > > > > have a
> > > > > > > > > > > > > split-brain problem because execution is protected
> by
> > > the
> > > > > > > update
> > > > > > > > > lock
> > > > > > > > > > > on
> > > > > > > > > > > > > the transaction state record; however NoSQL
> databases
> > > may
> > > > > not
> > > > > > > > have
> > > > > > > > > > this
> > > > > > > > > > > > > protection (I'll write an example for NoSQL
> database
> > > > > > dual-write
> > > > > > > > > > soon).
> > > > > > > > > > > > >
> > > > > > > > > > > > > In a nutshell, here is an example of a split-brain
> > > > > scenario:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    1. (instance1)
> > InitProducerId(keepPreparedTxn=true),
> > > > got
> > > > > > > > > epoch=42
> > > > > > > > > > > > >    2. (instance2)
> > InitProducerId(keepPreparedTxn=true),
> > > > got
> > > > > > > > > epoch=42
> > > > > > > > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
> > > > > > > > > > > > >    4. (instance2) CommitTxn, this is considered a
> > > retry,
> > > > so
> > > > > > it
> > > > > > > > got
> > > > > > > > > > > epoch
> > > > > > > > > > > > 43
> > > > > > > > > > > > >    as well
> > > > > > > > > > > > >    5. (instance1) Produce messageA w/sequence 1
> > > > > > > > > > > > >    6. (instance2) Produce messageB w/sequence 1,
> this
> > > is
> > > > > > > > > considered a
> > > > > > > > > > > > >    duplicate
> > > > > > > > > > > > >    7. (instance2) Produce messageC w/sequence 2
> > > > > > > > > > > > >    8. (instance1) Produce messageD w/sequence 2,
> this
> > > is
> > > > > > > > > considered a
> > > > > > > > > > > > >    duplicate
> > > > > > > > > > > > >
> > > > > > > > > > > > > Now if either of those commit the transaction, it
> > would
> > > > > have
> > > > > > a
> > > > > > > > mix
> > > > > > > > > of
> > > > > > > > > > > > > messages from the two instances (messageA and
> > > messageC).
> > > > > > With
> > > > > > > > the
> > > > > > > > > > > proper
> > > > > > > > > > > > > epoch bump, instance1 would get fenced at step 3.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In order to update epoch in
> > > > > > > InitProducerId(keepPreparedTxn=true)
> > > > > > > > we
> > > > > > > > > > > need
> > > > > > > > > > > > to
> > > > > > > > > > > > > preserve the ongoing transaction's epoch (and
> > > producerId,
> > > > > if
> > > > > > > the
> > > > > > > > > > epoch
> > > > > > > > > > > > > overflows), because we'd need to make a correct
> > > decision
> > > > > when
> > > > > > > we
> > > > > > > > > > > compare
> > > > > > > > > > > > > the PreparedTxnState that we read from the database
> > > with
> > > > > the
> > > > > > > > > > > (producerId,
> > > > > > > > > > > > > epoch) of the ongoing transaction.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I've updated the KIP with the following:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    - Ongoing transaction now has 2 (producerId,
> > epoch)
> > > > > pairs
> > > > > > --
> > > > > > > > one
> > > > > > > > > > > pair
> > > > > > > > > > > > >    describes the ongoing transaction, the other
> pair
> > > > > > describes
> > > > > > > > > > expected
> > > > > > > > > > > > > epoch
> > > > > > > > > > > > >    for operations on this transactional id
> > > > > > > > > > > > >    - InitProducerIdResponse now returns 2
> > (producerId,
> > > > > epoch)
> > > > > > > > pairs
> > > > > > > > > > > > >    - TransactionalLogValue now has 2 (producerId,
> > > epoch)
> > > > > > pairs,
> > > > > > > > the
> > > > > > > > > > new
> > > > > > > > > > > > >    values added as tagged fields, so it's easy to
> > > > downgrade
> > > > > > > > > > > > >    - Added a note about downgrade in the
> > Compatibility
> > > > > > section
> > > > > > > > > > > > >    - Added a rejected alternative
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Artem
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <
> > > > > > > > > > alivsh...@confluent.io>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thank you for the questions.  Currently
> > (pre-KIP-939)
> > > > we
> > > > > > > always
> > > > > > > > > > bump
> > > > > > > > > > > > the
> > > > > > > > > > > > > > epoch on InitProducerId and abort an ongoing
> > > > transaction
> > > > > > (if
> > > > > > > > > > any).  I
> > > > > > > > > > > > > > expect this behavior will continue with KIP-890
> as
> > > > well.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > With KIP-939 we need to support the case when the
> > > > ongoing
> > > > > > > > > > transaction
> > > > > > > > > > > > > > needs to be preserved when keepPreparedTxn=true.
> > > > Bumping
> > > > > > > epoch
> > > > > > > > > > > without
> > > > > > > > > > > > > > aborting or committing a transaction is tricky
> > > because
> > > > > > epoch
> > > > > > > > is a
> > > > > > > > > > > short
> > > > > > > > > > > > > > value and it's easy to overflow.  Currently, the
> > > > overflow
> > > > > > > case
> > > > > > > > is
> > > > > > > > > > > > handled
> > > > > > > > > > > > > > by aborting the ongoing transaction, which would
> > send
> > > > out
> > > > > > > > > > transaction
> > > > > > > > > > > > > > markers with epoch=Short.MAX_VALUE to the
> partition
> > > > > > leaders,
> > > > > > > > > which
> > > > > > > > > > > > would
> > > > > > > > > > > > > > fence off any messages with the producer id that
> > > > started
> > > > > > the
> > > > > > > > > > > > transaction
> > > > > > > > > > > > > > (they would have epoch that is less than
> > > > > Short.MAX_VALUE).
> > > > > > > > Then
> > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > > > safe
> > > > > > > > > > > > > > to allocate a new producer id and use it in new
> > > > > > transactions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We could say that maybe when keepPreparedTxn=true
> > we
> > > > bump
> > > > > > > epoch
> > > > > > > > > > > unless
> > > > > > > > > > > > it
> > > > > > > > > > > > > > leads to overflow, and don't bump epoch in the
> > > overflow
> > > > > > case.
> > > > > > > > I
> > > > > > > > > > > don't
> > > > > > > > > > > > > > think it's a good solution because if it's not
> safe
> > > to
> > > > > keep
> > > > > > > the
> > > > > > > > > > same
> > > > > > > > > > > > > epoch
> > > > > > > > > > > > > > when keepPreparedTxn=true, then we must handle
> the
> > > > epoch
> > > > > > > > overflow
> > > > > > > > > > > case
> > > > > > > > > > > > as
> > > > > > > > > > > > > > well.  So either we should convince ourselves
> that
> > > it's
> > > > > > safe
> > > > > > > to
> > > > > > > > > > keep
> > > > > > > > > > > > the
> > > > > > > > > > > > > > epoch and do it in the general case, or we always
> > > bump
> > > > > the
> > > > > > > > epoch
> > > > > > > > > > and
> > > > > > > > > > > > > handle
> > > > > > > > > > > > > > the overflow.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > With KIP-890, we bump the epoch on every
> > transaction
> > > > > > commit /
> > > > > > > > > > abort.
> > > > > > > > > > > > > This
> > > > > > > > > > > > > > guarantees that even if
> > > > > > InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > doesn't
> > > > > > > > > > > > > > increment epoch on the ongoing transaction, the
> > > client
> > > > > will
> > > > > > > > have
> > > > > > > > > to
> > > > > > > > > > > > call
> > > > > > > > > > > > > > commit or abort to finish the transaction and
> will
> > > > > > increment
> > > > > > > > the
> > > > > > > > > > > epoch
> > > > > > > > > > > > > (and
> > > > > > > > > > > > > > handle epoch overflow, if needed).  If the
> ongoing
> > > > > > > transaction
> > > > > > > > > was
> > > > > > > > > > > in a
> > > > > > > > > > > > > bad
> > > > > > > > > > > > > > state and had some zombies waiting to arrive, the
> > > abort
> > > > > > > > operation
> > > > > > > > > > > would
> > > > > > > > > > > > > > fence them because with KIP-890 every abort would
> > > bump
> > > > > the
> > > > > > > > epoch.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We could also look at this from the following
> > > > > perspective.
> > > > > > > > With
> > > > > > > > > > > > KIP-890,
> > > > > > > > > > > > > > zombies won't be able to cross transaction
> > > boundaries;
> > > > > each
> > > > > > > > > > > transaction
> > > > > > > > > > > > > > completion creates a boundary and any activity in
> > the
> > > > > past
> > > > > > > gets
> > > > > > > > > > > > confined
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > the boundary.  Then data in any partition would
> > look
> > > > like
> > > > > > > this:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Now if we inject steps 3a and 3b like this:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > > 3a. crash
> > > > > > > > > > > > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The invariant still holds even with steps 3a and
> 3b
> > > --
> > > > > > > whatever
> > > > > > > > > > > > activity
> > > > > > > > > > > > > > was in the past will get confined in the past
> with
> > > > > > mandatory
> > > > > > > > > abort
> > > > > > > > > > /
> > > > > > > > > > > > > commit
> > > > > > > > > > > > > > that must follow
> > > InitProducerId(keepPreparedTxn=true).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > So KIP-890 provides the proper isolation between
> > > > > > > transactions,
> > > > > > > > so
> > > > > > > > > > > > > > injecting crash +
> > > InitProducerId(keepPreparedTxn=true)
> > > > > into
> > > > > > > the
> > > > > > > > > > > > > > transaction sequence is safe from the zombie
> > > protection
> > > > > > > > > > perspective.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > That said, I'm still thinking about it and
> looking
> > > for
> > > > > > cases
> > > > > > > > that
> > > > > > > > > > > might
> > > > > > > > > > > > > > break because we don't bump epoch when
> > > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true), if such
> cases
> > > > > exist,
> > > > > > > > we'll
> > > > > > > > > > need
> > > > > > > > > > > > to
> > > > > > > > > > > > > > develop the logic to handle epoch overflow for
> > > ongoing
> > > > > > > > > > transactions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Artem
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > > > > > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hey Artem,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks for the KIP. I had a question about epoch
> > > > > bumping.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Previously when we send an InitProducerId
> request
> > on
> > > > > > > Producer
> > > > > > > > > > > startup,
> > > > > > > > > > > > > we
> > > > > > > > > > > > > >> bump the epoch and abort the transaction. Is it
> > > > correct
> > > > > to
> > > > > > > > > assume
> > > > > > > > > > > that
> > > > > > > > > > > > > we
> > > > > > > > > > > > > >> will still bump the epoch, but just not abort
> the
> > > > > > > transaction?
> > > > > > > > > > > > > >> If we still bump the epoch in this case, how
> does
> > > this
> > > > > > > > interact
> > > > > > > > > > with
> > > > > > > > > > > > > >> KIP-890 where we also bump the epoch on every
> > > > > transaction.
> > > > > > > (I
> > > > > > > > > > think
> > > > > > > > > > > > this
> > > > > > > > > > > > > >> means that we may skip epochs and the data
> itself
> > > will
> > > > > all
> > > > > > > > have
> > > > > > > > > > the
> > > > > > > > > > > > same
> > > > > > > > > > > > > >> epoch)
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> I may have follow ups depending on the answer to
> > > this.
> > > > > :)
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > >> Justine
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > > > > > > > > > > > >> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hi Alex,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Thank you for your questions.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > the purpose of having broker-level
> > > > > > > > > > > > > transaction.two.phase.commit.enable
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > The thinking is that 2PC is a bit of an
> advanced
> > > > > > construct
> > > > > > > > so
> > > > > > > > > > > > enabling
> > > > > > > > > > > > > >> 2PC
> > > > > > > > > > > > > >> > in a Kafka cluster should be an explicit
> > decision.
> > > > If
> > > > > > it
> > > > > > > is
> > > > > > > > > set
> > > > > > > > > > > to
> > > > > > > > > > > > > >> 'false'
> > > > > > > > > > > > > >> > InitProducerId (and initTransactions) would
> > > > > > > > > > > > > >> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that
> > > > returns
> > > > > > the
> > > > > > > > > state
> > > > > > > > > > > of
> > > > > > > > > > > > > >> > transaction.two.phase.commit.enable
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > I wonder if the client could just try to use
> 2PC
> > > and
> > > > > > then
> > > > > > > > > handle
> > > > > > > > > > > the
> > > > > > > > > > > > > >> error
> > > > > > > > > > > > > >> > (e.g. if it needs to fall back to ordinary
> > > > > > transactions).
> > > > > > > > > This
> > > > > > > > > > > way
> > > > > > > > > > > > it
> > > > > > > > > > > > > >> > could uniformly handle cases when Kafka
> cluster
> > > > > doesn't
> > > > > > > > > support
> > > > > > > > > > > 2PC
> > > > > > > > > > > > > >> > completely and cases when 2PC is restricted to
> > > > certain
> > > > > > > > users.
> > > > > > > > > > We
> > > > > > > > > > > > > could
> > > > > > > > > > > > > >> > also expose this config in describeConfigs, if
> > the
> > > > > > > fallback
> > > > > > > > > > > approach
> > > > > > > > > > > > > >> > doesn't work for some scenarios.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > -Artem
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander
> > > Sorokoumov
> > > > > > > > > > > > > >> > <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > Hi Artem,
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Thanks for publishing this KIP!
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Can you please clarify the purpose of having
> > > > > > > broker-level
> > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable config
> in
> > > > > addition
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > new
> > > > > > > > > > > > > >> ACL? If
> > > > > > > > > > > > > >> > > the brokers are configured with
> > > > > > > > > > > > > >> > transaction.two.phase.commit.enable=false,
> > > > > > > > > > > > > >> > > at what point will a client configured with
> > > > > > > > > > > > > >> > > transaction.two.phase.commit.enable=true
> fail?
> > > > Will
> > > > > it
> > > > > > > > > happen
> > > > > > > > > > at
> > > > > > > > > > > > > >> > > KafkaProducer#initTransactions?
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that
> > > > returns
> > > > > > the
> > > > > > > > > state
> > > > > > > > > > > of
> > > > > > > > > > > > t
> > > > > > > > > > > > > >> > > ransaction.two.phase.commit.enable? This
> way,
> > > > > clients
> > > > > > > > would
> > > > > > > > > > know
> > > > > > > > > > > > in
> > > > > > > > > > > > > >> > advance
> > > > > > > > > > > > > >> > > if 2PC is enabled on the brokers.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Best,
> > > > > > > > > > > > > >> > > Alex
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger
> Hoover <
> > > > > > > > > > > > > roger.hoo...@gmail.com>
> > > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Other than supporting multiplexing
> > > transactional
> > > > > > > streams
> > > > > > > > > on
> > > > > > > > > > a
> > > > > > > > > > > > > single
> > > > > > > > > > > > > >> > > > producer, I don't see how to improve it.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem
> > > Livshits
> > > > > > > > > > > > > >> > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > > Hi Roger,
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > Thank you for summarizing the cons.  I
> > agree
> > > > and
> > > > > > I'm
> > > > > > > > > > curious
> > > > > > > > > > > > > what
> > > > > > > > > > > > > >> > would
> > > > > > > > > > > > > >> > > > be
> > > > > > > > > > > > > >> > > > > the alternatives to solve these problems
> > > > better
> > > > > > and
> > > > > > > if
> > > > > > > > > > they
> > > > > > > > > > > > can
> > > > > > > > > > > > > be
> > > > > > > > > > > > > >> > > > > incorporated into this proposal (or
> built
> > > > > > > > independently
> > > > > > > > > in
> > > > > > > > > > > > > >> addition
> > > > > > > > > > > > > >> > to
> > > > > > > > > > > > > >> > > or
> > > > > > > > > > > > > >> > > > > on top of this proposal).  E.g. one
> > > potential
> > > > > > > > extension
> > > > > > > > > we
> > > > > > > > > > > > > >> discussed
> > > > > > > > > > > > > >> > > > > earlier in the thread could be
> > multiplexing
> > > > > > logical
> > > > > > > > > > > > > transactional
> > > > > > > > > > > > > >> > > > "streams"
> > > > > > > > > > > > > >> > > > > with a single producer.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > -Artem
> > > > > > > > > > > > > >> > > > >
On Wed, Aug 23, 2023 at 4:50 PM Roger Hoover <roger.hoo...@gmail.com>
wrote:

Thanks.  I like that you're moving Kafka toward supporting this dual-write
pattern.  Each use case needs to consider the tradeoffs.  You already
summarized the pros very well in the KIP.  I would summarize the cons as
follows:

- You sacrifice availability: each write requires both the DB and Kafka to
be available, so overall application availability is p(DB is available) *
p(Kafka is available).  For example, two systems that are each 99.9%
available combine to roughly 99.8%.
- Latency will be higher and throughput lower: each write requires writes
to both the DB and Kafka while holding an exclusive lock in the DB.
- You need to create a producer per unit of concurrency in your app, which
has some overhead on the app and Kafka side (number of connections, poor
batching).  I assume the producers would need to be configured for low
latency (linger.ms=0).
- There's some complexity in managing stable transactional ids for each
producer/concurrency unit in your application.  With a k8s deployment, you
may need to switch to something like a StatefulSet that gives each pod a
stable identity across restarts.  On top of that pod identity, which you
can use as a prefix, you then assign unique transactional ids to each
concurrency unit (thread/goroutine); see the sketch after this list.
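A minimal sketch of that id scheme, assuming a StatefulSet that exposes
the pod name through the HOSTNAME environment variable; the broker
address, pool size, and id format are illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerPool {
        // One producer per concurrency slot; transactional.id = stable pod
        // identity + slot index, so the same ids come back after a pod restart
        // and any zombies from the previous incarnation get fenced.
        static List<KafkaProducer<String, String>> createPool(int slots) {
            String podId = System.getenv("HOSTNAME"); // e.g. "orders-app-3" in a StatefulSet
            List<KafkaProducer<String, String>> pool = new ArrayList<>(slots);
            for (int slot = 0; slot < slots; slot++) {
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(ProducerConfig.LINGER_MS_CONFIG, "0"); // low latency, at the cost of batching
                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, podId + "-txn-" + slot);
                KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
                producer.initTransactions(); // fences any zombie with the same transactional.id
                pool.add(producer);
            }
            return pool;
        }
    }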
On Wed, Aug 23, 2023 at 12:53 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Roger,

Thank you for the feedback.  You make a very good point that we also
discussed internally.  Adding support for multiple concurrent transactions
in one producer could be valuable, but it seems to be a fairly large and
independent change that would deserve a separate KIP.  If such support is
added, we could modify the 2PC functionality to incorporate it.

> Maybe not too bad but a bit of pain to manage these ids inside each
process and across all application processes.

I'm not sure supporting multiple transactions in one producer would make
id management simpler: we'd need to store a piece of data per transaction,
so whether it's N producers with a single transaction each or N
transactions with a single producer, it's still roughly the same amount of
data to manage.  In fact, managing transactional ids (current proposal)
might be easier, because the id is controlled by the application and it
knows how to complete the transaction after a crash / restart; a TID would
be generated by Kafka, which raises the question of an application
starting a Kafka transaction, crashing before saving its TID, and then
having to figure out which transactions to abort, etc.

> 2) creating a separate producer for each concurrency slot in the
application

This is a very valid concern.  Maybe we'd need to have some multiplexing
of transactional logical "streams" over the same connection.  Seems like a
separate KIP, though.

> Otherwise, it seems you're left with single-threaded model per
application process?

That's a fair assessment.  Not necessarily exactly single-threaded per
application, but a single producer per thread model (i.e. an application
could have a pool of threads + producers to increase concurrency).

-Artem
On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
wrote:

Artem,

Thanks for the reply.

If I understand correctly, Kafka does not support concurrent transactions
from the same producer (transactional id).  I think this means that
applications that want to support in-process concurrency (say thread-level
concurrency with row-level DB locking) would need to manage separate
transactional ids and producers per thread and then store txn state
accordingly.  The potential usability downsides I see are:
1) managing a set of transactional ids for each application process that
scales up to its max concurrency.  Maybe not too bad but a bit of pain to
manage these ids inside each process and across all application processes.
2) creating a separate producer for each concurrency slot in the
application - this could create a lot more producers and resultant
connections to Kafka than the typical model of a single producer per
process.

Otherwise, it seems you're left with a single-threaded model per
application process?

Thanks,

Roger
On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Roger, Arjun,

Thank you for the questions.

> It looks like the application must have stable transactional ids over
time?

The transactional id should uniquely identify a producer instance and
needs to be stable across restarts.  If the transactional id is not stable
across restarts, then zombie messages from a previous incarnation of the
producer may violate atomicity.  If there are 2 producer instances
concurrently producing data with the same transactional id, they are going
to constantly fence each other and most likely make little or no progress.
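A minimal sketch of that fencing behavior; the broker address, topic, and
transactional.id are illustrative.  The second initTransactions() call
with the same transactional.id bumps the producer epoch, after which the
first instance fails with ProducerFencedException:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FencingDemo {
        static KafkaProducer<String, String> newProducer(String txnId) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId);
            return new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        }

        public static void main(String[] args) {
            KafkaProducer<String, String> incarnation1 = newProducer("orders-app-3-txn-0");
            incarnation1.initTransactions();
            incarnation1.beginTransaction();
            incarnation1.send(new ProducerRecord<>("events", "k", "v1"));

            // A "restarted" instance with the same transactional.id fences the first one.
            KafkaProducer<String, String> incarnation2 = newProducer("orders-app-3-txn-0");
            incarnation2.initTransactions(); // aborts incarnation1's open txn and bumps the epoch

            try {
                incarnation1.commitTransaction(); // the zombie can no longer make progress
            } catch (ProducerFencedException e) {
                incarnation1.close(); // expected: this incarnation must shut down
            }

            incarnation2.beginTransaction();
            incarnation2.send(new ProducerRecord<>("events", "k", "v2"));
            incarnation2.commitTransaction();
            incarnation2.close();
        }
    }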
The name might be a little bit confusing as it may be mistaken for a
transaction id / TID that uniquely identifies every transaction.  The name
and the semantics were defined in the original exactly-once-semantics
(EoS) proposal (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
) and KIP-939 just builds on top of that.
> I'm curious to understand what happens if the producer dies, and does
not come up and recover the pending transaction within the transaction
timeout interval.

If the producer / application never comes back, the transaction will
remain in prepared (a.k.a. "in-doubt") state until an operator forcefully
terminates the transaction.  That's why a new ACL is defined in this
proposal -- this functionality should only be provided to applications
that implement proper recovery logic.

-Artem
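For comparison, the existing Admin API (from KIP-664) already lets an
operator forcefully abort a hanging transaction; terminating an in-doubt
2PC transaction would presumably go through similar tooling, gated by the
new ACL.  A sketch against the existing API, where all ids, epochs, and
the partition are illustrative placeholders (in practice they come from
describeProducers / describeTransactions):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AbortTransactionSpec;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.TopicPartition;

    public class ForceAbort {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                AbortTransactionSpec spec = new AbortTransactionSpec(
                    new TopicPartition("events", 0), // partition with the hanging transaction
                    12345L,                          // producerId (placeholder)
                    (short) 7,                       // producerEpoch (placeholder)
                    3);                              // coordinatorEpoch (placeholder)
                admin.abortTransaction(spec).all().get();
            }
        }
    }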
On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish <arjun.sat...@gmail.com>
wrote:

Hello Artem,

Thanks for the KIP.

I have the same question as Roger on concurrent writes, and an additional
one on consumer behavior.  Typically, transactions will time out if not
committed within some time interval.  With the proposed changes in this
KIP, consumers cannot consume past the ongoing transaction.  I'm curious
to understand what happens if the producer dies, and does not come up and
recover the pending transaction within the transaction timeout interval.
Or are we saying that when used in this 2PC context, we should configure
these transaction timeouts to very large durations?

Thanks in advance!

Best,
Arjun
On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover <roger.hoo...@gmail.com>
wrote:

Hi Artem,

Thanks for writing this KIP.  Can you clarify the requirements a bit more
for managing transaction state?  It looks like the application must have
stable transactional ids over time?  What is the granularity of those ids
and producers?  Say the application is a multi-threaded Java web server;
can/should all the concurrent threads share a transactional id and
producer?  That doesn't seem right to me unless the application is using
global DB locks that serialize all requests.  Instead, if the application
uses row-level DB locks, there could be multiple, concurrent, independent
txns happening in the same JVM, so it seems like the granularity of
managing transactional ids and txn state needs to line up with the
granularity of the DB locking.

Does that make sense or am I misunderstanding?

Thanks,

Roger
On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hello,

This is a discussion thread for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
.
The KIP proposes extending Kafka transaction support (that already uses
2PC under the hood) to enable atomicity of dual writes to Kafka and an
external database, and helps to fix a long-standing Flink issue.

An example of code that uses the dual-write recipe with JDBC, and should
work for most SQL databases, is here:
https://github.com/apache/kafka/pull/14231.
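A condensed sketch of that recipe (the full example is in the PR above).
Note that the 2PC-specific calls -- initTransactions(true),
prepareTransaction(), and completeTransaction(...) -- are the API proposed
by this KIP rather than released client API, and the JDBC helpers are
illustrative, so this fragment is not compilable against released clients:

    // Fragment of the dual-write recipe; the 2PC calls below are KIP-939's
    // proposed API, and the load/save/update helpers are illustrative.
    void runDualWrite(KafkaProducer<String, String> producer, Connection db,
                      String key, String value) throws Exception {
        // 1. Recovery: keep any transaction prepared before a crash instead of
        // aborting it on initialization.
        producer.initTransactions(true);
        PreparedTxnState saved = loadPreparedState(db);   // read the saved decision marker
        if (saved != null) {
            // completeTransaction() commits if the saved state matches the
            // prepared transaction and aborts otherwise.
            producer.completeTransaction(saved);
            clearPreparedState(db);
        }

        // 2. Normal operation: one atomic dual write.
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("events", key, value));
        PreparedTxnState prepared = producer.prepareTransaction(); // Kafka txn is now in-doubt

        db.setAutoCommit(false);
        updateBusinessTables(db, key, value);             // the DB side of the dual write
        savePreparedState(db, prepared);                  // persist the commit decision marker
        db.commit();                                      // the DB commit is the decision point

        producer.commitTransaction();                     // second phase: finalize in Kafka
    }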
The FLIP for the sister fix in Flink is here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

-Artem