Hi, Artem,

Thanks for the reply.

20. "Say if an application
currently uses initTransactions() to achieve the current semantics, it
would need to be rewritten to use initTransactions() + abort to achieve the
same semantics if the config is changed. "

This only takes care of the abort case. The application still needs to be
changed to handle the commit case properly
if transaction.two.phase.commit.enable is set to true.
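
For illustration, a rough sketch of the extra logic such an application would
need after the config change (completeTransaction and PreparedTxnState are the
KIP-939 API; the DB helper and the string round-trip are assumptions):

  // with transaction.two.phase.commit.enable=true the prepared transaction
  // survives initTransactions(), so the application has to resolve it itself
  producer.initTransactions();
  PreparedTxnState stored =
      new PreparedTxnState(db.readPreparedTxnState());  // hypothetical helper
  producer.completeTransaction(stored);  // commits or aborts based on the match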

"Even when KIP-939 is implemented,
there would be situations when 2PC is disabled by the admin (e.g. Kafka
service providers may be reluctant to enable 2PC for Flink services that
users host themselves), so we either have to perpetuate the
reflection-based implementation in Flink or enable keepPreparedTxn=true
without 2PC."

Hmm, if the admin disables 2PC, there is likely a reason behind that. I am
not sure that we should provide an API to encourage the application to
circumvent that.

32. Ok. That's the Kafka metric. In that case, the metric name has a group
and a name. There is no type and no package name.
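
For reference, a sketch of how such a metric is registered with the Kafka
metrics library (group and name only, no type; the sensor name and
description here are made up):

  Metrics metrics = new Metrics();
  Sensor sensor = metrics.sensor("ActiveTransactionOpenTime");
  sensor.add(metrics.metricName("active-transaction-open-time-max",
                                "transaction-coordinator-metrics",
                                "Max open time of an active transaction"),
             new Max());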

Jun


On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Jun,
>
> Thank you for your questions.
>
> > 20. So to abort a prepared transaction after the producer start, we could
> > use ...
>
> I agree, initTransactions(true) + abort would accomplish the behavior of
> initTransactions(false), so we could technically have fewer ways to achieve
> the same thing, which is generally valuable.  I wonder, though, if that
> would be intuitive from the application perspective.  Say if an application
> currently uses initTransactions() to achieve the current semantics, it
> would need to be rewritten to use initTransactions() + abort to achieve the
> same semantics if the config is changed.  I think this could create
> subtle confusion, as the config change is generally decoupled from changing
> application implementation.
>
> >  The use case mentioned for keepPreparedTxn=true without 2PC doesn't seem
> > very important
>
> I agree, it's not a strict requirement.  It is, however, a missing option
> in the public API, so currently Flink has to use reflection to emulate this
> functionality without 2PC support.   Even when KIP-939 is implemented,
> there would be situations when 2PC is disabled by the admin (e.g. Kafka
> service providers may be reluctant to enable 2PC for Flink services that
> users host themselves), so we either have to perpetuate the
> reflection-based implementation in Flink or enable keepPreparedTxn=true
> without 2PC.
>
> > 32.
>
> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>
> I just followed the existing metric implementation example
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
> which maps to
>
> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
>
> > 33. "If the value is 'true' then the corresponding field is set
>
> That's correct.  Updated the KIP.
>
> -Artem
>
> On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the reply.
> >
> > 20. So to abort a prepared transaction after producer start, we could use
> > either
> >   producer.initTransactions(false)
> > or
> >   producer.initTransactions(true)
> >   producer.abortTransaction()
> > Could we just always use the latter API? If we do this, we could
> > potentially eliminate the keepPreparedTxn flag in initTransactions(). After
> > the initTransactions() call, the outstanding txn is always preserved if 2pc
> > is enabled and aborted if 2pc is disabled. The use case mentioned for
> > keepPreparedTxn=true without 2PC doesn't seem very important. If we could
> > do that, it seems that we have (1) less redundant and simpler APIs; (2)
> > more symmetric syntax for aborting/committing a prepared txn after producer
> > restart.
> >
> > 32.
> >
> > kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
> > Is this a Yammer or Kafka metric? The former uses camel case for name
> > and type. The latter uses the hyphen notation, but doesn't have the type
> > attribute.
> >
> > 33. "If the value is 'true' then the corresponding field is set in the
> > InitProducerIdRequest and the KafkaProducer object is set into a state
> > which only allows calling .commitTransaction or .abortTransaction."
> > We should also allow .completeTransaction, right?
> >
> > Jun
> >
> >
> > On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > Hi Jun,
> > >
> > > > 20. For Flink usage, it seems that the APIs used to abort and commit a
> > > > prepared txn are not symmetric.
> > >
> > > For Flink it is expected that Flink would call .commitTransaction or
> > > .abortTransaction directly; it wouldn't need to deal with PreparedTxnState.
> > > The outcome is actually determined by Flink's job manager, not by
> > > comparison of PreparedTxnState.  So for Flink, if the Kafka sink crashes
> > > and restarts there are 2 cases (sketched in code after this list):
> > >
> > > 1. Transaction is not prepared.  In that case just call
> > > producer.initTransactions(false), and then it can start transactions as
> > > needed.
> > > 2. Transaction is prepared.  In that case call
> > > producer.initTransactions(true) and wait for the decision from the job
> > > manager.  Note that it's not given that the transaction will get committed;
> > > the decision could also be an abort.
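> > >
> > > A rough sketch of the two cases (initTransactions/commit/abort are the
> > > KIP-939 API; the job-manager call is hypothetical):
> > >
> > >   // case 1: no prepared transaction -- abort any leftovers and move on
> > >   producer.initTransactions(false);
> > >
> > >   // case 2: prepared transaction -- keep it and apply the decision
> > >   producer.initTransactions(true);
> > >   if (jobManager.shouldCommit())       // hypothetical decision source
> > >       producer.commitTransaction();
> > >   else
> > >       producer.abortTransaction();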
> > >
> > > > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> > > > could use a negative timeout in the record to indicate 2PC?
> > >
> > > -1 sounds good, updated.
> > >
> > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> > > > both?
> > >
> > > I think of producer.initTransactions() as the implementation behind
> > > adminClient.forceTerminateTransaction(transactionalId).
> > >
> > > > 31. "This would flush all the pending messages and transition the producer
> > >
> > > Updated the KIP to clarify that IllegalStateException will be thrown.
> > >
> > > -Artem
> > >
> > >
> > > On Mon, Feb 5, 2024 at 2:22 PM Jun Rao <j...@confluent.io.invalid>
> wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 20. For Flink usage, it seems that the APIs used to abort and commit a
> > > > prepared txn are not symmetric.
> > > > To abort, the app will just call
> > > >   producer.initTransactions(false)
> > > >
> > > > To commit, the app needs to call
> > > >   producer.initTransactions(true)
> > > >   producer.completeTransaction(preparedTxnState)
> > > >
> > > > Will this be a concern? For the dual-writer usage, both abort/commit use
> > > > the same API.
> > > >
> > > > 21. transaction.max.timeout.ms could in theory be MAX_INT. Perhaps we
> > > > could use a negative timeout in the record to indicate 2PC?
> > > >
> > > > 30. The KIP has two different APIs to abort an ongoing txn. Do we need
> > > > both?
> > > >   producer.initTransactions(false)
> > > >   adminClient.forceTerminateTransaction(transactionalId)
> > > >
> > > > 31. "This would flush all the pending messages and transition the producer
> > > > into a mode where only .commitTransaction, .abortTransaction, or
> > > > .completeTransaction could be called.  If the call is successful (all
> > > > messages successfully got flushed to all partitions) the transaction is
> > > > prepared."
> > > > If the producer calls send() in that state, what exception will the caller
> > > > receive?
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > >  Then, should we change the following in the example to use
> > > > > > InitProducerId(true) instead?
> > > > >
> > > > > We could. I just thought that it's good to make the example self-contained
> > > > > by starting from a clean state.
> > > > >
> > > > > > Also, could Flink just follow the dual-write recipe?
> > > > >
> > > > > I think it would bring some unnecessary logic to Flink (or any other
> > > > > system that already has a transaction coordinator and just wants to drive
> > > > > Kafka to the desired state).  We could discuss it with Flink folks; the
> > > > > current proposal was developed in collaboration with them.
> > > > >
> > > > > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > > > > > Integer.MAX_VALUE?
> > > > >
> > > > > The server would reject this for regular transactions; it only accepts
> > > > > values that are <= *transaction.max.timeout.ms* (a broker config).
> > > > >
> > > > > > 24. Hmm, in KIP-890, without 2pc, the coordinator expects the endTxn
> > > > > > request to use the ongoing pid. ...
> > > > >
> > > > > Without 2PC there is no case where the pid could change between starting a
> > > > > transaction and endTxn (InitProducerId would abort any ongoing
> > > > > transaction).  With 2PC there is now a case where InitProducerId can
> > > > > change the pid without aborting the transaction, so we need to handle
> > > > > that.  I wouldn't say that the flow is different; rather, it's extended to
> > > > > handle new cases.  The main principle is still the same -- for all
> > > > > operations we use the latest "operational" pid and epoch known to the
> > > > > client; this way we guarantee that we can fence zombie / split-brain
> > > > > clients by disrupting the "latest known" pid + epoch progression.
> > > > >
> > > > > > 25. "We send out markers using the original ongoing transaction
> > > > > ProducerId and ProducerEpoch" ...
> > > > >
> > > > > Updated.
> > > > >
> > > > > -Artem
> > > > >
> > > > > On Mon, Jan 29, 2024 at 4:57 PM Jun Rao <j...@confluent.io.invalid>
> > > > wrote:
> > > > >
> > > > > > Hi, Artem,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 20. So for the dual-write recipe, we should always call
> > > > > > InitProducerId(keepPreparedTxn=true) from the producer? Then, should we
> > > > > > change the following in the example to use InitProducerId(true) instead?
> > > > > > 1. InitProducerId(false); TC STATE: Empty, ProducerId=42,
> > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=-1,
> > > > > > NextProducerEpoch=-1; RESPONSE ProducerId=42, Epoch=MAX-1,
> > > > > > OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
> > > > > > Also, could Flink just follow the dual-write recipe? It's simpler if
> > > > > > there is one way to solve the 2pc issue.
> > > > > >
> > > > > > 21. Could a non 2pc user explicitly set the TransactionTimeoutMs to
> > > > > > Integer.MAX_VALUE?
> > > > > >
> > > > > > 24. Hmm, in KIP-890, without 2pc, the coordinator expects the endTxn
> > > > > > request to use the ongoing pid. With 2pc, the coordinator now expects
> > > > > > the endTxn request to use the next pid. So, the flow is different,
> > > > > > right?
> > > > > >
> > > > > > 25. "We send out markers using the original ongoing transaction
> > > > > > ProducerId and ProducerEpoch"
> > > > > > We should use ProducerEpoch + 1 in the marker, right?
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > > 20.  I am a bit confused by how we set keepPreparedTxn.  ...
> > > > > > >
> > > > > > > keepPreparedTxn=true informs the transaction coordinator that it
> > > > > > > should keep the ongoing transaction, if any.  If keepPreparedTxn=false,
> > > > > > > then any ongoing transaction is aborted (this is exactly the current
> > > > > > > behavior).  enable2Pc is a separate argument that is controlled by the
> > > > > > > *transaction.two.phase.commit.enable* setting on the client.
> > > > > > >
> > > > > > > To start 2PC, the client just needs to set
> > > > > > > *transaction.two.phase.commit.enable*=true in the config.  Then, if the
> > > > > > > client knows the status of the transaction upfront (in the case of
> > > > > > > Flink, Flink keeps the knowledge of whether the transaction is prepared
> > > > > > > in its own store, so it always knows upfront), it can set
> > > > > > > keepPreparedTxn accordingly; then, if the transaction was prepared,
> > > > > > > it'll be ready for the client to complete the appropriate action.  If
> > > > > > > the client doesn't know that the transaction is prepared,
> > > > > > > keepPreparedTxn is going to be false, in which case we'll get to a
> > > > > > > clean state (the same way we do today).
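> > > > > > >
> > > > > > > For illustration, a minimal sketch of the Flink-style setup (the
> > > > > > > config key and initTransactions(boolean) are from this KIP; the rest
> > > > > > > is ordinary producer boilerplate):
> > > > > > >
> > > > > > >   Properties props = new Properties();
> > > > > > >   props.put("bootstrap.servers", "broker:9092");
> > > > > > >   props.put("transactional.id", "sink-operator-1");
> > > > > > >   props.put("transaction.two.phase.commit.enable", "true");
> > > > > > >   props.put("key.serializer", ByteArraySerializer.class.getName());
> > > > > > >   props.put("value.serializer", ByteArraySerializer.class.getName());
> > > > > > >   KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
> > > > > > >   // keepPreparedTxn comes from the job manager's own store
> > > > > > >   producer.initTransactions(txnWasPrepared);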
> > > > > > >
> > > > > > > For the dual-write recipe, the client doesn't know upfront if the
> > > > > > > transaction is prepared; this information is implicitly encoded in the
> > > > > > > PreparedTxnState value that can be used to resolve the transaction
> > > > > > > state.  In that case, keepPreparedTxn should always be true, because we
> > > > > > > don't know upfront and we don't want to accidentally abort a committed
> > > > > > > transaction.
> > > > > > >
> > > > > > > The forceTerminateTransaction call can just use keepPreparedTxn=false;
> > > > > > > it actually doesn't matter if it sets the Enable2Pc flag.
> > > > > > >
> > > > > > > > 21. TransactionLogValue: Do we need some field to identify whether
> > > > > > > > this is written for 2PC so that ongoing txn is never auto aborted?
> > > > > > >
> > > > > > > The TransactionTimeoutMs would be set to Integer.MAX_VALUE if 2PC was
> > > > > > > enabled.  I've added a note to the KIP about this.
> > > > > > >
> > > > > > > > 22
> > > > > > >
> > > > > > > You're right, it's a typo.  I fixed it as well as step 9 (REQUEST:
> > > > > > > ProducerId=73, ProducerEpoch=MAX).
> > > > > > >
> > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config while
> > > > > > > > KeepPreparedTxn is from an API param ...
> > > > > > >
> > > > > > > The intent to use 2PC doesn't change from transaction to transaction,
> > > > > > > but the intent to keep a prepared txn may change from transaction to
> > > > > > > transaction.  In dual-write recipes the distinction is not clear, but
> > > > > > > for use cases where the keepPreparedTxn value is known upfront (e.g.
> > > > > > > Flink) it's more prominent.  E.g. Flink's Kafka sink operator could be
> > > > > > > deployed with *transaction.two.phase.commit.enable*=true hardcoded in
> > > > > > > the image, but keepPreparedTxn cannot be hardcoded in the image,
> > > > > > > because it depends on the job manager's state.
> > > > > > >
> > > > > > > > 24
> > > > > > >
> > > > > > > The flow is actually going to work the same way as it does now -- the
> > > > > > > "main" producer id + epoch needs to be used in all operations to
> > > > > > > prevent fencing (it's sort of a common "header" in all RPC calls that
> > > > > > > follows the same rules).  The ongoing txn info is just additional info
> > > > > > > for making a commit / abort decision based on the PreparedTxnState
> > > > > > > from the DB.
> > > > > > >
> > > > > > > --Artem
> > > > > > >
> > > > > > > On Thu, Jan 25, 2024 at 11:05 AM Jun Rao
> > <j...@confluent.io.invalid
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Artem,
> > > > > > > >
> > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > >
> > > > > > > > 20. I am a bit confused by how we set keepPreparedTxn. From the
> > > > > > > > KIP, I got the following: (1) to start 2pc, we call
> > > > > > > > InitProducerId(keepPreparedTxn=false); (2) when the producer fails
> > > > > > > > and needs to do recovery, it calls
> > > > > > > > InitProducerId(keepPreparedTxn=true); (3)
> > > > > > > > Admin.forceTerminateTransaction() calls
> > > > > > > > InitProducerId(keepPreparedTxn=false).
> > > > > > > > 20.1 In (1), when a producer calls InitProducerId(false) with 2pc
> > > > > > > > enabled, and there is an ongoing txn, should the server return an
> > > > > > > > error to the InitProducerId request? If so, what would be the error
> > > > > > > > code?
> > > > > > > > 20.2 How do we distinguish between (1) and (3)? It's the same API
> > > > > > > > call but (1) doesn't abort ongoing txn and (3) does.
> > > > > > > > 20.3 The usage in (1) seems unintuitive. 2pc implies keeping the
> > > > > > > > ongoing txn. So, setting keepPreparedTxn to false to start 2pc seems
> > > > > > > > counterintuitive.
> > > > > > > >
> > > > > > > > 21. TransactionLogValue: Do we need some field to identify whether
> > > > > > > > this is written for 2PC so that ongoing txn is never auto aborted?
> > > > > > > >
> > > > > > > > 22. "8. InitProducerId(true); TC STATE: Ongoing, ProducerId=42,
> > > > > > > > ProducerEpoch=MAX-1, PrevProducerId=-1, NextProducerId=73,
> > > > > > > > NextProducerEpoch=MAX; RESPONSE ProducerId=73, Epoch=MAX-1,
> > > > > > > > OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
> > > > > > > > It seems in the above example, Epoch in RESPONSE should be MAX to
> > > > > > > > match NextProducerEpoch?
> > > > > > > >
> > > > > > > > 23. It's a bit weird that Enable2Pc is driven by a config
> > > > > > > > while KeepPreparedTxn is from an API param. Should we make them
> > > > > > > > more consistent since they seem related?
> > > > > > > >
> > > > > > > > 24. "9. Commit; REQUEST: ProducerId=73, ProducerEpoch=MAX-1; TC
> > > > > > > > STATE: PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
> > > > > > > > PrevProducerId=73, NextProducerId=85, NextProducerEpoch=0; RESPONSE
> > > > > > > > ProducerId=85, Epoch=0,
> > > > > > > > When a commit request is sent, it uses the latest ProducerId and
> > > > > > > > ProducerEpoch."
> > > > > > > > The step where we use the next producerId to commit an old txn
> > > > > > > > works, but can be confusing. It's going to be hard for people
> > > > > > > > implementing this new client protocol to figure out when to use the
> > > > > > > > current or the new producerId in the EndTxnRequest. One potential
> > > > > > > > way to improve this is to extend EndTxnRequest with a new field like
> > > > > > > > expectedNextProducerId. Then we can always use the old producerId in
> > > > > > > > the existing field, but set expectedNextProducerId to bypass the
> > > > > > > > fencing logic when needed.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
> > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > Thank you for the comments.
> > > > > > > > >
> > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn ...
> > > > > > > > >
> > > > > > > > > I added a note that all combinations are valid.  Enable2Pc=false &
> > > > > > > > > KeepPreparedTxn=true could be potentially useful for backward
> > > > > > > > > compatibility with Flink, when the new version of Flink that
> > > > > > > > > implements FLIP-319 tries to work with a cluster that doesn't
> > > > > > > > > authorize 2PC.
> > > > > > > > >
> > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
> > > > > > > > > > will OngoingTxnProducerId and OngoingTxnEpoch be set to?
> > > > > > > > >
> > > > > > > > > I added a note that they will be set to -1.  The client then will
> > > > > > > > > know that there is no ongoing txn, and .completeTransaction becomes
> > > > > > > > > a no-op (but is still required before .send is enabled).
> > > > > > > > >
> > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are
> > > > > > > > > > already covered by KIP-994?
> > > > > > > > >
> > > > > > > > > Removed from this KIP.
> > > > > > > > >
> > > > > > > > > > 13. TransactionalLogValue ...
> > > > > > > > >
> > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > >
> > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
> > > > > > > > > > the ongoing transaction ...
> > > > > > > > >
> > > > > > > > > This is now updated to work on top of KIP-890.
> > > > > > > > >
> > > > > > > > > > 15. active-transaction-total-time-max : ...
> > > > > > > > >
> > > > > > > > > Updated.
> > > > > > > > >
> > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be
> > > > > > > > > > ‘false’. If it’s ‘false’, 2PC functionality is disabled even if
> > > > > > > > > > the ACL is set ...
> > > > > > > > >
> > > > > > > > > Disabling 2PC effectively removes all authorization to use it;
> > > > > > > > > hence I thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED would be
> > > > > > > > > appropriate.
> > > > > > > > >
> > > > > > > > > Do you suggest using a different error code for 2PC authorization
> > > > > > > > > vs some other authorization (e.g.
> > > > > > > > > TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED), or a different code for
> > > > > > > > > disabled vs. unauthorised (e.g. TWO_PHASE_COMMIT_DISABLED), or
> > > > > > > > > both?
> > > > > > > > >
> > > > > > > > > > 17. completeTransaction(). We expect this to be only used during
> > > > > > > > > > recovery.
> > > > > > > > >
> > > > > > > > > It can also be used if, say, a commit to the database fails and
> > > > > > > > > the result is inconclusive, e.g. (a code sketch follows the
> > > > > > > > > steps):
> > > > > > > > >
> > > > > > > > > 1. Begin DB transaction
> > > > > > > > > 2. Begin Kafka transaction
> > > > > > > > > 3. Prepare Kafka transaction
> > > > > > > > > 4. Commit DB transaction
> > > > > > > > > 5. The DB commit fails; figure out the state of the transaction by
> > > > > > > > > reading the PreparedTxnState from DB
> > > > > > > > > 6. Complete Kafka transaction with the PreparedTxnState.
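> > > > > > > > >
> > > > > > > > > For illustration, a rough sketch of steps 2-6 in code
> > > > > > > > > (prepareTransaction/completeTransaction are the KIP-939 API; the
> > > > > > > > > DB helpers and the string round-trip are assumptions):
> > > > > > > > >
> > > > > > > > >   producer.beginTransaction();
> > > > > > > > >   producer.send(record);
> > > > > > > > >   PreparedTxnState prepared = producer.prepareTransaction();
> > > > > > > > >   db.saveStateAndCommit(prepared.toString());   // hypothetical
> > > > > > > > >   // if the DB commit outcome is unknown, read the state back and
> > > > > > > > >   // let completeTransaction commit or abort accordingly
> > > > > > > > >   producer.completeTransaction(
> > > > > > > > >       new PreparedTxnState(db.readState()));    // hypothetical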
> > > > > > > > >
> > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > > > > initTransaction(true) was called": "either" should be "neither"?
> > > > > > > > >
> > > > > > > > > Updated.
> > > > > > > > >
> > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
> > > > > > > > > > situation ...
> > > > > > > > >
> > > > > > > > > InitProducerId only bumps the producer epoch; the ongoing
> > > > > > > > > transaction epoch stays the same, no matter how many times
> > > > > > > > > InitProducerId is called before the transaction is completed.
> > > > > > > > > Eventually the epoch may overflow, and then a new producer id
> > > > > > > > > would be allocated, but the ongoing transaction producer id would
> > > > > > > > > stay the same.
> > > > > > > > >
> > > > > > > > > I've added a couple examples in the KIP (
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
> > > > > > > > > )
> > > > > > > > > that walk through some scenarios and show how the state is
> > > > > > > > > changed.
> > > > > > > > >
> > > > > > > > > -Artem
> > > > > > > > >
> > > > > > > > > On Fri, Dec 8, 2023 at 6:04 PM Jun Rao
> > > <j...@confluent.io.invalid
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Artem,
> > > > > > > > > >
> > > > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > >
> > > > > > > > > > 10. For the two new fields in Enable2Pc and KeepPreparedTxn in
> > > > > > > > > > InitProducerId, it would be useful to document in a bit more
> > > > > > > > > > detail what values are set under what cases. For example, are
> > > > > > > > > > all four combinations valid?
> > > > > > > > > >
> > > > > > > > > > 11.  InitProducerIdResponse: If there is no ongoing txn, what
> > > > > > > > > > will OngoingTxnProducerId and OngoingTxnEpoch be set to?
> > > > > > > > > >
> > > > > > > > > > 12. ListTransactionsRequest related changes: It seems those are
> > > > > > > > > > already covered by KIP-994?
> > > > > > > > > >
> > > > > > > > > > 13. TransactionalLogValue: Could we name TransactionProducerId
> > > > > > > > > > and ProducerId better? It's not clear from the name which is for
> > > > > > > > > > which.
> > > > > > > > > >
> > > > > > > > > > 14. "Note that the (producerId, epoch) pair that corresponds to
> > > > > > > > > > the ongoing transaction is going to be written instead of the
> > > > > > > > > > existing ProducerId and ProducerEpoch fields (which are renamed
> > > > > > > > > > to reflect the semantics) to support downgrade.": I am a bit
> > > > > > > > > > confused on that. Are we writing different values to the
> > > > > > > > > > existing fields? Then, we can't downgrade, right?
> > > > > > > > > >
> > > > > > > > > > 15. active-transaction-total-time-max : Would
> > > > > > > > > > active-transaction-open-time-max be more intuitive? Also, could
> > > > > > > > > > we include the full name (group, tags, etc)?
> > > > > > > > > >
> > > > > > > > > > 16. "transaction.two.phase.commit.enable The default would be
> > > > > > > > > > ‘false’. If it’s ‘false’, 2PC functionality is disabled even if
> > > > > > > > > > the ACL is set, clients that attempt to use this functionality
> > > > > > > > > > would receive TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
> > > > > > > > > > TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems unintuitive for the
> > > > > > > > > > client to understand what the actual cause is.
> > > > > > > > > >
> > > > > > > > > > 17. completeTransaction(). We expect this to be only used during
> > > > > > > > > > recovery.
> > > > > > > > > > Could we document this clearly? Could we prevent it from being
> > > > > > > > > > used incorrectly (e.g. throw an exception if the producer has
> > > > > > > > > > called other methods like send())?
> > > > > > > > > >
> > > > > > > > > > 18. "either prepareTransaction was called or
> > > > > > > > > > initTransaction(true) was called": "either" should be "neither"?
> > > > > > > > > >
> > > > > > > > > > 19. Since InitProducerId always bumps up the epoch, it creates a
> > > > > > > > > > situation where there could be multiple outstanding txns. The
> > > > > > > > > > following is an example of a potential problem during recovery.
> > > > > > > > > >    The last txn epoch in the external store is 41 when the app
> > > > > > > > > > dies.
> > > > > > > > > >    Instance1 is created for recovery.
> > > > > > > > > >      1. (instance1) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > epoch=42, ongoingEpoch=41
> > > > > > > > > >      2. (instance1) dies before completeTxn(41) can be called.
> > > > > > > > > >    Instance2 is created for recovery.
> > > > > > > > > >      3. (instance2) InitProducerId(keepPreparedTxn=true),
> > > > > > > > > > epoch=43, ongoingEpoch=42
> > > > > > > > > >      4. (instance2) completeTxn(41) => abort
> > > > > > > > > >    The first problem is that 41 now is aborted when it should be
> > > > > > > > > > committed. The second one is that it's not clear who could abort
> > > > > > > > > > epoch 42, which is still open.
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
> > > > > > > > > <jols...@confluent.io.invalid
> > > > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Artem,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the updates. I think what you say makes sense. I
> > > > > > > > > > > just updated my KIP, so I want to reconcile some of the changes
> > > > > > > > > > > we made, especially with respect to the TransactionLogValue.
> > > > > > > > > > >
> > > > > > > > > > > Firstly, I believe tagged fields require a default value so
> > > > > > > > > > > that if they are not filled, we return the default (and know
> > > > > > > > > > > that they were empty). For my KIP, I proposed the default for
> > > > > > > > > > > producer ID tagged fields should be -1. I was wondering if we
> > > > > > > > > > > could update the KIP to include the default values for producer
> > > > > > > > > > > ID and epoch.
> > > > > > > > > > >
> > > > > > > > > > > Next, I noticed we decided to rename the fields. I guess that
> > > > > > > > > > > the field "NextProducerId" in my KIP correlates to "ProducerId"
> > > > > > > > > > > in this KIP. Is that correct? So we would have
> > > > > > > > > > > "TransactionProducerId" for the non-tagged field, and have
> > > > > > > > > > > "ProducerId" (NextProducerId) and "PrevProducerId" as tagged
> > > > > > > > > > > fields in the final version after KIP-890 and KIP-936 are
> > > > > > > > > > > implemented. Is this correct? I think the tags will need
> > > > > > > > > > > updating, but that is trivial.
> > > > > > > > > > >
> > > > > > > > > > > The final question I had was with respect to storing the new
> > > > > > > > > > > epoch. In KIP-890 part 2 (epoch bumps) I think we concluded
> > > > > > > > > > > that we don't need to store the epoch since we can interpret
> > > > > > > > > > > the previous epoch based on the producer ID. But here we could
> > > > > > > > > > > call InitProducerId multiple times, and we only want the
> > > > > > > > > > > producer with the correct epoch to be able to commit the
> > > > > > > > > > > transaction. Is that the correct reasoning for why we need the
> > > > > > > > > > > epoch here but not the Prepare/Commit state?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Justine
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Nov 22, 2023 at 9:48 AM Artem Livshits
> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > >
> > > > > > > > > > > > After thinking a bit about supporting atomic dual writes for
> > > > > > > > > > > > Kafka + NoSQL database, I came to the conclusion that we do
> > > > > > > > > > > > need to bump the epoch even with
> > > > > > > > > > > > InitProducerId(keepPreparedTxn=true).  As I described in my
> > > > > > > > > > > > previous email, we wouldn't need to bump the epoch to protect
> > > > > > > > > > > > from zombies, so that reasoning is still true.  But we cannot
> > > > > > > > > > > > protect from split-brain scenarios when two or more instances
> > > > > > > > > > > > of a producer with the same transactional id try to produce
> > > > > > > > > > > > at the same time.  The dual-write example for SQL databases (
> > > > > > > > > > > > https://github.com/apache/kafka/pull/14231/files ) doesn't
> > > > > > > > > > > > have a split-brain problem because execution is protected by
> > > > > > > > > > > > the update lock on the transaction state record; however,
> > > > > > > > > > > > NoSQL databases may not have this protection (I'll write an
> > > > > > > > > > > > example for NoSQL database dual-write soon).
> > > > > > > > > > > >
> > > > > > > > > > > > In a nutshell, here is an example of a split-brain
> > > > > > > > > > > > scenario:
> > > > > > > > > > > >
> > > > > > > > > > > >    1. (instance1) InitProducerId(keepPreparedTxn=true), got
> > > > > > > > > > > >    epoch=42
> > > > > > > > > > > >    2. (instance2) InitProducerId(keepPreparedTxn=true), got
> > > > > > > > > > > >    epoch=42
> > > > > > > > > > > >    3. (instance1) CommitTxn, epoch bumped to 43
> > > > > > > > > > > >    4. (instance2) CommitTxn, this is considered a retry, so
> > > > > > > > > > > >    it got epoch 43 as well
> > > > > > > > > > > >    5. (instance1) Produce messageA w/sequence 1
> > > > > > > > > > > >    6. (instance2) Produce messageB w/sequence 1, this is
> > > > > > > > > > > >    considered a duplicate
> > > > > > > > > > > >    7. (instance2) Produce messageC w/sequence 2
> > > > > > > > > > > >    8. (instance1) Produce messageD w/sequence 2, this is
> > > > > > > > > > > >    considered a duplicate
> > > > > > > > > > > >
> > > > > > > > > > > > Now if either of those commits the transaction, it would have
> > > > > > > > > > > > a mix of messages from the two instances (messageA and
> > > > > > > > > > > > messageC).  With the proper epoch bump, instance1 would get
> > > > > > > > > > > > fenced at step 3.
> > > > > > > > > > > >
> > > > > > > > > > > > In order to update the epoch in
> > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) we need to preserve the
> > > > > > > > > > > > ongoing transaction's epoch (and producerId, if the epoch
> > > > > > > > > > > > overflows), because we'd need to make a correct decision when
> > > > > > > > > > > > we compare the PreparedTxnState that we read from the
> > > > > > > > > > > > database with the (producerId, epoch) of the ongoing
> > > > > > > > > > > > transaction.
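> > > > > > > > > > > >
> > > > > > > > > > > > For illustration, a conceptual sketch of that comparison
> > > > > > > > > > > > (PreparedTxnState is from the KIP; the accessors shown are
> > > > > > > > > > > > assumptions):
> > > > > > > > > > > >
> > > > > > > > > > > >   // commit only if the state stored in the DB matches the
> > > > > > > > > > > >   // ongoing transaction returned by InitProducerId
> > > > > > > > > > > >   boolean shouldCommit =
> > > > > > > > > > > >       dbState.producerId() == ongoingTxnProducerId
> > > > > > > > > > > >           && dbState.epoch() == ongoingTxnEpoch;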
> > > > > > > > > > > >
> > > > > > > > > > > > I've updated the KIP with the following:
> > > > > > > > > > > >
> > > > > > > > > > > >    - Ongoing transaction now has 2 (producerId, epoch) pairs
> > > > > > > > > > > >    -- one pair describes the ongoing transaction, the other
> > > > > > > > > > > >    pair describes the expected epoch for operations on this
> > > > > > > > > > > >    transactional id
> > > > > > > > > > > >    - InitProducerIdResponse now returns 2 (producerId, epoch)
> > > > > > > > > > > >    pairs
> > > > > > > > > > > >    - TransactionalLogValue now has 2 (producerId, epoch)
> > > > > > > > > > > >    pairs, the new values added as tagged fields, so it's easy
> > > > > > > > > > > >    to downgrade
> > > > > > > > > > > >    - Added a note about downgrade in the Compatibility section
> > > > > > > > > > > >    - Added a rejected alternative
> > > > > > > > > > > >
> > > > > > > > > > > > -Artem
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Oct 6, 2023 at 5:16 PM Artem Livshits <
> > > > > > > > > alivsh...@confluent.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Justine,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you for the questions.  Currently (pre-KIP-939) we
> > > > > > > > > > > > > always bump the epoch on InitProducerId and abort an
> > > > > > > > > > > > > ongoing transaction (if any).  I expect this behavior will
> > > > > > > > > > > > > continue with KIP-890 as well.
> > > > > > > > > > > > >
> > > > > > > > > > > > > With KIP-939 we need to support the case when the ongoing
> > > > > > > > > > > > > transaction needs to be preserved when
> > > > > > > > > > > > > keepPreparedTxn=true.  Bumping the epoch without aborting
> > > > > > > > > > > > > or committing a transaction is tricky because the epoch is
> > > > > > > > > > > > > a short value and it's easy to overflow.  Currently, the
> > > > > > > > > > > > > overflow case is handled by aborting the ongoing
> > > > > > > > > > > > > transaction, which would send out transaction markers with
> > > > > > > > > > > > > epoch=Short.MAX_VALUE to the partition leaders, which would
> > > > > > > > > > > > > fence off any messages with the producer id that started
> > > > > > > > > > > > > the transaction (they would have an epoch that is less than
> > > > > > > > > > > > > Short.MAX_VALUE).  Then it is safe to allocate a new
> > > > > > > > > > > > > producer id and use it in new transactions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We could say that maybe when keepPreparedTxn=true we bump
> > > > > > > > > > > > > the epoch unless it leads to overflow, and don't bump the
> > > > > > > > > > > > > epoch in the overflow case.  I don't think it's a good
> > > > > > > > > > > > > solution, because if it's not safe to keep the same epoch
> > > > > > > > > > > > > when keepPreparedTxn=true, then we must handle the epoch
> > > > > > > > > > > > > overflow case as well.  So either we should convince
> > > > > > > > > > > > > ourselves that it's safe to keep the epoch and do it in the
> > > > > > > > > > > > > general case, or we always bump the epoch and handle the
> > > > > > > > > > > > > overflow.
> > > > > > > > > > > > >
> > > > > > > > > > > > > With KIP-890, we bump the epoch on every transaction
> > > > > > > > > > > > > commit / abort.  This guarantees that even if
> > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) doesn't increment the
> > > > > > > > > > > > > epoch on the ongoing transaction, the client will have to
> > > > > > > > > > > > > call commit or abort to finish the transaction and will
> > > > > > > > > > > > > increment the epoch (and handle epoch overflow, if
> > > > > > > > > > > > > needed).  If the ongoing transaction was in a bad state and
> > > > > > > > > > > > > had some zombies waiting to arrive, the abort operation
> > > > > > > > > > > > > would fence them, because with KIP-890 every abort would
> > > > > > > > > > > > > bump the epoch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We could also look at this from the following
> > > > > > > > > > > > > perspective.  With KIP-890, zombies won't be able to cross
> > > > > > > > > > > > > transaction boundaries; each transaction completion creates
> > > > > > > > > > > > > a boundary, and any activity in the past gets confined in
> > > > > > > > > > > > > the boundary.  Then data in any partition would look like
> > > > > > > > > > > > > this:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > >
> > > > > > > > > > > > > Now if we inject steps 3a and 3b like this:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. message1, epoch=42
> > > > > > > > > > > > > 2. message2, epoch=42
> > > > > > > > > > > > > 3. message3, epoch=42
> > > > > > > > > > > > > 3a. crash
> > > > > > > > > > > > > 3b. InitProducerId(keepPreparedTxn=true)
> > > > > > > > > > > > > 4. marker (commit or abort), epoch=43
> > > > > > > > > > > > >
> > > > > > > > > > > > > The invariant still holds even with steps 3a and 3b --
> > > > > > > > > > > > > whatever activity was in the past will get confined in the
> > > > > > > > > > > > > past with the mandatory abort / commit that must follow
> > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true).
> > > > > > > > > > > > >
> > > > > > > > > > > > > So KIP-890 provides the proper isolation between
> > > > > > > > > > > > > transactions, so injecting crash +
> > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true) into the transaction
> > > > > > > > > > > > > sequence is safe from the zombie protection perspective.
> > > > > > > > > > > > >
> > > > > > > > > > > > > That said, I'm still thinking about it and looking for
> > > > > > > > > > > > > cases that might break because we don't bump the epoch on
> > > > > > > > > > > > > InitProducerId(keepPreparedTxn=true); if such cases exist,
> > > > > > > > > > > > > we'll need to develop the logic to handle epoch overflow
> > > > > > > > > > > > > for ongoing transactions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Artem
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Oct 3, 2023 at 10:15 AM Justine Olshan
> > > > > > > > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hey Artem,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks for the KIP. I had a question about epoch
> > > > bumping.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Previously when we send an InitProducerId request on
> > > > > > > > > > > > >> Producer startup, we bump the epoch and abort the
> > > > > > > > > > > > >> transaction. Is it correct to assume that we will still
> > > > > > > > > > > > >> bump the epoch, but just not abort the transaction?
> > > > > > > > > > > > >> If we still bump the epoch in this case, how does this
> > > > > > > > > > > > >> interact with KIP-890 where we also bump the epoch on
> > > > > > > > > > > > >> every transaction? (I think this means that we may skip
> > > > > > > > > > > > >> epochs and the data itself will all have the same epoch.)
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I may have follow ups depending on the answer to
> > this.
> > > > :)
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > >> Justine
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Thu, Sep 7, 2023 at 9:51 PM Artem Livshits
> > > > > > > > > > > > >> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > Hi Alex,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Thank you for your questions.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > the purpose of having broker-level
> > > > > > > > > > > > >> > > transaction.two.phase.commit.enable
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The thinking is that 2PC is a bit of an advanced
> > > > > > > > > > > > >> > construct, so enabling 2PC in a Kafka cluster should be
> > > > > > > > > > > > >> > an explicit decision.  If it is set to 'false',
> > > > > > > > > > > > >> > InitProducerId (and initTransactions) would
> > > > > > > > > > > > >> > return TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that returns
> > > > > > > > > > > > >> > > the state of transaction.two.phase.commit.enable
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I wonder if the client could just try to use 2PC and
> > > > > > > > > > > > >> > then handle the error (e.g. if it needs to fall back to
> > > > > > > > > > > > >> > ordinary transactions).  This way it could uniformly
> > > > > > > > > > > > >> > handle cases when the Kafka cluster doesn't support 2PC
> > > > > > > > > > > > >> > completely and cases when 2PC is restricted to certain
> > > > > > > > > > > > >> > users.  We could also expose this config in
> > > > > > > > > > > > >> > describeConfigs, if the fallback approach doesn't work
> > > > > > > > > > > > >> > for some scenarios.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > -Artem
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Tue, Sep 5, 2023 at 12:45 PM Alexander
> > Sorokoumov
> > > > > > > > > > > > >> > <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > Hi Artem,
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Thanks for publishing this KIP!
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Can you please clarify the purpose of having
> > > > > > > > > > > > >> > > broker-level transaction.two.phase.commit.enable
> > > > > > > > > > > > >> > > config in addition to the new ACL? If the brokers are
> > > > > > > > > > > > >> > > configured with
> > > > > > > > > > > > >> > > transaction.two.phase.commit.enable=false, at what
> > > > > > > > > > > > >> > > point will a client configured with
> > > > > > > > > > > > >> > > transaction.two.phase.commit.enable=true fail? Will it
> > > > > > > > > > > > >> > > happen at KafkaProducer#initTransactions?
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > WDYT about adding an AdminClient method that returns
> > > > > > > > > > > > >> > > the state of transaction.two.phase.commit.enable? This
> > > > > > > > > > > > >> > > way, clients would know in advance if 2PC is enabled
> > > > > > > > > > > > >> > > on the brokers.
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > Best,
> > > > > > > > > > > > >> > > Alex
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Fri, Aug 25, 2023 at 9:40 AM Roger Hoover <
> > > > > > > > > > > > roger.hoo...@gmail.com>
> > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > Other than supporting multiplexing transactional
> > > > > > > > > > > > >> > > > streams on a single producer, I don't see how to
> > > > > > > > > > > > >> > > > improve it.
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > On Thu, Aug 24, 2023 at 12:12 PM Artem
> > Livshits
> > > > > > > > > > > > >> > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > > Hi Roger,
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > Thank you for summarizing the cons.  I agree, and
> > > > > > > > > > > > >> > > > > I'm curious what would be the alternatives to
> > > > > > > > > > > > >> > > > > solve these problems better and if they can be
> > > > > > > > > > > > >> > > > > incorporated into this proposal (or built
> > > > > > > > > > > > >> > > > > independently in addition to or on top of this
> > > > > > > > > > > > >> > > > > proposal).  E.g. one potential extension we
> > > > > > > > > > > > >> > > > > discussed earlier in the thread could be
> > > > > > > > > > > > >> > > > > multiplexing logical transactional "streams" with
> > > > > > > > > > > > >> > > > > a single producer.
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > -Artem
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > On Wed, Aug 23, 2023 at 4:50 PM Roger
> > Hoover <
> > > > > > > > > > > > >> roger.hoo...@gmail.com
> > > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > > > wrote:
> > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > > Thanks.  I like that you're moving Kafka toward
> > > > > > > > > > > > >> > > > > > supporting this dual-write pattern.  Each use
> > > > > > > > > > > > >> > > > > > case needs to consider the tradeoffs.  You
> > > > > > > > > > > > >> > > > > > already summarized the pros very well in the
> > > > > > > > > > > > >> > > > > > KIP.  I would summarize the cons as follows:
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > - you sacrifice availability - each write
> > > > > > > > > > > > >> > > > > > requires both DB and Kafka to be available, so I
> > > > > > > > > > > > >> > > > > > think your overall application availability is
> > > > > > > > > > > > >> > > > > > (1 - p(DB is unavailable)) * (1 - p(Kafka is
> > > > > > > > > > > > >> > > > > > unavailable)).
> > > > > > > > > > > > >> > > > > > - latency will be higher and throughput lower -
> > > > > > > > > > > > >> > > > > > each write requires both writes to DB and Kafka
> > > > > > > > > > > > >> > > > > > while holding an exclusive lock in DB.
> > > > > > > > > > > > >> > > > > > - you need to create a producer per unit of
> > > > > > > > > > > > >> > > > > > concurrency in your app, which has some overhead
> > > > > > > > > > > > >> > > > > > in the app and Kafka side (number of connections,
> > > > > > > > > > > > >> > > > > > poor batching).  I assume the producers would
> > > > > > > > > > > > >> > > > > > need to be configured for low latency
> > > > > > > > > > > > >> > > > > > (linger.ms=0)
> > > > > > > > > > > > >> > > > > > - there's some complexity in managing stable
> > > > > > > > > > > > >> > > > > > transactional ids for each producer/concurrency
> > > > > > > > > > > > >> > > > > > unit in your application.  With k8s deployment,
> > > > > > > > > > > > >> > > > > > you may need to switch to something like a
> > > > > > > > > > > > >> > > > > > StatefulSet that gives each pod a stable identity
> > > > > > > > > > > > >> > > > > > across restarts.  On top of that pod identity,
> > > > > > > > > > > > >> > > > > > which you can use as a prefix, you then assign
> > > > > > > > > > > > >> > > > > > unique transactional ids to each concurrency unit
> > > > > > > > > > > > >> > > > > > (thread/goroutine).
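> > > > > > > > > > > > >> > > > > > (For example, a sketch: a StatefulSet pod named
> > > > > > > > > > > > >> > > > > > "app-2" running worker thread 3 could use
> > > > > > > > > > > > >> > > > > > transactional.id "app-2-3", which stays stable
> > > > > > > > > > > > >> > > > > > across restarts.)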
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > On Wed, Aug 23, 2023 at 12:53 PM Artem
> > > > Livshits
> > > > > > > > > > > > >> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > > Hi Roger,
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > Thank you for the feedback.  You make a very
> > > > > > > > > > > > >> > > > > > > good point that we also discussed internally.
> > > > > > > > > > > > >> > > > > > > Adding support for multiple concurrent
> > > > > > > > > > > > >> > > > > > > transactions in one producer could be valuable,
> > > > > > > > > > > > >> > > > > > > but it seems to be a fairly large and
> > > > > > > > > > > > >> > > > > > > independent change that would deserve a
> > > > > > > > > > > > >> > > > > > > separate KIP.  If such support is added we
> > > > > > > > > > > > >> > > > > > > could modify 2PC functionality to incorporate
> > > > > > > > > > > > >> > > > > > > that.
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > > Maybe not too bad, but a bit of a pain to
> > > > > > > > > > > > >> > > > > > > > manage these ids inside each process and
> > > > > > > > > > > > >> > > > > > > > across all application processes.
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > I'm not sure if supporting multiple
> > > > > transactions
> > > > > > > in
> > > > > > > > > one
> > > > > > > > > > > > >> producer
> > > > > > > > > > > > >> > > > would
> > > > > > > > > > > > >> > > > > > make
> > > > > > > > > > > > >> > > > > > > id management simpler: we'd need to
> > store
> > > a
> > > > > > piece
> > > > > > > of
> > > > > > > > > > data
> > > > > > > > > > > > per
> > > > > > > > > > > > >> > > > > > transaction,
> > > > > > > > > > > > >> > > > > > > so whether it's N producers with a
> > single
> > > > > > > > transaction
> > > > > > > > > > or N
> > > > > > > > > > > > >> > > > transactions
> > > > > > > > > > > > >> > > > > > > with a single producer, it's still
> > roughly
> > > > the
> > > > > > > same
> > > > > > > > > > amount
> > > > > > > > > > > > of
> > > > > > > > > > > > >> > data
> > > > > > > > > > > > >> > > to
> > > > > > > > > > > > >> > > > > > > manage.  In fact, managing
> transactional
> > > ids
> > > > > > > > (current
> > > > > > > > > > > > >> proposal)
> > > > > > > > > > > > >> > > might
> > > > > > > > > > > > >> > > > > be
> > > > > > > > > > > > >> > > > > > > easier, because the id is controlled
> by
> > > the
> > > > > > > > > application
> > > > > > > > > > > and
> > > > > > > > > > > > it
> > > > > > > > > > > > >> > > knows
> > > > > > > > > > > > >> > > > > how
> > > > > > > > > > > > >> > > > > > to
> > > > > > > > > > > > >> > > > > > > complete the transaction after crash /
> > > > > restart;
> > > > > > > > while
> > > > > > > > > a
> > > > > > > > > > > TID
> > > > > > > > > > > > >> would
> > > > > > > > > > > > >> > > be
> > > > > > > > > > > > >> > > > > > > generated by Kafka and that would
> > create a
> > > > > > > question
> > > > > > > > of
> > > > > > > > > > > > >> starting
> > > > > > > > > > > > >> > > Kafka
> > > > > > > > > > > > >> > > > > > > transaction, but not saving its TID
> and
> > > then
> > > > > > > > crashing,
> > > > > > > > > > > then
> > > > > > > > > > > > >> > > figuring
> > > > > > > > > > > > >> > > > > out
> > > > > > > > > > > > >> > > > > > > which transactions to abort and etc.
> > > > > > > > > > > > >> > > > > > >
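As a rough sketch of why application-controlled ids keep recovery tractable, assuming the initTransactions(keepPreparedTxn) API discussed in this thread (a proposed API that may still change); txnLog is a hypothetical application-side durable record of the commit/abort decision, and producer is assumed to be in scope:

    // On restart, reattach under the same stable transactional.id and
    // keep any transaction that was prepared before the crash.
    producer.initTransactions(true);      // keepPreparedTxn = true (proposed API)
    if (txnLog.lastDecisionWasCommit())   // assumption: app's own durable record
        producer.commitTransaction();     // finish the prepared transaction
    else
        producer.abortTransaction();      // roll it back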
> >
> > > 2) creating a separate producer for each concurrency slot in the
> > > application
> >
> > This is a very valid concern.  Maybe we'd need to have some
> > multiplexing of transactional logical "streams" over the same
> > connection.  Seems like a separate KIP, though.
> >
> > > Otherwise, it seems you're left with a single-threaded model per
> > > application process?
> >
> > That's a fair assessment.  Not necessarily exactly single-threaded per
> > application, but a single producer per thread model (i.e. an
> > application could have a pool of threads + producers to increase
> > concurrency).
> >
> > -Artem
> >
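A minimal sketch of that pool-of-threads-plus-producers model; baseConfig() and the HOSTNAME-derived prefix are illustrative assumptions:

    // One transactional producer per concurrency slot; worker thread i
    // exclusively owns producers.get(i), so its transactions never interleave.
    String podPrefix = System.getenv("HOSTNAME");   // stable pod identity
    int slots = 4;
    List<KafkaProducer<String, String>> producers = new ArrayList<>();
    for (int i = 0; i < slots; i++) {
        Properties props = baseConfig();            // hypothetical shared config
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, podPrefix + "-txn-" + i);
        KafkaProducer<String, String> p = new KafkaProducer<>(props);
        p.initTransactions();                       // claims the id, fencing zombies
        producers.add(p);
    }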
> >
> > On Tue, Aug 22, 2023 at 7:22 PM Roger Hoover <roger.hoo...@gmail.com>
> > wrote:
> >
> > > Artem,
> > >
> > > Thanks for the reply.
> > >
> > > If I understand correctly, Kafka does not support concurrent
> > > transactions from the same producer (transactional id).  I think
> > > this means that applications that want to support in-process
> > > concurrency (say thread-level concurrency with row-level DB locking)
> > > would need to manage separate transactional ids and producers per
> > > thread and then store txn state accordingly.  The potential
> > > usability downsides I see are
> > > 1) managing a set of transactional ids for each application process
> > > that scales up to its max concurrency.  Maybe not too bad but a bit
> > > of pain to manage these ids inside each process and across all
> > > application processes.
> > > 2) creating a separate producer for each concurrency slot in the
> > > application - this could create a lot more producers and resultant
> > > connections to Kafka than the typical model of a single producer per
> > > process.
> > >
> > > Otherwise, it seems you're left with a single-threaded model per
> > > application process?
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Tue, Aug 22, 2023 at 5:11 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Roger, Arjun,
> > > >
> > > > Thank you for the questions.
> > > >
> > > > > It looks like the application must have stable transactional ids
> > > > > over time?
> > > >
> > > > The transactional id should uniquely identify a producer instance
> > > > and needs to be stable across restarts.  If the transactional id
> > > > is not stable across restarts, then zombie messages from a
> > > > previous incarnation of the producer may violate atomicity.  If
> > > > there are 2 producer instances concurrently producing data with
> > > > the same transactional id, they are going to constantly fence each
> > > > other and most likely make little or no progress.
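The fencing behavior is observable with today's client API; a sketch (the topic and transactional.id values are made up):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-txn-0");

    KafkaProducer<String, String> p1 = new KafkaProducer<>(props);
    p1.initTransactions();               // registers producer epoch N
    KafkaProducer<String, String> p2 = new KafkaProducer<>(props);
    p2.initTransactions();               // bumps the epoch, fencing p1
    try {
        p1.beginTransaction();
        p1.send(new ProducerRecord<>("orders", "k", "v"));
        p1.commitTransaction();          // p1 is now a zombie; this fails
    } catch (ProducerFencedException e) {
        p1.close();                      // a fenced producer must be closed
    }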
> > > >
> > > > The name might be a little bit confusing as it may be mistaken for
> > > > a transaction id / TID that uniquely identifies every transaction.
> > > > The name and the semantics were defined in the original
> > > > exactly-once-semantics (EoS) proposal (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > ) and KIP-939 just builds on top of that.
> > > >
> > > > > I'm curious to understand what happens if the producer dies, and
> > > > > does not come up and recover the pending transaction within the
> > > > > transaction timeout interval.
> > > >
> > > > If the producer / application never comes back, the transaction
> > > > will remain in prepared (a.k.a. "in-doubt") state until an
> > > > operator forcefully terminates the transaction.  That's why a new
> > > > ACL is defined in this proposal -- this functionality should only
> > > > be provided to applications that implement proper recovery logic.
> > > >
> > > > -Artem
> > > >
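For reference, a sketch of what an operator-side forced termination could look like with the Admin API that exists for such repairs; the producer/coordinator ids would come from ListTransactions/DescribeTransactions output, all literal values here are illustrative, and exception handling assumes a surrounding method that declares "throws Exception":

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AbortTransactionSpec;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.TopicPartition;

    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    long producerId = 123456L;    // from DescribeTransactions (illustrative)
    short producerEpoch = 0;      // likewise illustrative
    int coordinatorEpoch = 7;
    try (Admin admin = Admin.create(adminProps)) {
        admin.abortTransaction(new AbortTransactionSpec(
                new TopicPartition("orders", 0),   // partition blocked by the txn
                producerId, producerEpoch, coordinatorEpoch))
             .all().get();
    }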
> > > > On Tue, Aug 22, 2023 at 12:52 AM Arjun Satish
> > > > <arjun.sat...@gmail.com> wrote:
> > > >
> > > > > Hello Artem,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > I have the same question as Roger on concurrent writes, and an
> > > > > additional one on consumer behavior.  Typically, transactions
> > > > > will time out if not committed within some time interval.  With
> > > > > the proposed changes in this KIP, consumers cannot consume past
> > > > > the ongoing transaction.  I'm curious to understand what happens
> > > > > if the producer dies, and does not come up and recover the
> > > > > pending transaction within the transaction timeout interval.  Or
> > > > > are we saying that when used in this 2PC context, we should
> > > > > configure these transaction timeouts to very large durations?
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Best,
> > > > > Arjun
> > > > >
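For context, the timeouts Arjun refers to, as they exist before this KIP; a fragment assuming a producer Properties object named props, with illustrative values:

    // Producer side: how long the coordinator will let this producer's
    // transaction stay open before proactively aborting it.
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");  // 60 s

    // Broker side (server.properties): the cap on what producers may request.
    // transaction.max.timeout.ms=900000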
> > > > > On Mon, Aug 21, 2023 at 1:06 PM Roger Hoover
> > > > > <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > >
> > > > > > Thanks for writing this KIP.  Can you clarify the requirements
> > > > > > a bit more for managing transaction state?  It looks like the
> > > > > > application must have stable transactional ids over time?
> > > > > > What is the granularity of those ids and producers?  Say the
> > > > > > application is a multi-threaded Java web server, can/should
> > > > > > all the concurrent threads share a transactional id and
> > > > > > producer?  That doesn't seem right to me unless the
> > > > > > application is using global DB locks that serialize all
> > > > > > requests.  Instead, if the application uses row-level DB
> > > > > > locks, there could be multiple, concurrent, independent txns
> > > > > > happening in the same JVM, so it seems like the granularity of
> > > > > > managing transactional ids and txn state needs to line up with
> > > > > > the granularity of the DB locking.
> > > > > >
> > > > > > Does that make sense or am I misunderstanding?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Wed, Aug 16, 2023 at 11:40 PM Artem Livshits
> > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > This is a discussion thread for
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > > > > > >
> > > > > > > The KIP proposes extending Kafka transaction support (that
> > > > > > > already uses 2PC under the hood) to enable atomicity of dual
> > > > > > > writes to Kafka and an external database, and helps to fix a
> > > > > > > long-standing Flink issue.
> > > > > > >
> > > > > > > An example of code that uses the dual write recipe with JDBC
> > > > > > > and should work for most SQL databases is here:
> > > > > > > https://github.com/apache/kafka/pull/14231
> > > > > > >
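A condensed sketch of that dual-write recipe (see the PR above for the real version); prepareTransaction()/PreparedTxnState follow the KIP's proposed naming and may change, and record, db, and dbChanges are hypothetical objects assumed to be in scope:

    producer.beginTransaction();
    producer.send(record);                         // 1. write to Kafka (uncommitted)
    PreparedTxnState state =
        producer.prepareTransaction();             // 2. phase one: prepare (proposed)
    db.commitWithPreparedState(state, dbChanges);  // 3. DB txn durably records state
    producer.commitTransaction();                  // 4. phase two: commit in Kafka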
> > > > > > > The FLIP for the sister fix in Flink is here:
> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > > > > > >
> > > > > > > -Artem