Hi Artem,
I totally agree that a timeout for the 2PC case is a bad idea. It does abandon
the 2PC guarantee.

Thanks,
Andrew

> On 28 Feb 2024, at 00:44, Artem Livshits <alivsh...@confluent.io.INVALID> 
> wrote:
>
> Hi Jun,
>
> Thank you for the discussion.
>
>> For 3b, it would be useful to understand the reason why an admin doesn't
> authorize 2PC for self-hosted Flink
>
> I think the nuance here is that for cloud, there is a cloud admin
> (operator) and there is cluster admin (who, for example could manage acls
> on topics or etc.).  The 2PC functionality can affect cloud operations,
> because a long running transaction can block the last stable offset and
> prevent compaction or data tiering.  In a multi-tenant environment, a long
> running transaction that involves consumer offset may affect data that is
> shared by multiple tenants (Flink transactions don't use consumer offsets,
> so this is not an issue for Flink, but we'd need a separate ACL or some
> other way to express this permission if we wanted to go in that direction).
>
> For that reason, I expect 2PC to be controlled by the cloud operator and it
> just may not be scalable for the cloud operator to manage all potential
> interactions required to resolve in-doubt transactions (communicate to the
> end users, etc.).  In general, we make no assumptions about Kafka
> applications -- they may come and go, they may abandon transactional ids
> and generate new ones.  For 2PC we need to make sure that the application
> is highly available and wouldn't easily abandon an open transaction in
> Kafka.
>
>> If so, another way to address that is to allow the admin to set a timeout
> even for the 2PC case.
>
> This effectively abandons the 2PC guarantee because it creates a case for
> Kafka to unilaterally make an automatic decision on a prepared
> transaction.  I think it's fundamental for 2PC to abandon this ability and
> wait for the external coordinator for the decision, after all the
> coordinator may legitimately be unavailable for an arbitrary amount of
> time.  Also, we already have a timeout on regular Kafka transactions,
> having another "special" timeout could be confusing, and a large enough
> timeout could still produce the undesirable effects for the cloud
> operations (so we kind of get worst of both options -- we don't provide
> guarantees and still have impact on operations).
>
> -Artem
>
> On Fri, Feb 23, 2024 at 8:55 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
>> Hi, Artem,
>>
>> Thanks for the reply.
>>
>> For 3b, it would be useful to understand the reason why an admin doesn't
>> authorize 2PC for self-hosted Flink. Is the main reason that 2PC has
>> unbounded timeout that could lead to unbounded outstanding transactions? If
>> so, another way to address that is to allow the admin to set a timeout even
>> for the 2PC case. The timeout would be long enough for behavioring
>> applications to complete 2PC operations, but not too long for non-behaving
>> applications' transactions to hang.
>>
>> Jun
>>
>> On Wed, Feb 21, 2024 at 4:34 PM Artem Livshits
>> <alivsh...@confluent.io.invalid> wrote:
>>
>>> Hi Jun,
>>>
>>>> 20A. One option is to make the API initTransactions(boolean enable2PC).
>>>
>>> We could do that.  I think there is a little bit of symmetry between the
>>> client and server that would get lost with this approach (server has
>>> enable2PC as config), but I don't really see a strong reason for
>> enable2PC
>>> to be a config vs. an argument for initTransactions.  But let's see if we
>>> find 20B to be a strong consideration for keeping a separate flag for
>>> keepPreparedTxn.
>>>
>>>> 20B. But realistically, we want Flink (and other apps) to have a single
>>> implementation
>>>
>>> That's correct and here's what I think can happen if we don't allow
>>> independent keepPreparedTxn:
>>>
>>> 1. Pre-KIP-939 self-hosted Flink vs. any Kafka cluster -- reflection is
>>> used, which effectively implements keepPreparedTxn=true without our
>>> explicit support.
>>> 2. KIP-939 self-hosted Flink vs. pre-KIP-939 Kafka cluster -- we can
>>> either fall back to reflection or we just say we don't support this, have
>>> to upgrade Kafka cluster first.
>>> 3. KIP-939 self-hosted Flink vs. KIP-939 Kafka cluster, this becomes
>>> interesting depending on whether the Kafka cluster authorizes 2PC or not:
>>> 3a. Kafka cluster autorizes 2PC for self-hosted Flink -- everything uses
>>> KIP-939 and there is no problem
>>> 3b. Kafka cluster doesn't authorize 2PC for self-hosted Flink -- we can
>>> either fallback to reflection or use keepPreparedTxn=true even if 2PC is
>>> not enabled.
>>>
>>> It seems to be ok to not support case 2 (i.e. require Kafka upgrade
>> first),
>>> it shouldn't be an issue for cloud offerings as cloud providers are
>> likely
>>> to upgrade their Kafka to the latest versions.
>>>
>>> The case 3b seems to be important to support, though -- the latest
>> version
>>> of everything should work at least as well (and preferably better) than
>>> previous ones.  It's possible to downgrade to case 1, but it's probably
>> not
>>> sustainable as newer versions of Flink would also add other features that
>>> the customers may want to take advantage of.
>>>
>>> If we enabled keepPreparedTxn=true even without 2PC, then we could enable
>>> case 3b without the need to fall back to reflection, so we could get rid
>> of
>>> reflection-based logic and just have a single implementation based on
>>> KIP-939.
>>>
>>>> 32. My suggestion is to change
>>>
>>> Let me think about it and I'll come back to this.
>>>
>>> -Artem
>>>
>>> On Wed, Feb 21, 2024 at 3:40 PM Jun Rao <j...@confluent.io.invalid>
>> wrote:
>>>
>>>> Hi, Artem,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> 20A. One option is to make the API initTransactions(boolean enable2PC).
>>>> Then, it's clear from the code whether 2PC related logic should be
>> added.
>>>>
>>>> 20B. But realistically, we want Flink (and other apps) to have a single
>>>> implementation of the 2PC logic, not two different implementations,
>>> right?
>>>>
>>>> 32. My suggestion is to
>>>> change
>>>>
>>>
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>>>> to sth like
>>>> Metric Name                        Type  Group
>>>> Tags   Description
>>>> active-transaction-open-time-max   Max
>> transaction-coordinator-metrics
>>>> none  The max time a currently-open transaction has been open
>>>>
>>>> Jun
>>>>
>>>> On Wed, Feb 21, 2024 at 11:25 AM Artem Livshits
>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>
>>>>> Hi Jun,
>>>>>
>>>>>> 20A.  This only takes care of the abort case. The application still
>>>> needs
>>>>> to be changed to handle the commit case properly
>>>>>
>>>>> My point here is that looking at the initTransactions() call it's not
>>>> clear
>>>>> what the semantics is.  Say I'm doing code review, I cannot say if
>> the
>>>> code
>>>>> is correct or not -- if the config (that's something that's
>>>>> theoretically not known at the time of code review) is going to
>> enable
>>>> 2PC,
>>>>> then the correct code should look one way, otherwise it would need to
>>>> look
>>>>> differently.  Also, say if code is written with InitTransaction()
>>> without
>>>>> explicit abort and then for whatever reason the code would get used
>>> with
>>>>> 2PC enabled (could be a library in a bigger product) it'll start
>>> breaking
>>>>> in a non-intuitive way.
>>>>>
>>>>>> 20B. Hmm, if the admin disables 2PC, there is likely a reason
>> behind
>>>> that
>>>>>
>>>>> That's true, but reality may be more complicated.  Say a user wants
>> to
>>>> run
>>>>> a self-managed Flink with Confluent cloud.  Confluent cloud adim may
>>> not
>>>>> be comfortable enabling 2PC to general user accounts that use
>> services
>>>> not
>>>>> managed by Confluent (the same way Confluent doesn't allow increasing
>>> max
>>>>> transaction timeout for general user accounts).  Right now,
>>> self-managed
>>>>> Flink works because it uses reflection, if it moves to use public
>> APIs
>>>>> provided by KIP-939 it'll break.
>>>>>
>>>>>> 32. Ok. That's the kafka metric. In that case, the metric name has
>> a
>>>>> group and a name. There is no type and no package name.
>>>>>
>>>>> Is this a suggestion to change or confirmation that the current logic
>>> is
>>>>> ok?  I just copied an existing metric but can change if needed.
>>>>>
>>>>> -Artem
>>>>>
>>>>> On Tue, Feb 20, 2024 at 11:25 AM Jun Rao <j...@confluent.io.invalid>
>>>> wrote:
>>>>>
>>>>>> Hi, Artem,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> 20. "Say if an application
>>>>>> currently uses initTransactions() to achieve the current semantics,
>>> it
>>>>>> would need to be rewritten to use initTransactions() + abort to
>>> achieve
>>>>> the
>>>>>> same semantics if the config is changed. "
>>>>>>
>>>>>> This only takes care of the abort case. The application still needs
>>> to
>>>> be
>>>>>> changed to handle the commit case properly
>>>>>> if transaction.two.phase.commit.enable is set to true.
>>>>>>
>>>>>> "Even when KIP-939 is implemented,
>>>>>> there would be situations when 2PC is disabled by the admin (e.g.
>>> Kafka
>>>>>> service providers may be reluctant to enable 2PC for Flink services
>>>> that
>>>>>> users host themselves), so we either have to perpetuate the
>>>>>> reflection-based implementation in Flink or enable
>>> keepPreparedTxn=true
>>>>>> without 2PC."
>>>>>>
>>>>>> Hmm, if the admin disables 2PC, there is likely a reason behind
>>> that. I
>>>>> am
>>>>>> not sure that we should provide an API to encourage the application
>>> to
>>>>>> circumvent that.
>>>>>>
>>>>>> 32. Ok. That's the kafka metric. In that case, the metric name has
>> a
>>>>> group
>>>>>> and a name. There is no type and no package name.
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 15, 2024 at 8:23 PM Artem Livshits
>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>
>>>>>>> Hi Jun,
>>>>>>>
>>>>>>> Thank you for your questions.
>>>>>>>
>>>>>>>> 20. So to abort a prepared transaction after the producer
>> start,
>>> we
>>>>>> could
>>>>>>> use ...
>>>>>>>
>>>>>>> I agree, initTransaction(true) + abort would accomplish the
>>> behavior
>>>> of
>>>>>>> initTransactions(false), so we could technically have fewer ways
>> to
>>>>>> achieve
>>>>>>> the same thing, which is generally valuable.  I wonder, though,
>> if
>>>> that
>>>>>>> would be intuitive from the application perspective.  Say if an
>>>>>> application
>>>>>>> currently uses initTransactions() to achieve the current
>> semantics,
>>>> it
>>>>>>> would need to be rewritten to use initTransactions() + abort to
>>>> achieve
>>>>>> the
>>>>>>> same semantics if the config is changed.  I think this could
>> create
>>>>>>> subtle confusion, as the config change is generally decoupled
>> from
>>>>>> changing
>>>>>>> application implementation.
>>>>>>>
>>>>>>>> The use case mentioned for keepPreparedTxn=true without 2PC
>>>> doesn't
>>>>>> seem
>>>>>>> very important
>>>>>>>
>>>>>>> I agree, it's not a strict requirement.  It is, however, a
>> missing
>>>>> option
>>>>>>> in the public API, so currently Flink has to use reflection to
>>>> emulate
>>>>>> this
>>>>>>> functionality without 2PC support.   Even when KIP-939 is
>>>> implemented,
>>>>>>> there would be situations when 2PC is disabled by the admin (e.g.
>>>> Kafka
>>>>>>> service providers may be reluctant to enable 2PC for Flink
>> services
>>>>> that
>>>>>>> users host themselves), so we either have to perpetuate the
>>>>>>> reflection-based implementation in Flink or enable
>>>> keepPreparedTxn=true
>>>>>>> without 2PC.
>>>>>>>
>>>>>>>> 32.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>>>>>>>
>>>>>>> I just followed the existing metric implementation example
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L95
>>>>>>> ,
>>>>>>> which maps to
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max.
>>>>>>>
>>>>>>>> 33. "If the value is 'true' then the corresponding field is set
>>>>>>>
>>>>>>> That's correct.  Updated the KIP.
>>>>>>>
>>>>>>> -Artem
>>>>>>>
>>>>>>> On Wed, Feb 7, 2024 at 10:06 AM Jun Rao <j...@confluent.io.invalid
>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, Artem,
>>>>>>>>
>>>>>>>> Thanks for the reply.
>>>>>>>>
>>>>>>>> 20. So to abort a prepared transaction after producer start, we
>>>> could
>>>>>> use
>>>>>>>> either
>>>>>>>>  producer.initTransactions(false)
>>>>>>>> or
>>>>>>>>  producer.initTransactions(true)
>>>>>>>>  producer.abortTransaction
>>>>>>>> Could we just always use the latter API? If we do this, we
>> could
>>>>>>>> potentially eliminate the keepPreparedTxn flag in
>>>> initTransactions().
>>>>>>> After
>>>>>>>> the initTransactions() call, the outstanding txn is always
>>>> preserved
>>>>> if
>>>>>>> 2pc
>>>>>>>> is enabled and aborted if 2pc is disabled. The use case
>> mentioned
>>>> for
>>>>>>>> keepPreparedTxn=true without 2PC doesn't seem very important.
>> If
>>> we
>>>>>> could
>>>>>>>> do that, it seems that we have (1) less redundant and simpler
>>> APIs;
>>>>> (2)
>>>>>>>> more symmetric syntax for aborting/committing a prepared txn
>>> after
>>>>>>> producer
>>>>>>>> restart.
>>>>>>>>
>>>>>>>> 32.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> kafka.server:type=transaction-coordinator-metrics,name=active-transaction-open-time-max
>>>>>>>> Is this a Yammer or kafka metric? The former uses the camel
>> case
>>>> for
>>>>>> name
>>>>>>>> and type. The latter uses the hyphen notation, but doesn't have
>>> the
>>>>>> type
>>>>>>>> attribute.
>>>>>>>>
>>>>>>>> 33. "If the value is 'true' then the corresponding field is set
>>> in
>>>>> the
>>>>>>>> InitProducerIdRequest and the KafkaProducer object is set into
>> a
>>>>> state
>>>>>>>> which only allows calling .commitTransaction or
>>> .abortTransaction."
>>>>>>>> We should also allow .completeTransaction, right?
>>>>>>>>
>>>>>>>> Jun
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 6, 2024 at 3:29 PM Artem Livshits
>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Jun,
>>>>>>>>>
>>>>>>>>>> 20. For Flink usage, it seems that the APIs used to abort
>> and
>>>>>> commit
>>>>>>> a
>>>>>>>>> prepared txn are not symmetric.
>>>>>>>>>
>>>>>>>>> For Flink it is expected that Flink would call
>>> .commitTransaction
>>>>> or
>>>>>>>>> .abortTransaction directly, it wouldn't need to deal with
>>>>>>>> PreparedTxnState,
>>>>>>>>> the outcome is actually determined by the Flink's job
>> manager,
>>>> not
>>>>> by
>>>>>>>>> comparison of PreparedTxnState.  So for Flink, if the Kafka
>>> sync
>>>>>>> crashes
>>>>>>>>> and restarts there are 2 cases:
>>>>>>>>>
>>>>>>>>> 1. Transaction is not prepared.  In that case just call
>>>>>>>>> producer.initTransactions(false) and then can start
>>> transactions
>>>> as
>>>>>>>> needed.
>>>>>>>>> 2. Transaction is prepared.  In that case call
>>>>>>>>> producer.initTransactions(true) and wait for the decision
>> from
>>>> the
>>>>>> job
>>>>>>>>> manager.  Note that it's not given that the transaction will
>>> get
>>>>>>>> committed,
>>>>>>>>> the decision could also be an abort.
>>>>>>>>>
>>>>>>>>>> 21. transaction.max.timeout.ms could in theory be
>> MAX_INT.
>>>>>> Perhaps
>>>>>>> we
>>>>>>>>> could use a negative timeout in the record to indicate 2PC?
>>>>>>>>>
>>>>>>>>> -1 sounds good, updated.
>>>>>>>>>
>>>>>>>>>> 30. The KIP has two different APIs to abort an ongoing txn.
>>> Do
>>>> we
>>>>>>> need
>>>>>>>>> both?
>>>>>>>>>
>>>>>>>>> I think of producer.initTransactions() to be an
>> implementation
>>>> for
>>>>>>>>> adminClient.forceTerminateTransaction(transactionalId).
>>>>>>>>>
>>>>>>>>>> 31. "This would flush all the pending messages and
>> transition
>>>> the
>>>>>>>>> producer
>>>>>>>>>
>>>>>>>>> Updated the KIP to clarify that IllegalStateException will be
>>>>> thrown.
>>>>>>>>>
>>>>>>>>> -Artem
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 5, 2024 at 2:22 PM Jun Rao
>>> <j...@confluent.io.invalid
>>>>>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Artem,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>
>>>>>>>>>> 20. For Flink usage, it seems that the APIs used to abort
>> and
>>>>>> commit
>>>>>>> a
>>>>>>>>>> prepared txn are not symmetric.
>>>>>>>>>> To abort, the app will just call
>>>>>>>>>>  producer.initTransactions(false)
>>>>>>>>>>
>>>>>>>>>> To commit, the app needs to call
>>>>>>>>>>  producer.initTransactions(true)
>>>>>>>>>>  producer.completeTransaction(preparedTxnState)
>>>>>>>>>>
>>>>>>>>>> Will this be a concern? For the dual-writer usage, both
>>>>>> abort/commit
>>>>>>>> use
>>>>>>>>>> the same API.
>>>>>>>>>>
>>>>>>>>>> 21. transaction.max.timeout.ms could in theory be MAX_INT.
>>>>> Perhaps
>>>>>>> we
>>>>>>>>>> could
>>>>>>>>>> use a negative timeout in the record to indicate 2PC?
>>>>>>>>>>
>>>>>>>>>> 30. The KIP has two different APIs to abort an ongoing txn.
>>> Do
>>>> we
>>>>>>> need
>>>>>>>>>> both?
>>>>>>>>>>  producer.initTransactions(false)
>>>>>>>>>>  adminClient.forceTerminateTransaction(transactionalId)
>>>>>>>>>>
>>>>>>>>>> 31. "This would flush all the pending messages and
>> transition
>>>> the
>>>>>>>>> producer
>>>>>>>>>> into a mode where only .commitTransaction,
>> .abortTransaction,
>>>> or
>>>>>>>>>> .completeTransaction could be called.  If the call is
>>>> successful
>>>>>> (all
>>>>>>>>>> messages successfully got flushed to all partitions) the
>>>>>> transaction
>>>>>>> is
>>>>>>>>>> prepared."
>>>>>>>>>> If the producer calls send() in that state, what exception
>>>> will
>>>>>> the
>>>>>>>>> caller
>>>>>>>>>> receive?
>>>>>>>>>>
>>>>>>>>>> Jun
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 2, 2024 at 3:34 PM Artem Livshits
>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>
>>>>>>>>>>>> Then, should we change the following in the example to
>>> use
>>>>>>>>>>> InitProducerId(true) instead?
>>>>>>>>>>>
>>>>>>>>>>> We could. I just thought that it's good to make the
>> example
>>>>>>>>>> self-contained
>>>>>>>>>>> by starting from a clean state.
>>>>>>>>>>>
>>>>>>>>>>>> Also, could Flink just follow the dual-write recipe?
>>>>>>>>>>>
>>>>>>>>>>> I think it would bring some unnecessary logic to Flink
>> (or
>>>> any
>>>>>>> other
>>>>>>>>>> system
>>>>>>>>>>> that already has a transaction coordinator and just wants
>>> to
>>>>>> drive
>>>>>>>>> Kafka
>>>>>>>>>> to
>>>>>>>>>>> the desired state).  We could discuss it with Flink
>> folks,
>>>> the
>>>>>>>> current
>>>>>>>>>>> proposal was developed in collaboration with them.
>>>>>>>>>>>
>>>>>>>>>>>> 21. Could a non 2pc user explicitly set the
>>>>>> TransactionTimeoutMs
>>>>>>> to
>>>>>>>>>>> Integer.MAX_VALUE?
>>>>>>>>>>>
>>>>>>>>>>> The server would reject this for regular transactions, it
>>>> only
>>>>>>>> accepts
>>>>>>>>>>> values that are <= *transaction.max.timeout.ms
>>>>>>>>>>> <http://transaction.max.timeout.ms/> *(a broker config).
>>>>>>>>>>>
>>>>>>>>>>>> 24. Hmm, In KIP-890, without 2pc, the coordinator
>> expects
>>>> the
>>>>>>>> endTxn
>>>>>>>>>>> request to use the ongoing pid. ...
>>>>>>>>>>>
>>>>>>>>>>> Without 2PC there is no case where the pid could change
>>>> between
>>>>>>>>> starting
>>>>>>>>>> a
>>>>>>>>>>> transaction and endTxn (InitProducerId would abort any
>>>> ongoing
>>>>>>>>>>> transaction).  WIth 2PC there is now a case where there
>>> could
>>>>> be
>>>>>>>>>>> InitProducerId that can change the pid without aborting
>> the
>>>>>>>>> transaction,
>>>>>>>>>> so
>>>>>>>>>>> we need to handle that.  I wouldn't say that the flow is
>>>>>> different,
>>>>>>>> but
>>>>>>>>>>> it's rather extended to handle new cases.  The main
>>> principle
>>>>> is
>>>>>>>> still
>>>>>>>>>> the
>>>>>>>>>>> same -- for all operations we use the latest
>> "operational"
>>>> pid
>>>>>> and
>>>>>>>>> epoch
>>>>>>>>>>> known to the client, this way we guarantee that we can
>>> fence
>>>>>>> zombie /
>>>>>>>>>> split
>>>>>>>>>>> brain clients by disrupting the "latest known" pid +
>> epoch
>>>>>>>> progression.
>>>>>>>>>>>
>>>>>>>>>>>> 25. "We send out markers using the original ongoing
>>>>> transaction
>>>>>>>>>>> ProducerId and ProducerEpoch" ...
>>>>>>>>>>>
>>>>>>>>>>> Updated.
>>>>>>>>>>>
>>>>>>>>>>> -Artem
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 29, 2024 at 4:57 PM Jun Rao
>>>>> <j...@confluent.io.invalid
>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, Artem,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>
>>>>>>>>>>>> 20. So for the dual-write recipe, we should always call
>>>>>>>>>>>> InitProducerId(keepPreparedTxn=true) from the producer?
>>>> Then,
>>>>>>>> should
>>>>>>>>> we
>>>>>>>>>>>> change the following in the example to use
>>>>> InitProducerId(true)
>>>>>>>>>> instead?
>>>>>>>>>>>> 1. InitProducerId(false); TC STATE: Empty,
>> ProducerId=42,
>>>>>>>>>>>> ProducerEpoch=MAX-1, PrevProducerId=-1,
>>> NextProducerId=-1,
>>>>>>>>>>>> NextProducerEpoch=-1; RESPONSE ProducerId=42,
>>> Epoch=MAX-1,
>>>>>>>>>>>> OngoingTxnProducerId=-1, OngoingTxnEpoch=-1.
>>>>>>>>>>>> Also, could Flink just follow the dual-write recipe?
>> It's
>>>>>> simpler
>>>>>>>> if
>>>>>>>>>>> there
>>>>>>>>>>>> is one way to solve the 2pc issue.
>>>>>>>>>>>>
>>>>>>>>>>>> 21. Could a non 2pc user explicitly set the
>>>>>> TransactionTimeoutMs
>>>>>>> to
>>>>>>>>>>>> Integer.MAX_VALUE?
>>>>>>>>>>>>
>>>>>>>>>>>> 24. Hmm, In KIP-890, without 2pc, the coordinator
>> expects
>>>> the
>>>>>>>> endTxn
>>>>>>>>>>>> request to use the ongoing pid. With 2pc, the
>> coordinator
>>>> now
>>>>>>>> expects
>>>>>>>>>> the
>>>>>>>>>>>> endTxn request to use the next pid. So, the flow is
>>>>> different,
>>>>>>>> right?
>>>>>>>>>>>>
>>>>>>>>>>>> 25. "We send out markers using the original ongoing
>>>>> transaction
>>>>>>>>>>> ProducerId
>>>>>>>>>>>> and ProducerEpoch"
>>>>>>>>>>>> We should use ProducerEpoch + 1 in the marker, right?
>>>>>>>>>>>>
>>>>>>>>>>>> Jun
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 26, 2024 at 8:35 PM Artem Livshits
>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 20.  I am a bit confused by how we set
>>> keepPreparedTxn.
>>>>>> ...
>>>>>>>>>>>>>
>>>>>>>>>>>>> keepPreparedTxn=true informs the transaction
>>> coordinator
>>>>> that
>>>>>>> it
>>>>>>>>>> should
>>>>>>>>>>>>> keep the ongoing transaction, if any.  If the
>>>>>>>>> keepPreparedTxn=false,
>>>>>>>>>>> then
>>>>>>>>>>>>> any ongoing transaction is aborted (this is exactly
>> the
>>>>>> current
>>>>>>>>>>>> behavior).
>>>>>>>>>>>>> enable2Pc is a separate argument that is controlled
>> by
>>>> the
>>>>>>>>>>>>> *transaction.two.phase.commit.enable *setting on the
>>>>> client.
>>>>>>>>>>>>>
>>>>>>>>>>>>> To start 2PC, the client just needs to set
>>>>>>>>>>>>> *transaction.two.phase.commit.enable*=true in the
>>> config.
>>>>>> Then
>>>>>>>> if
>>>>>>>>>> the
>>>>>>>>>>>>> client knows the status of the transaction upfront
>> (in
>>>> the
>>>>>> case
>>>>>>>> of
>>>>>>>>>>> Flink,
>>>>>>>>>>>>> Flink keeps the knowledge if the transaction is
>>> prepared
>>>> in
>>>>>> its
>>>>>>>> own
>>>>>>>>>>>> store,
>>>>>>>>>>>>> so it always knows upfront), it can set
>> keepPreparedTxn
>>>>>>>>> accordingly,
>>>>>>>>>>> then
>>>>>>>>>>>>> if the transaction was prepared, it'll be ready for
>> the
>>>>>> client
>>>>>>> to
>>>>>>>>>>>> complete
>>>>>>>>>>>>> the appropriate action; if the client doesn't have a
>>>>>> knowledge
>>>>>>>> that
>>>>>>>>>> the
>>>>>>>>>>>>> transaction is prepared, keepPreparedTxn is going to
>> be
>>>>>> false,
>>>>>>> in
>>>>>>>>>> which
>>>>>>>>>>>>> case we'll get to a clean state (the same way we do
>>>> today).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the dual-write recipe, the client doesn't know
>>>> upfront
>>>>> if
>>>>>>> the
>>>>>>>>>>>>> transaction is prepared, this information is
>> implicitly
>>>>>> encoded
>>>>>>>>>>>>> PreparedTxnState value that can be used to resolve
>> the
>>>>>>>> transaction
>>>>>>>>>>> state.
>>>>>>>>>>>>> In that case, keepPreparedTxn should always be true,
>>>>> because
>>>>>> we
>>>>>>>>> don't
>>>>>>>>>>>> know
>>>>>>>>>>>>> upfront and we don't want to accidentally abort a
>>>> committed
>>>>>>>>>>> transaction.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The forceTerminateTransaction call can just use
>>>>>>>>>> keepPreparedTxn=false,
>>>>>>>>>>> it
>>>>>>>>>>>>> actually doesn't matter if it sets Enable2Pc flag.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 21. TransactionLogValue: Do we need some field to
>>>>> identify
>>>>>>>>> whether
>>>>>>>>>>> this
>>>>>>>>>>>>> is written for 2PC so that ongoing txn is never auto
>>>>> aborted?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The TransactionTimeoutMs would be set to
>>>> Integer.MAX_VALUE
>>>>> if
>>>>>>> 2PC
>>>>>>>>> was
>>>>>>>>>>>>> enabled.  I've added a note to the KIP about this.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 22
>>>>>>>>>>>>>
>>>>>>>>>>>>> You're right it's a typo.  I fixed it as well as
>> step 9
>>>>>>> (REQUEST:
>>>>>>>>>>>>> ProducerId=73, ProducerEpoch=MAX).
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 23. It's a bit weird that Enable2Pc is driven by a
>>>> config
>>>>>>> while
>>>>>>>>>>>>> KeepPreparedTxn is from an API param ...
>>>>>>>>>>>>>
>>>>>>>>>>>>> The intent to use 2PC doesn't change from transaction
>>> to
>>>>>>>>> transaction,
>>>>>>>>>>> but
>>>>>>>>>>>>> the intent to keep prepared txn may change from
>>>> transaction
>>>>>> to
>>>>>>>>>>>>> transaction.  In dual-write recipes the distinction
>> is
>>>> not
>>>>>>> clear,
>>>>>>>>> but
>>>>>>>>>>> for
>>>>>>>>>>>>> use cases where keepPreparedTxn value is known
>> upfront
>>>>> (e.g.
>>>>>>>> Flink)
>>>>>>>>>>> it's
>>>>>>>>>>>>> more prominent.  E.g. a Flink's Kafka sink operator
>>> could
>>>>> be
>>>>>>>>> deployed
>>>>>>>>>>>> with
>>>>>>>>>>>>> *transaction.two.phase.commit.enable*=true hardcoded
>> in
>>>> the
>>>>>>>> image,
>>>>>>>>>> but
>>>>>>>>>>>>> keepPreparedTxn cannot be hardcoded in the image,
>>> because
>>>>> it
>>>>>>>>> depends
>>>>>>>>>> on
>>>>>>>>>>>> the
>>>>>>>>>>>>> job manager's state.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 24
>>>>>>>>>>>>>
>>>>>>>>>>>>> The flow is actually going to be the same way as it
>> is
>>>> now
>>>>> --
>>>>>>> the
>>>>>>>>>>> "main"
>>>>>>>>>>>>> producer id + epoch needs to be used in all
>> operations
>>> to
>>>>>>> prevent
>>>>>>>>>>> fencing
>>>>>>>>>>>>> (it's sort of a common "header" in all RPC calls that
>>>>> follow
>>>>>>> the
>>>>>>>>> same
>>>>>>>>>>>>> rules).  The ongoing txn info is just additional info
>>> for
>>>>>>> making
>>>>>>>> a
>>>>>>>>>>>> commit /
>>>>>>>>>>>>> abort decision based on the PreparedTxnState from the
>>> DB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --Artem
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jan 25, 2024 at 11:05 AM Jun Rao
>>>>>>>> <j...@confluent.io.invalid
>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, Artem,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the reply. A few more comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 20. I am a bit confused by how we set
>>> keepPreparedTxn.
>>>>> From
>>>>>>> the
>>>>>>>>>> KIP,
>>>>>>>>>>> I
>>>>>>>>>>>>> got
>>>>>>>>>>>>>> the following (1) to start 2pc, we call
>>>>>>>>>>>>>> InitProducerId(keepPreparedTxn=false); (2) when the
>>>>>> producer
>>>>>>>>> fails
>>>>>>>>>>> and
>>>>>>>>>>>>>> needs to do recovery, it calls
>>>>>>>>>> InitProducerId(keepPreparedTxn=true);
>>>>>>>>>>>> (3)
>>>>>>>>>>>>>> Admin.forceTerminateTransaction() calls
>>>>>>>>>>>>>> InitProducerId(keepPreparedTxn=false).
>>>>>>>>>>>>>> 20.1 In (1), when a producer calls
>>>> InitProducerId(false)
>>>>>> with
>>>>>>>> 2pc
>>>>>>>>>>>>> enabled,
>>>>>>>>>>>>>> and there is an ongoing txn, should the server
>> return
>>>> an
>>>>>>> error
>>>>>>>> to
>>>>>>>>>> the
>>>>>>>>>>>>>> InitProducerId request? If so, what would be the
>>> error
>>>>>> code?
>>>>>>>>>>>>>> 20.2 How do we distinguish between (1) and (3)?
>> It's
>>>> the
>>>>>> same
>>>>>>>> API
>>>>>>>>>>> call
>>>>>>>>>>>>> but
>>>>>>>>>>>>>> (1) doesn't abort ongoing txn and (2) does.
>>>>>>>>>>>>>> 20.3 The usage in (1) seems unintuitive. 2pc
>> implies
>>>>>> keeping
>>>>>>>> the
>>>>>>>>>>>> ongoing
>>>>>>>>>>>>>> txn. So, setting keepPreparedTxn to false to start
>>> 2pc
>>>>>> seems
>>>>>>>>>> counter
>>>>>>>>>>>>>> intuitive.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 21. TransactionLogValue: Do we need some field to
>>>>> identify
>>>>>>>>> whether
>>>>>>>>>>> this
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> written for 2PC so that ongoing txn is never auto
>>>>> aborted?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 22. "8. InitProducerId(true); TC STATE: Ongoing,
>>>>>>> ProducerId=42,
>>>>>>>>>>>>>> ProducerEpoch=MAX-1, PrevProducerId=-1,
>>>>> NextProducerId=73,
>>>>>>>>>>>>>> NextProducerEpoch=MAX; RESPONSE ProducerId=73,
>>>>> Epoch=MAX-1,
>>>>>>>>>>>>>> OngoingTxnProducerId=42, OngoingTxnEpoch=MAX-1"
>>>>>>>>>>>>>> It seems in the above example, Epoch in RESPONSE
>>> should
>>>>> be
>>>>>>> MAX
>>>>>>>> to
>>>>>>>>>>> match
>>>>>>>>>>>>>> NextProducerEpoch?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 23. It's a bit weird that Enable2Pc is driven by a
>>>> config
>>>>>>>>>>>>>> while KeepPreparedTxn is from an API param. Should
>> we
>>>>> make
>>>>>>> them
>>>>>>>>>> more
>>>>>>>>>>>>>> consistent since they seem related?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 24. "9. Commit; REQUEST: ProducerId=73,
>>>>>> ProducerEpoch=MAX-1;
>>>>>>> TC
>>>>>>>>>>> STATE:
>>>>>>>>>>>>>> PrepareCommit, ProducerId=42, ProducerEpoch=MAX,
>>>>>>>>> PrevProducerId=73,
>>>>>>>>>>>>>> NextProducerId=85, NextProducerEpoch=0; RESPONSE
>>>>>>> ProducerId=85,
>>>>>>>>>>>> Epoch=0,
>>>>>>>>>>>>>> When a commit request is sent, it uses the latest
>>>>>> ProducerId
>>>>>>>> and
>>>>>>>>>>>>>> ProducerEpoch."
>>>>>>>>>>>>>> The step where we use the next produceId to commit
>> an
>>>> old
>>>>>> txn
>>>>>>>>>> works,
>>>>>>>>>>>> but
>>>>>>>>>>>>>> can be confusing. It's going to be hard for people
>>>>>>> implementing
>>>>>>>>>> this
>>>>>>>>>>>> new
>>>>>>>>>>>>>> client protocol to figure out when to use the
>> current
>>>> or
>>>>>> the
>>>>>>>> new
>>>>>>>>>>>>> producerId
>>>>>>>>>>>>>> in the EndTxnRequest. One potential way to improve
>>> this
>>>>> is
>>>>>> to
>>>>>>>>>> extend
>>>>>>>>>>>>>> EndTxnRequest with a new field like
>>>>> expectedNextProducerId.
>>>>>>>> Then
>>>>>>>>> we
>>>>>>>>>>> can
>>>>>>>>>>>>>> always use the old produceId in the existing field,
>>> but
>>>>> set
>>>>>>>>>>>>>> expectedNextProducerId to bypass the fencing logic
>>> when
>>>>>>> needed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2023 at 2:06 PM Artem Livshits
>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jun,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for the comments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 10. For the two new fields in Enable2Pc and
>>>>>>> KeepPreparedTxn
>>>>>>>>> ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I added a note that all combinations are valid.
>>>>>>>>> Enable2Pc=false
>>>>>>>>>> &
>>>>>>>>>>>>>>> KeepPreparedTxn=true could be potentially useful
>>> for
>>>>>>> backward
>>>>>>>>>>>>>> compatibility
>>>>>>>>>>>>>>> with Flink, when the new version of Flink that
>>>>> implements
>>>>>>>>> KIP-319
>>>>>>>>>>>> tries
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> work with a cluster that doesn't authorize 2PC.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 11.  InitProducerIdResponse: If there is no
>>> ongoing
>>>>>> txn,
>>>>>>>> what
>>>>>>>>>>> will
>>>>>>>>>>>>>>> OngoingTxnProducerId and OngoingTxnEpoch be set?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I added a note that they will be set to -1.  The
>>>> client
>>>>>>> then
>>>>>>>>> will
>>>>>>>>>>>> know
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> there is no ongoing txn and .completeTransaction
>>>>> becomes
>>>>>> a
>>>>>>>>> no-op
>>>>>>>>>>> (but
>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>> required before .send is enabled).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 12. ListTransactionsRequest related changes: It
>>>> seems
>>>>>>> those
>>>>>>>>> are
>>>>>>>>>>>>> already
>>>>>>>>>>>>>>> covered by KIP-994?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Removed from this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 13. TransactionalLogValue ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is now updated to work on top of KIP-890.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 14. "Note that the (producerId, epoch) pair
>> that
>>>>>>>> corresponds
>>>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>>> ongoing transaction ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is now updated to work on top of KIP-890.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 15. active-transaction-total-time-max : ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Updated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 16. "transaction.two.phase.commit.enable The
>>>> default
>>>>>>> would
>>>>>>>> be
>>>>>>>>>>>>> ‘false’.
>>>>>>>>>>>>>>> If it’s ‘false’, 2PC functionality is disabled
>> even
>>>> if
>>>>>> the
>>>>>>>> ACL
>>>>>>>>> is
>>>>>>>>>>> set
>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Disabling 2PC effectively removes all
>> authorization
>>>> to
>>>>>> use
>>>>>>>> it,
>>>>>>>>>>> hence
>>>>>>>>>>>> I
>>>>>>>>>>>>>>> thought TRANSACTIONAL_ID_AUTHORIZATION_FAILED
>> would
>>>> be
>>>>>>>>>> appropriate.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Do you suggest using a different error code for
>> 2PC
>>>>>>>>> authorization
>>>>>>>>>>> vs
>>>>>>>>>>>>> some
>>>>>>>>>>>>>>> other authorization (e.g.
>>>>>>>>>>> TRANSACTIONAL_ID_2PC_AUTHORIZATION_FAILED)
>>>>>>>>>>>>> or a
>>>>>>>>>>>>>>> different code for disabled vs. unauthorised
>> (e.g.
>>>>>>>>>>>>>>> TWO_PHASE_COMMIT_DISABLED) or both?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 17. completeTransaction(). We expect this to be
>>>> only
>>>>>> used
>>>>>>>>>> during
>>>>>>>>>>>>>>> recovery.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It can also be used if, say, a commit to the
>>> database
>>>>>> fails
>>>>>>>> and
>>>>>>>>>> the
>>>>>>>>>>>>>> result
>>>>>>>>>>>>>>> is inconclusive, e.g.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Begin DB transaction
>>>>>>>>>>>>>>> 2. Begin Kafka transaction
>>>>>>>>>>>>>>> 3. Prepare Kafka transaction
>>>>>>>>>>>>>>> 4. Commit DB transaction
>>>>>>>>>>>>>>> 5. The DB commit fails, figure out the state of
>> the
>>>>>>>> transaction
>>>>>>>>>> by
>>>>>>>>>>>>>> reading
>>>>>>>>>>>>>>> the PreparedTxnState from DB
>>>>>>>>>>>>>>> 6. Complete Kafka transaction with the
>>>>> PreparedTxnState.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 18. "either prepareTransaction was called or
>>>>>>>>>>> initTransaction(true)
>>>>>>>>>>>>> was
>>>>>>>>>>>>>>> called": "either" should be "neither"?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Updated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 19. Since InitProducerId always bumps up the
>>> epoch,
>>>>> it
>>>>>>>>> creates
>>>>>>>>>> a
>>>>>>>>>>>>>>> situation ...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> InitProducerId only bumps the producer epoch, the
>>>>> ongoing
>>>>>>>>>>> transaction
>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>> stays the same, no matter how many times the
>>>>>> InitProducerId
>>>>>>>> is
>>>>>>>>>>> called
>>>>>>>>>>>>>>> before the transaction is completed.  Eventually
>>> the
>>>>>> epoch
>>>>>>>> may
>>>>>>>>>>>>> overflow,
>>>>>>>>>>>>>>> and then a new producer id would be allocated,
>> but
>>>> the
>>>>>>>> ongoing
>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>> producer id would stay the same.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've added a couple examples in the KIP (
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC#KIP939:SupportParticipationin2PC-PersistedDataFormatChanges
>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>> that walk through some scenarios and show how the
>>>> state
>>>>>> is
>>>>>>>>>> changed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Dec 8, 2023 at 6:04 PM Jun Rao
>>>>>>>>> <j...@confluent.io.invalid
>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi, Artem,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP. A few comments below.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 10. For the two new fields in Enable2Pc and
>>>>>>> KeepPreparedTxn
>>>>>>>>> in
>>>>>>>>>>>>>>>> InitProducerId, it would be useful to document
>> a
>>>> bit
>>>>>> more
>>>>>>>>>> detail
>>>>>>>>>>> on
>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>> values are set under what cases. For example,
>> are
>>>> all
>>>>>>> four
>>>>>>>>>>>>> combinations
>>>>>>>>>>>>>>>> valid?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 11.  InitProducerIdResponse: If there is no
>>> ongoing
>>>>>> txn,
>>>>>>>> what
>>>>>>>>>>> will
>>>>>>>>>>>>>>>> OngoingTxnProducerId and OngoingTxnEpoch be
>> set?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 12. ListTransactionsRequest related changes: It
>>>> seems
>>>>>>> those
>>>>>>>>> are
>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>> covered by KIP-994?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 13. TransactionalLogValue: Could we name
>>>>>>>>> TransactionProducerId
>>>>>>>>>>> and
>>>>>>>>>>>>>>>> ProducerId better? It's not clear from the name
>>>> which
>>>>>> is
>>>>>>>> for
>>>>>>>>>>> which.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 14. "Note that the (producerId, epoch) pair
>> that
>>>>>>>> corresponds
>>>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>>> ongoing
>>>>>>>>>>>>>>>> transaction is going to be written instead of
>> the
>>>>>>> existing
>>>>>>>>>>>> ProducerId
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> ProducerEpoch fields (which are renamed to
>>> reflect
>>>>> the
>>>>>>>>>> semantics)
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> support downgrade.": I am a bit confused on
>> that.
>>>> Are
>>>>>> we
>>>>>>>>>> writing
>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>> values to the existing fields? Then, we can't
>>>>>> downgrade,
>>>>>>>>> right?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 15. active-transaction-total-time-max : Would
>>>>>>>>>>>>>>>> active-transaction-open-time-max be more
>>> intuitive?
>>>>>> Also,
>>>>>>>>> could
>>>>>>>>>>> we
>>>>>>>>>>>>>>> include
>>>>>>>>>>>>>>>> the full name (group, tags, etc)?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 16. "transaction.two.phase.commit.enable The
>>>> default
>>>>>>> would
>>>>>>>> be
>>>>>>>>>>>>> ‘false’.
>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>> it’s ‘false’, 2PC functionality is disabled
>> even
>>> if
>>>>> the
>>>>>>> ACL
>>>>>>>>> is
>>>>>>>>>>> set,
>>>>>>>>>>>>>>> clients
>>>>>>>>>>>>>>>> that attempt to use this functionality would
>>>> receive
>>>>>>>>>>>>>>>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED error."
>>>>>>>>>>>>>>>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED seems
>>>>> unintuitive
>>>>>>> for
>>>>>>>>> the
>>>>>>>>>>>>> client
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> understand what the actual cause is.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 17. completeTransaction(). We expect this to be
>>>> only
>>>>>> used
>>>>>>>>>> during
>>>>>>>>>>>>>>> recovery.
>>>>>>>>>>>>>>>> Could we document this clearly? Could we
>> prevent
>>> it
>>>>>> from
>>>>>>>>> being
>>>>>>>>>>> used
>>>>>>>>>>>>>>>> incorrectly (e.g. throw an exception if the
>>>> producer
>>>>>> has
>>>>>>>>> called
>>>>>>>>>>>> other
>>>>>>>>>>>>>>>> methods like send())?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 18. "either prepareTransaction was called or
>>>>>>>>>>> initTransaction(true)
>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>> called": "either" should be "neither"?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 19. Since InitProducerId always bumps up the
>>> epoch,
>>>>> it
>>>>>>>>> creates
>>>>>>>>>> a
>>>>>>>>>>>>>>> situation
>>>>>>>>>>>>>>>> where there could be multiple outstanding txns.
>>> The
>>>>>>>> following
>>>>>>>>>> is
>>>>>>>>>>> an
>>>>>>>>>>>>>>> example
>>>>>>>>>>>>>>>> of a potential problem during recovery.
>>>>>>>>>>>>>>>>   The last txn epoch in the external store is
>> 41
>>>>> when
>>>>>>> the
>>>>>>>>> app
>>>>>>>>>>>> dies.
>>>>>>>>>>>>>>>>   Instance1 is created for recovery.
>>>>>>>>>>>>>>>>     1. (instance1)
>>>>>> InitProducerId(keepPreparedTxn=true),
>>>>>>>>>>> epoch=42,
>>>>>>>>>>>>>>>> ongoingEpoch=41
>>>>>>>>>>>>>>>>     2. (instance1) dies before completeTxn(41)
>>> can
>>>>> be
>>>>>>>>> called.
>>>>>>>>>>>>>>>>   Instance2 is created for recovery.
>>>>>>>>>>>>>>>>     3. (instance2)
>>>>>> InitProducerId(keepPreparedTxn=true),
>>>>>>>>>>> epoch=43,
>>>>>>>>>>>>>>>> ongoingEpoch=42
>>>>>>>>>>>>>>>>     4. (instance2) completeTxn(41) => abort
>>>>>>>>>>>>>>>>   The first problem is that 41 now is aborted
>>> when
>>>>> it
>>>>>>>> should
>>>>>>>>>> be
>>>>>>>>>>>>>>> committed.
>>>>>>>>>>>>>>>> The second one is that it's not clear who could
>>>> abort
>>>>>>> epoch
>>>>>>>>> 42,
>>>>>>>>>>>> which
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> still open.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Dec 7, 2023 at 2:43 PM Justine Olshan
>>>>>>>>>>>>>>> <jols...@confluent.io.invalid
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey Artem,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the updates. I think what you say
>>>> makes
>>>>>>>> sense. I
>>>>>>>>>>> just
>>>>>>>>>>>>>>> updated
>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>> KIP so I want to reconcile some of the
>> changes
>>> we
>>>>>> made
>>>>>>>>>>> especially
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>> respect to the TransactionLogValue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Firstly, I believe tagged fields require a
>>>> default
>>>>>>> value
>>>>>>>> so
>>>>>>>>>>> that
>>>>>>>>>>>> if
>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>> are not filled, we return the default (and
>> know
>>>>> that
>>>>>>> they
>>>>>>>>>> were
>>>>>>>>>>>>>> empty).
>>>>>>>>>>>>>>>> For
>>>>>>>>>>>>>>>>> my KIP, I proposed the default for producer
>> ID
>>>>> tagged
>>>>>>>>> fields
>>>>>>>>>>>> should
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> -1.
>>>>>>>>>>>>>>>>> I was wondering if we could update the KIP to
>>>>> include
>>>>>>> the
>>>>>>>>>>> default
>>>>>>>>>>>>>>> values
>>>>>>>>>>>>>>>>> for producer ID and epoch.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Next, I noticed we decided to rename the
>>> fields.
>>>> I
>>>>>>> guess
>>>>>>>>> that
>>>>>>>>>>> the
>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>> "NextProducerId" in my KIP correlates to
>>>>> "ProducerId"
>>>>>>> in
>>>>>>>>> this
>>>>>>>>>>>> KIP.
>>>>>>>>>>>>> Is
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> correct? So we would have
>>> "TransactionProducerId"
>>>>> for
>>>>>>> the
>>>>>>>>>>>>> non-tagged
>>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>> and have "ProducerId" (NextProducerId) and
>>>>>>>> "PrevProducerId"
>>>>>>>>>> as
>>>>>>>>>>>>> tagged
>>>>>>>>>>>>>>>>> fields the final version after KIP-890 and
>>>> KIP-936
>>>>>> are
>>>>>>>>>>>> implemented.
>>>>>>>>>>>>>> Is
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> correct? I think the tags will need updating,
>>> but
>>>>>> that
>>>>>>> is
>>>>>>>>>>>> trivial.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The final question I had was with respect to
>>>>> storing
>>>>>>> the
>>>>>>>>> new
>>>>>>>>>>>> epoch.
>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>> KIP-890 part 2 (epoch bumps) I think we
>>> concluded
>>>>>> that
>>>>>>> we
>>>>>>>>>> don't
>>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> store the epoch since we can interpret the
>>>> previous
>>>>>>> epoch
>>>>>>>>>> based
>>>>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> producer ID. But here we could call the
>>>>>> InitProducerId
>>>>>>>>>> multiple
>>>>>>>>>>>>> times
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> we only want the producer with the correct
>>> epoch
>>>> to
>>>>>> be
>>>>>>>> able
>>>>>>>>>> to
>>>>>>>>>>>>> commit
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> transaction. Is that the correct reasoning
>> for
>>>> why
>>>>> we
>>>>>>>> need
>>>>>>>>>>> epoch
>>>>>>>>>>>>> here
>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>> not the Prepare/Commit state.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Nov 22, 2023 at 9:48 AM Artem
>> Livshits
>>>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Justine,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> After thinking a bit about supporting
>> atomic
>>>> dual
>>>>>>>> writes
>>>>>>>>>> for
>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>> +
>>>>>>>>>>>>>>>>> NoSQL
>>>>>>>>>>>>>>>>>> database, I came to a conclusion that we do
>>>> need
>>>>> to
>>>>>>>> bump
>>>>>>>>>> the
>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>> with InitProducerId(keepPreparedTxn=true).
>>> As
>>>> I
>>>>>>>>> described
>>>>>>>>>> in
>>>>>>>>>>>> my
>>>>>>>>>>>>>>>> previous
>>>>>>>>>>>>>>>>>> email, we wouldn't need to bump the epoch
>> to
>>>>>> protect
>>>>>>>> from
>>>>>>>>>>>> zombies
>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> reasoning is still true.  But we cannot
>>> protect
>>>>>> from
>>>>>>>>>>>> split-brain
>>>>>>>>>>>>>>>>> scenarios
>>>>>>>>>>>>>>>>>> when two or more instances of a producer
>> with
>>>> the
>>>>>>> same
>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>> id
>>>>>>>>>>>>>>>>> try
>>>>>>>>>>>>>>>>>> to produce at the same time.  The
>> dual-write
>>>>>> example
>>>>>>>> for
>>>>>>>>>> SQL
>>>>>>>>>>>>>>> databases
>>>>>>>>>>>>>>>> (
>>>>>>>>>>>>>>>>>>
>>>> https://github.com/apache/kafka/pull/14231/files
>>>>> )
>>>>>>>>> doesn't
>>>>>>>>>>>> have a
>>>>>>>>>>>>>>>>>> split-brain problem because execution is
>>>>> protected
>>>>>> by
>>>>>>>> the
>>>>>>>>>>>> update
>>>>>>>>>>>>>> lock
>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>> the transaction state record; however NoSQL
>>>>>> databases
>>>>>>>> may
>>>>>>>>>> not
>>>>>>>>>>>>> have
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> protection (I'll write an example for NoSQL
>>>>>> database
>>>>>>>>>>> dual-write
>>>>>>>>>>>>>>> soon).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In a nutshell, here is an example of a
>>>>> split-brain
>>>>>>>>>> scenario:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   1. (instance1)
>>>>>>> InitProducerId(keepPreparedTxn=true),
>>>>>>>>> got
>>>>>>>>>>>>>> epoch=42
>>>>>>>>>>>>>>>>>>   2. (instance2)
>>>>>>> InitProducerId(keepPreparedTxn=true),
>>>>>>>>> got
>>>>>>>>>>>>>> epoch=42
>>>>>>>>>>>>>>>>>>   3. (instance1) CommitTxn, epoch bumped
>> to
>>> 43
>>>>>>>>>>>>>>>>>>   4. (instance2) CommitTxn, this is
>>>> considered a
>>>>>>>> retry,
>>>>>>>>> so
>>>>>>>>>>> it
>>>>>>>>>>>>> got
>>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>> 43
>>>>>>>>>>>>>>>>>>   as well
>>>>>>>>>>>>>>>>>>   5. (instance1) Produce messageA
>>> w/sequence 1
>>>>>>>>>>>>>>>>>>   6. (instance2) Produce messageB
>> w/sequence
>>>> 1,
>>>>>> this
>>>>>>>> is
>>>>>>>>>>>>>> considered a
>>>>>>>>>>>>>>>>>>   duplicate
>>>>>>>>>>>>>>>>>>   7. (instance2) Produce messageC
>>> w/sequence 2
>>>>>>>>>>>>>>>>>>   8. (instance1) Produce messageD
>> w/sequence
>>>> 2,
>>>>>> this
>>>>>>>> is
>>>>>>>>>>>>>> considered a
>>>>>>>>>>>>>>>>>>   duplicate
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Now if either of those commit the
>>> transaction,
>>>> it
>>>>>>> would
>>>>>>>>>> have
>>>>>>>>>>> a
>>>>>>>>>>>>> mix
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> messages from the two instances (messageA
>> and
>>>>>>>> messageC).
>>>>>>>>>>> With
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> proper
>>>>>>>>>>>>>>>>>> epoch bump, instance1 would get fenced at
>>> step
>>>> 3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In order to update epoch in
>>>>>>>>>>>> InitProducerId(keepPreparedTxn=true)
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> preserve the ongoing transaction's epoch
>> (and
>>>>>>>> producerId,
>>>>>>>>>> if
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>>> overflows), because we'd need to make a
>>> correct
>>>>>>>> decision
>>>>>>>>>> when
>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> compare
>>>>>>>>>>>>>>>>>> the PreparedTxnState that we read from the
>>>>> database
>>>>>>>> with
>>>>>>>>>> the
>>>>>>>>>>>>>>>> (producerId,
>>>>>>>>>>>>>>>>>> epoch) of the ongoing transaction.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I've updated the KIP with the following:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>   - Ongoing transaction now has 2
>>> (producerId,
>>>>>>> epoch)
>>>>>>>>>> pairs
>>>>>>>>>>> --
>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>> pair
>>>>>>>>>>>>>>>>>>   describes the ongoing transaction, the
>>> other
>>>>>> pair
>>>>>>>>>>> describes
>>>>>>>>>>>>>>> expected
>>>>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>>>   for operations on this transactional id
>>>>>>>>>>>>>>>>>>   - InitProducerIdResponse now returns 2
>>>>>>> (producerId,
>>>>>>>>>> epoch)
>>>>>>>>>>>>> pairs
>>>>>>>>>>>>>>>>>>   - TransactionalLogValue now has 2
>>>> (producerId,
>>>>>>>> epoch)
>>>>>>>>>>> pairs,
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>   values added as tagged fields, so it's
>>> easy
>>>> to
>>>>>>>>> downgrade
>>>>>>>>>>>>>>>>>>   - Added a note about downgrade in the
>>>>>>> Compatibility
>>>>>>>>>>> section
>>>>>>>>>>>>>>>>>>   - Added a rejected alternative
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Oct 6, 2023 at 5:16 PM Artem
>>> Livshits <
>>>>>>>>>>>>>>> alivsh...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Justine,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you for the questions.  Currently
>>>>>>> (pre-KIP-939)
>>>>>>>>> we
>>>>>>>>>>>> always
>>>>>>>>>>>>>>> bump
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> epoch on InitProducerId and abort an
>>> ongoing
>>>>>>>>> transaction
>>>>>>>>>>> (if
>>>>>>>>>>>>>>> any).  I
>>>>>>>>>>>>>>>>>>> expect this behavior will continue with
>>>> KIP-890
>>>>>> as
>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> With KIP-939 we need to support the case
>>> when
>>>>> the
>>>>>>>>> ongoing
>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>> needs to be preserved when
>>>>> keepPreparedTxn=true.
>>>>>>>>> Bumping
>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>> aborting or committing a transaction is
>>>> tricky
>>>>>>>> because
>>>>>>>>>>> epoch
>>>>>>>>>>>>> is a
>>>>>>>>>>>>>>>> short
>>>>>>>>>>>>>>>>>>> value and it's easy to overflow.
>>> Currently,
>>>>> the
>>>>>>>>> overflow
>>>>>>>>>>>> case
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> handled
>>>>>>>>>>>>>>>>>>> by aborting the ongoing transaction,
>> which
>>>>> would
>>>>>>> send
>>>>>>>>> out
>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>> markers with epoch=Short.MAX_VALUE to the
>>>>>> partition
>>>>>>>>>>> leaders,
>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> fence off any messages with the producer
>> id
>>>>> that
>>>>>>>>> started
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>> (they would have epoch that is less than
>>>>>>>>>> Short.MAX_VALUE).
>>>>>>>>>>>>> Then
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> safe
>>>>>>>>>>>>>>>>>>> to allocate a new producer id and use it
>> in
>>>> new
>>>>>>>>>>> transactions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We could say that maybe when
>>>>> keepPreparedTxn=true
>>>>>>> we
>>>>>>>>> bump
>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>> unless
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> leads to overflow, and don't bump epoch
>> in
>>>> the
>>>>>>>> overflow
>>>>>>>>>>> case.
>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>> think it's a good solution because if
>> it's
>>>> not
>>>>>> safe
>>>>>>>> to
>>>>>>>>>> keep
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>>>> when keepPreparedTxn=true, then we must
>>>> handle
>>>>>> the
>>>>>>>>> epoch
>>>>>>>>>>>>> overflow
>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>> well.  So either we should convince
>>> ourselves
>>>>>> that
>>>>>>>> it's
>>>>>>>>>>> safe
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> keep
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> epoch and do it in the general case, or
>> we
>>>>> always
>>>>>>>> bump
>>>>>>>>>> the
>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>>>> the overflow.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> With KIP-890, we bump the epoch on every
>>>>>>> transaction
>>>>>>>>>>> commit /
>>>>>>>>>>>>>>> abort.
>>>>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>> guarantees that even if
>>>>>>>>>>> InitProducerId(keepPreparedTxn=true)
>>>>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>>>>>>> increment epoch on the ongoing
>> transaction,
>>>> the
>>>>>>>> client
>>>>>>>>>> will
>>>>>>>>>>>>> have
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>> commit or abort to finish the transaction
>>> and
>>>>>> will
>>>>>>>>>>> increment
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> epoch
>>>>>>>>>>>>>>>>>> (and
>>>>>>>>>>>>>>>>>>> handle epoch overflow, if needed).  If
>> the
>>>>>> ongoing
>>>>>>>>>>>> transaction
>>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>> in a
>>>>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>>>>>> state and had some zombies waiting to
>>> arrive,
>>>>> the
>>>>>>>> abort
>>>>>>>>>>>>> operation
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> fence them because with KIP-890 every
>> abort
>>>>> would
>>>>>>>> bump
>>>>>>>>>> the
>>>>>>>>>>>>> epoch.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We could also look at this from the
>>> following
>>>>>>>>>> perspective.
>>>>>>>>>>>>> With
>>>>>>>>>>>>>>>>> KIP-890,
>>>>>>>>>>>>>>>>>>> zombies won't be able to cross
>> transaction
>>>>>>>> boundaries;
>>>>>>>>>> each
>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>> completion creates a boundary and any
>>>> activity
>>>>> in
>>>>>>> the
>>>>>>>>>> past
>>>>>>>>>>>> gets
>>>>>>>>>>>>>>>>> confined
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> the boundary.  Then data in any partition
>>>> would
>>>>>>> look
>>>>>>>>> like
>>>>>>>>>>>> this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. message1, epoch=42
>>>>>>>>>>>>>>>>>>> 2. message2, epoch=42
>>>>>>>>>>>>>>>>>>> 3. message3, epoch=42
>>>>>>>>>>>>>>>>>>> 4. marker (commit or abort), epoch=43
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Now if we inject steps 3a and 3b like
>> this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. message1, epoch=42
>>>>>>>>>>>>>>>>>>> 2. message2, epoch=42
>>>>>>>>>>>>>>>>>>> 3. message3, epoch=42
>>>>>>>>>>>>>>>>>>> 3a. crash
>>>>>>>>>>>>>>>>>>> 3b. InitProducerId(keepPreparedTxn=true)
>>>>>>>>>>>>>>>>>>> 4. marker (commit or abort), epoch=43
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The invariant still holds even with steps
>>> 3a
>>>>> and
>>>>>> 3b
>>>>>>>> --
>>>>>>>>>>>> whatever
>>>>>>>>>>>>>>>>> activity
>>>>>>>>>>>>>>>>>>> was in the past will get confined in the
>>> past
>>>>>> with
>>>>>>>>>>> mandatory
>>>>>>>>>>>>>> abort
>>>>>>>>>>>>>>> /
>>>>>>>>>>>>>>>>>> commit
>>>>>>>>>>>>>>>>>>> that must follow
>>>>>>>> InitProducerId(keepPreparedTxn=true).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So KIP-890 provides the proper isolation
>>>>> between
>>>>>>>>>>>> transactions,
>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>> injecting crash +
>>>>>>>> InitProducerId(keepPreparedTxn=true)
>>>>>>>>>> into
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> transaction sequence is safe from the
>>> zombie
>>>>>>>> protection
>>>>>>>>>>>>>>> perspective.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> That said, I'm still thinking about it
>> and
>>>>>> looking
>>>>>>>> for
>>>>>>>>>>> cases
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>> break because we don't bump epoch when
>>>>>>>>>>>>>>>>>>> InitProducerId(keepPreparedTxn=true), if
>>> such
>>>>>> cases
>>>>>>>>>> exist,
>>>>>>>>>>>>> we'll
>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> develop the logic to handle epoch
>> overflow
>>>> for
>>>>>>>> ongoing
>>>>>>>>>>>>>>> transactions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 3, 2023 at 10:15 AM Justine
>>>> Olshan
>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hey Artem,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I had a question
>> about
>>>>> epoch
>>>>>>>>>> bumping.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Previously when we send an
>> InitProducerId
>>>>>> request
>>>>>>> on
>>>>>>>>>>>> Producer
>>>>>>>>>>>>>>>> startup,
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> bump the epoch and abort the
>> transaction.
>>> Is
>>>>> it
>>>>>>>>> correct
>>>>>>>>>> to
>>>>>>>>>>>>>> assume
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> will still bump the epoch, but just not
>>>> abort
>>>>>> the
>>>>>>>>>>>> transaction?
>>>>>>>>>>>>>>>>>>>> If we still bump the epoch in this case,
>>> how
>>>>>> does
>>>>>>>> this
>>>>>>>>>>>>> interact
>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> KIP-890 where we also bump the epoch on
>>>> every
>>>>>>>>>> transaction.
>>>>>>>>>>>> (I
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> means that we may skip epochs and the
>> data
>>>>>> itself
>>>>>>>> will
>>>>>>>>>> all
>>>>>>>>>>>>> have
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>> epoch)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I may have follow ups depending on the
>>>> answer
>>>>> to
>>>>>>>> this.
>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Justine
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Sep 7, 2023 at 9:51 PM Artem
>>>> Livshits
>>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Alex,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for your questions.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> the purpose of having broker-level
>>>>>>>>>>>>>>>>>> transaction.two.phase.commit.enable
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The thinking is that 2PC is a bit of
>> an
>>>>>> advanced
>>>>>>>>>>> construct
>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>> enabling
>>>>>>>>>>>>>>>>>>>> 2PC
>>>>>>>>>>>>>>>>>>>>> in a Kafka cluster should be an
>> explicit
>>>>>>> decision.
>>>>>>>>> If
>>>>>>>>>>> it
>>>>>>>>>>>> is
>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> 'false'
>>>>>>>>>>>>>>>>>>>>> InitiProducerId (and initTransactions)
>>>> would
>>>>>>>>>>>>>>>>>>>>> return
>>>>> TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> WDYT about adding an AdminClient
>>> method
>>>>> that
>>>>>>>>> returns
>>>>>>>>>>> the
>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> transaction.two.phase.commit.enable
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I wonder if the client could just try
>> to
>>>> use
>>>>>> 2PC
>>>>>>>> and
>>>>>>>>>>> then
>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> error
>>>>>>>>>>>>>>>>>>>>> (e.g. if it needs to fall back to
>>> ordinary
>>>>>>>>>>> transactions).
>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> could uniformly handle cases when
>> Kafka
>>>>>> cluster
>>>>>>>>>> doesn't
>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>> 2PC
>>>>>>>>>>>>>>>>>>>>> completely and cases when 2PC is
>>>> restricted
>>>>> to
>>>>>>>>> certain
>>>>>>>>>>>>> users.
>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>> also expose this config in
>>>> describeConfigs,
>>>>> if
>>>>>>> the
>>>>>>>>>>>> fallback
>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>> doesn't work for some scenarios.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 5, 2023 at 12:45 PM
>>> Alexander
>>>>>>>> Sorokoumov
>>>>>>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid>
>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Artem,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for publishing this KIP!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Can you please clarify the purpose
>> of
>>>>> having
>>>>>>>>>>>> broker-level
>>>>>>>>>>>>>>>>>>>>>> transaction.two.phase.commit.enable
>>>> config
>>>>>> in
>>>>>>>>>> addition
>>>>>>>>>>>> to
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>> ACL? If
>>>>>>>>>>>>>>>>>>>>>> the brokers are configured with
>>>>>>>>>>>>>>>>>>>>>
>>> transaction.two.phase.commit.enable=false,
>>>>>>>>>>>>>>>>>>>>>> at what point will a client
>> configured
>>>>> with
>>>>>>>>>>>>>>>>>>>>>>
>>> transaction.two.phase.commit.enable=true
>>>>>> fail?
>>>>>>>>> Will
>>>>>>>>>> it
>>>>>>>>>>>>>> happen
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>>> KafkaProducer#initTransactions?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> WDYT about adding an AdminClient
>>> method
>>>>> that
>>>>>>>>> returns
>>>>>>>>>>> the
>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> t
>>>>>>>>>>>>>>>>>>>>>> ransaction.two.phase.commit.enable?
>>> This
>>>>>> way,
>>>>>>>>>> clients
>>>>>>>>>>>>> would
>>>>>>>>>>>>>>> know
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> advance
>>>>>>>>>>>>>>>>>>>>>> if 2PC is enabled on the brokers.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Alex
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 25, 2023 at 9:40 AM
>> Roger
>>>>>> Hoover <
>>>>>>>>>>>>>>>>>> roger.hoo...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Other than supporting multiplexing
>>>>>>>> transactional
>>>>>>>>>>>> streams
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>>>> producer, I don't see how to
>> improve
>>>> it.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, Aug 24, 2023 at 12:12 PM
>>> Artem
>>>>>>>> Livshits
>>>>>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid>
>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Roger,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thank you for summarizing the
>>>> cons.  I
>>>>>>> agree
>>>>>>>>> and
>>>>>>>>>>> I'm
>>>>>>>>>>>>>>> curious
>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>> the alternatives to solve these
>>>>> problems
>>>>>>>>> better
>>>>>>>>>>> and
>>>>>>>>>>>> if
>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>> incorporated into this proposal
>>> (or
>>>>>> built
>>>>>>>>>>>>> independently
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> addition
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>> on top of this proposal).  E.g.
>>> one
>>>>>>>> potential
>>>>>>>>>>>>> extension
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> discussed
>>>>>>>>>>>>>>>>>>>>>>>> earlier in the thread could be
>>>>>>> multiplexing
>>>>>>>>>>> logical
>>>>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>>>>>>>> "streams"
>>>>>>>>>>>>>>>>>>>>>>>> with a single producer.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 23, 2023 at 4:50 PM
>>>> Roger
>>>>>>>> Hoover <
>>>>>>>>>>>>>>>>>>>> roger.hoo...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.  I like that you're
>>> moving
>>>>>> Kafka
>>>>>>>>>> toward
>>>>>>>>>>>>>>> supporting
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>> dual-write
>>>>>>>>>>>>>>>>>>>>>>>>> pattern.  Each use case needs
>> to
>>>>>>> consider
>>>>>>>>> the
>>>>>>>>>>>>>> tradeoffs.
>>>>>>>>>>>>>>>>> You
>>>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>>>>>>>> summarized the pros very well
>> in
>>>> the
>>>>>>>> KIP.  I
>>>>>>>>>>> would
>>>>>>>>>>>>>>>> summarize
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> cons
>>>>>>>>>>>>>>>>>>>>>>>>> as follows:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> - you sacrifice availability -
>>>> each
>>>>>>> write
>>>>>>>>>>> requires
>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>> DB
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> available so I think your
>>> overall
>>>>>>>>> application
>>>>>>>>>>>>>>> availability
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>> 1
>>>>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>>> p(DB
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> unavailable)*p(Kafka is
>>>>> unavailable).
>>>>>>>>>>>>>>>>>>>>>>>>> - latency will be higher and
>>>>>> throughput
>>>>>>>>> lower
>>>>>>>>>> -
>>>>>>>>>>>> each
>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>>>>> requires
>>>>>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>> writes to DB and Kafka while
>>>> holding
>>>>>> an
>>>>>>>>>>> exclusive
>>>>>>>>>>>>> lock
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> DB.
>>>>>>>>>>>>>>>>>>>>>>>>> - you need to create a
>> producer
>>>> per
>>>>>> unit
>>>>>>>> of
>>>>>>>>>>>>>> concurrency
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>>>>> app
>>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>>> has some overhead in the app
>> and
>>>>> Kafka
>>>>>>>> side
>>>>>>>>>>>> (number
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> connections,
>>>>>>>>>>>>>>>>>>>>>>> poor
>>>>>>>>>>>>>>>>>>>>>>>>> batching).  I assume the
>>> producers
>>>>>> would
>>>>>>>>> need
>>>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> configured
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> low
>>>>>>>>>>>>>>>>>>>>>>>>> latency (linger.ms=0)
>>>>>>>>>>>>>>>>>>>>>>>>> - there's some complexity in
>>>>> managing
>>>>>>>> stable
>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>> ids
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>>>>> producer/concurrency unit in
>>> your
>>>>>>>>> application.
>>>>>>>>>>>> With
>>>>>>>>>>>>>> k8s
>>>>>>>>>>>>>>>>>>>>> deployment,
>>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>> may need to switch to
>> something
>>>>> like a
>>>>>>>>>>> StatefulSet
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> gives
>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>> pod
>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>> stable identity across
>> restarts.
>>>> On
>>>>>> top
>>>>>>>> of
>>>>>>>>>> that
>>>>>>>>>>>> pod
>>>>>>>>>>>>>>>>> identity
>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>>> use as a prefix, you then
>> assign
>>>>>> unique
>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>> ids
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>>>>> concurrency unit
>>>> (thread/goroutine).
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 23, 2023 at
>> 12:53 PM
>>>>> Artem
>>>>>>>>>> Livshits
>>>>>>>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io
>>> .invalid>
>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Roger,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the feedback.
>>> You
>>>>>> make
>>>>>>> a
>>>>>>>>> very
>>>>>>>>>>>> good
>>>>>>>>>>>>>>> point
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>>>>>>>> discussed internally.
>> Adding
>>>>>> support
>>>>>>>> for
>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>> concurrent
>>>>>>>>>>>>>>>>>>>>>>>>>> transactions in one producer
>>>> could
>>>>>> be
>>>>>>>>>> valuable
>>>>>>>>>>>> but
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> be a
>>>>>>>>>>>>>>>>>>>>>>>>> fairly
>>>>>>>>>>>>>>>>>>>>>>>>>> large and independent change
>>>> that
>>>>>>> would
>>>>>>>>>>> deserve
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> separate
>>>>>>>>>>>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>>>>>>>>>>> support is added we could
>>> modify
>>>>> 2PC
>>>>>>>>>>>> functionality
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> incorporate
>>>>>>>>>>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Maybe not too bad but a
>> bit
>>> of
>>>>>> pain
>>>>>>> to
>>>>>>>>>>> manage
>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>> ids
>>>>>>>>>>>>>>>>>>>> inside
>>>>>>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>>>>>>>>>> process and across all
>>>> application
>>>>>>>>>> processes.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I'm not sure if supporting
>>>>> multiple
>>>>>>>>>>> transactions
>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>>>>>>>> id management simpler: we'd
>>> need
>>>>> to
>>>>>>>> store
>>>>>>>>> a
>>>>>>>>>>>> piece
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>>>>> transaction,
>>>>>>>>>>>>>>>>>>>>>>>>>> so whether it's N producers
>>>> with a
>>>>>>>> single
>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>> or N
>>>>>>>>>>>>>>>>>>>>>>> transactions
>>>>>>>>>>>>>>>>>>>>>>>>>> with a single producer, it's
>>>> still
>>>>>>>> roughly
>>>>>>>>>> the
>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>> amount
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>> manage.  In fact, managing
>>>>>>> transactional
>>>>>>>>> ids
>>>>>>>>>>>>>> (current
>>>>>>>>>>>>>>>>>>>> proposal)
>>>>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>> easier, because the id is
>>>>> controlled
>>>>>>> by
>>>>>>>>> the
>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> knows
>>>>>>>>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>> complete the transaction
>> after
>>>>>> crash /
>>>>>>>>>>> restart;
>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> TID
>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>> generated by Kafka and that
>>>> would
>>>>>>>> create a
>>>>>>>>>>>>> question
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> starting
>>>>>>>>>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>>>>>>>>>> transaction, but not saving
>>> its
>>>>> TID
>>>>>>> and
>>>>>>>>> then
>>>>>>>>>>>>>> crashing,
>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> figuring
>>>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>>> which transactions to abort
>>> and
>>>>> etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) creating a separate
>>>> producer
>>>>>> for
>>>>>>>> each
>>>>>>>>>>>>>> concurrency
>>>>>>>>>>>>>>>>> slot
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> This is a very valid
>> concern.
>>>>> Maybe
>>>>>>>> we'd
>>>>>>>>>> need
>>>>>>>>>>>> to
>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>> multiplexing
>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>> transactional logical
>>> "streams"
>>>>> over
>>>>>>> the
>>>>>>>>>> same
>>>>>>>>>>>>>>>> connection.
>>>>>>>>>>>>>>>>>>>> Seems
>>>>>>>>>>>>>>>>>>>>>>> like a
>>>>>>>>>>>>>>>>>>>>>>>>>> separate KIP, though.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Otherwise, it seems you're
>>>> left
>>>>>> with
>>>>>>>>>>>>>> single-threaded
>>>>>>>>>>>>>>>>> model
>>>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>>>>>> application process?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> That's a fair assessment.
>> Not
>>>>>>>> necessarily
>>>>>>>>>>>> exactly
>>>>>>>>>>>>>>>>>>>>> single-threaded
>>>>>>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>>>>>> application, but a single
>>>> producer
>>>>>> per
>>>>>>>>>> thread
>>>>>>>>>>>>> model
>>>>>>>>>>>>>>>> (i.e.
>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>>> could have a pool of
>> threads +
>>>>>>> producers
>>>>>>>>> to
>>>>>>>>>>>>> increase
>>>>>>>>>>>>>>>>>>>>> concurrency).
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 22, 2023 at
>>> 7:22 PM
>>>>>> Roger
>>>>>>>>>> Hoover <
>>>>>>>>>>>>>>>>>>>>>> roger.hoo...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Artem,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly,
>>>> Kafka
>>>>>>> does
>>>>>>>>> not
>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>> concurrent
>>>>>>>>>>>>>>>>>>>>>>>>> transactions
>>>>>>>>>>>>>>>>>>>>>>>>>>> from the same producer
>>>>>>> (transactional
>>>>>>>>> id).
>>>>>>>>>>> I
>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> means
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>> applications that want to
>>>>> support
>>>>>>>>>> in-process
>>>>>>>>>>>>>>>> concurrency
>>>>>>>>>>>>>>>>>>>> (say
>>>>>>>>>>>>>>>>>>>>>>>>>> thread-level
>>>>>>>>>>>>>>>>>>>>>>>>>>> concurrency with row-level
>>> DB
>>>>>>> locking)
>>>>>>>>>> would
>>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> manage
>>>>>>>>>>>>>>>>>>>>>>> separate
>>>>>>>>>>>>>>>>>>>>>>>>>>> transactional ids and
>>>> producers
>>>>>> per
>>>>>>>>> thread
>>>>>>>>>>> and
>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>>> txn
>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>>>>>>> accordingly.   The
>> potential
>>>>>>> usability
>>>>>>>>>>>>> downsides I
>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) managing a set of
>>>>> transactional
>>>>>>> ids
>>>>>>>>> for
>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>> scales up to it's max
>>>>> concurrency.
>>>>>>>>> Maybe
>>>>>>>>>>> not
>>>>>>>>>>>>> too
>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> bit
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>> pain
>>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>> manage these ids inside
>> each
>>>>>> process
>>>>>>>> and
>>>>>>>>>>>> across
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>>> processes.
>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) creating a separate
>>>> producer
>>>>>> for
>>>>>>>> each
>>>>>>>>>>>>>> concurrency
>>>>>>>>>>>>>>>>> slot
>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> application - this could
>>>> create
>>>>> a
>>>>>>> lot
>>>>>>>>> more
>>>>>>>>>>>>>> producers
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> resultant
>>>>>>>>>>>>>>>>>>>>>>>>>>> connections to Kafka than
>>> the
>>>>>>> typical
>>>>>>>>>> model
>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>>>>>>> process.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Otherwise, it seems you're
>>>> left
>>>>>> with
>>>>>>>>>>>>>> single-threaded
>>>>>>>>>>>>>>>>> model
>>>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>>>> process?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Roger
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 22, 2023 at
>>>> 5:11 PM
>>>>>>> Artem
>>>>>>>>>>> Livshits
>>>>>>>>>>>>>>>>>>>>>>>>>>> <alivsh...@confluent.io
>>>>> .invalid>
>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Roger, Arjun,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the
>>> questions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It looks like the
>>>>> application
>>>>>>> must
>>>>>>>>>> have
>>>>>>>>>>>>> stable
>>>>>>>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>>>>>>>> ids
>>>>>>>>>>>>>>>>>>>>>>>>> over
>>>>>>>>>>>>>>>>>>>>>>>>>>>> time?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The transactional id
>>> should
>>>>>>> uniquely
>>>>>>>>>>>> identify
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be stable across the
>>>>>> restarts.
>>>>>>>> If
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>>>>> id is
>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>> stable
>>>>>>>>>>>>>>>>>>>>>>>>>>>> across restarts, then
>>> zombie
>>>>>>>> messages
>>>>>>>>>>> from a
>>>>>>>>>>>>>>>> previous
>>>>>>>>>>>>>>>>>>>>>> incarnation
>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> producer may violate
>>>>> atomicity.
>>>>>>> If
>>>>>>>>>> there
>>>>>>>>>>>> are
>>>>>>>>>>>>> 2
>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>>>>>>>>>>> concurrently producing
>>> data
>>>>> with
>>>>>>> the
>>>>>>>>>> same
>>>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>>>>> id,
>>>>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>>>> going
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to constantly fence each
>>>> other
>>>>>> and
>>>>>>>>> most
>>>>>>>>>>>> likely
>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>> little or
>>>>>>>>>>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>>>>>>>>> progress.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The name might be a
>> little
>>>> bit
>>>>>>>>> confusing
>>>>>>>>>>> as
>>>>>>>>>>>> it
>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> mistaken
>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction id / TID
>> that
>>>>>> uniquely
>>>>>>>>>>>> identifies
>>>>>>>>>>>>>>> every
>>>>>>>>>>>>>>>>>>>>>> transaction.
>>>>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>>>>> name
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and the semantics were
>>>> defined
>>>>>> in
>>>>>>>> the
>>>>>>>>>>>> original
>>>>>>>>>>>>>>>>>>>>>>>> exactly-once-semantics
>>>>>>>>>>>>>>>>>>>>>>>>>>> (EoS)
>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal (
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>>>>>>>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and KIP-939 just build
>> on
>>>> top
>>>>> of
>>>>>>>> that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm curious to
>>> understand
>>>>> what
>>>>>>>>> happens
>>>>>>>>>>> if
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> producer
>>>>>>>>>>>>>>>>>>>>> dies,
>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>>>> come up and recover the
>>>>> pending
>>>>>>>>>>> transaction
>>>>>>>>>>>>>> within
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>>>>>>>>>> timeout
>>>>>>>>>>>>>>>>>>>>>>>>>>>> interval.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> If the producer /
>>>> application
>>>>>>> never
>>>>>>>>>> comes
>>>>>>>>>>>>> back,
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>>>>> remain
>>>>>>>>>>>>>>>>>>>>>>>>>>>> in prepared (a.k.a.
>>>>> "in-doubt")
>>>>>>>> state
>>>>>>>>>>> until
>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> operator
>>>>>>>>>>>>>>>>>>>>>>> forcefully
>>>>>>>>>>>>>>>>>>>>>>>>>>>> terminates the
>>> transaction.
>>>>>>> That's
>>>>>>>>> why
>>>>>>>>>>>> there
>>>>>>>>>>>>>> is a
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>> ACL is
>>>>>>>>>>>>>>>>>>>>>>>> defined
>>>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>>> this proposal -- this
>>>>>>> functionality
>>>>>>>>>> should
>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>> provided
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>> applications
>>>>>>>>>>>>>>>>>>>>>>>>>>>> that implement proper
>>>> recovery
>>>>>>>> logic.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Artem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 22, 2023 at
>>>>> 12:52 AM
>>>>>>>> Arjun
>>>>>>>>>>>> Satish
>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>> arjun.sat...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Artem,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have the same
>> question
>>>> as
>>>>>>> Roger
>>>>>>>> on
>>>>>>>>>>>>>> concurrent
>>>>>>>>>>>>>>>>>> writes,
>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> one on consumer
>>> behavior.
>>>>>>>> Typically,
>>>>>>>>>>>>>>> transactions
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>> timeout
>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> committed within some
>>> time
>>>>>>>> interval.
>>>>>>>>>>> With
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> proposed
>>>>>>>>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consumers cannot
>> consume
>>>>> past
>>>>>>> the
>>>>>>>>>>> ongoing
>>>>>>>>>>>>>>>>> transaction.
>>>>>>>>>>>>>>>>>>>> I'm
>>>>>>>>>>>>>>>>>>>>>>>> curious
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understand what
>> happens
>>> if
>>>>> the
>>>>>>>>>> producer
>>>>>>>>>>>>> dies,
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> come
>>>>>>>>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> recover the pending
>>>>>> transaction
>>>>>>>>> within
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>>>>>> timeout
>>>>>>>>>>>>>>>>>>>>>>>>>>> interval.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are we saying that
>> when
>>>> used
>>>>>> in
>>>>>>>> this
>>>>>>>>>> 2PC
>>>>>>>>>>>>>>> context,
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> configure
>>>>>>>>>>>>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transaction timeouts
>> to
>>>> very
>>>>>>> large
>>>>>>>>>>>>> durations?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Arjun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Aug 21, 2023
>> at
>>>>>> 1:06 PM
>>>>>>>>> Roger
>>>>>>>>>>>>> Hoover <
>>>>>>>>>>>>>>>>>>>>>>>>> roger.hoo...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Artem,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for writing
>>> this
>>>>> KIP.
>>>>>>>> Can
>>>>>>>>>> you
>>>>>>>>>>>>>> clarify
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> requirements
>>>>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>> bit
>>>>>>>>>>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for managing
>>> transaction
>>>>>>> state?
>>>>>>>>> It
>>>>>>>>>>>> looks
>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>> must
>>>>>>>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stable transactional
>>> ids
>>>>>> over
>>>>>>>>> time?
>>>>>>>>>>>> What
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> granularity
>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>> those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ids
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and producers?  Say
>>> the
>>>>>>>>> application
>>>>>>>>>>> is a
>>>>>>>>>>>>>>>>>>>> multi-threaded
>>>>>>>>>>>>>>>>>>>>>> Java
>>>>>>>>>>>>>>>>>>>>>>>> web
>>>>>>>>>>>>>>>>>>>>>>>>>>>> server,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can/should all the
>>>>>> concurrent
>>>>>>>>>> threads
>>>>>>>>>>>>> share
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> transactional
>>>>>>>>>>>>>>>>>>>>>>> id
>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> producer?  That
>>> doesn't
>>>>> seem
>>>>>>>> right
>>>>>>>>>> to
>>>>>>>>>>> me
>>>>>>>>>>>>>>> unless
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> global DB locks that
>>>>>> serialize
>>>>>>>> all
>>>>>>>>>>>>> requests.
>>>>>>>>>>>>>>>>>>>> Instead, if
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> uses row-level DB
>>> locks,
>>>>>> there
>>>>>>>>> could
>>>>>>>>>>> be
>>>>>>>>>>>>>>>> multiple,
>>>>>>>>>>>>>>>>>>>>>> concurrent,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> independent
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> txns happening in
>> the
>>>> same
>>>>>> JVM
>>>>>>>> so
>>>>>>>>> it
>>>>>>>>>>>> seems
>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> granularity
>>>>>>>>>>>>>>>>>>>>>>>>>>>> managing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> transactional ids
>> and
>>>> txn
>>>>>>> state
>>>>>>>>>> needs
>>>>>>>>>>> to
>>>>>>>>>>>>>> line
>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>> granularity
>>>>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DB
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locking.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Does that make sense
>>> or
>>>>> am I
>>>>>>>>>>>>>> misunderstanding?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Roger
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 16, 2023
>>> at
>>>>>>> 11:40 PM
>>>>>>>>>> Artem
>>>>>>>>>>>>>>> Livshits
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <
>>> alivsh...@confluent.io
>>>>>>>> .invalid>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is a
>> discussion
>>>>>> thread
>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP proposes
>>>>> extending
>>>>>>>> Kafka
>>>>>>>>>>>>>> transaction
>>>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>>>>>> (that
>>>>>>>>>>>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>>>>>>>>>>>>> uses
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2PC
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> under the hood) to
>>>>> enable
>>>>>>>>>> atomicity
>>>>>>>>>>> of
>>>>>>>>>>>>>> dual
>>>>>>>>>>>>>>>>> writes
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> external
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> database, and
>> helps
>>> to
>>>>>> fix a
>>>>>>>>> long
>>>>>>>>>>>>> standing
>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>>>>>>> issue.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> An example of code
>>>> that
>>>>>> uses
>>>>>>>> the
>>>>>>>>>>> dual
>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>>>> recipe
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>> JDBC
>>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> work for most SQL
>>>>>> databases
>>>>>>> is
>>>>>>>>>> here
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/14231.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The FLIP for the
>>>> sister
>>>>>> fix
>>>>>>> in
>>>>>>>>>> Flink
>>>>>>>>>>>> is
>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Artem


Reply via email to