[
https://issues.apache.org/jira/browse/KAFKA-19082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ritika Reddy resolved KAFKA-19082.
----------------------------------
Resolution: Fixed
> Client side changes to enable 2PC
> ---------------------------------
>
> Key: KAFKA-19082
> URL: https://issues.apache.org/jira/browse/KAFKA-19082
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ritika Reddy
> Assignee: Ritika Reddy
> Priority: Major
>
> h2. KafkaProducer API Changes
> New {{KafkaProducer.PreparedTxnState}} class is going to be defined as
> following:
> {panel}
> {panel}
> |{{static}} {{public}} {{class}} {{PreparedTxnState {}}
> {{ }}{{public}} {{String toString();}}
> {{ }}{{public}} {{PreparedTxnState(String serializedState);}}
> {{ }}{{public}} {{PreparedTxnState();}}
> {{}}}|
> The objects of this class can serialize to / deserialize from a string value
> and can be written to / read from a database. The implementation is going to
> store {{producerId}} and {{{}epoch{}}}.
> New overloaded method will be added to {{{}KafkaProducer{}}}:
> {{public void initTransactions(boolean keepPreparedTxn)}}
> If the value is 'true' then the corresponding field is set in the
> {{InitProducerIdRequest}} and the {{KafkaProducer}} object is set into a
> state which only allows calling {{{}.commitTransaction{}}},
> {{.abortTransaction,}} or {{.completeTransaction.}}
> New method will be added to {{{}KafkaProducer{}}}:
> {{public PreparedTxnState prepareTransaction()}}
> This would flush all the pending messages and transition the producer into a
> mode where only {{{}.commitTransaction{}}}, {{.abortTransaction,}} or
> {{.completeTransaction}} could be called (calling other methods, e.g.
> {{.send}} , in that mode would result in {{IllegalStateException}} being
> thrown). If the call is successful (all messages successfully got flushed to
> all partitions) the transaction is prepared. If the 2PC is not enabled, we
> return the {{INVALID_TXN_STATE}} error.
> New method would be added to {{{}KafkaProducer{}}}:
> {{public void completeTransaction(PreparedTxnState preparedTxnState)}}
> The method would compare the currently prepared transaction state and the
> state passed in the argument and either commit or abort the transaction. If
> the producer is not in prepared state (i.e. neither prepareTransaction was
> called nor initTransaction(true) was called) we return an INVALID_TXN_STATE
> error.
> h2. Client Configuration Changes
> *transaction.two.phase.commit.enable* The default would be ‘false’. If set
> to ‘true’, then the broker is informed that the client is participating in
> two phase commit protocol and transactions that this client starts never
> expire.
> *transaction.timeout.ms* The semantics is not changed, but it would be an
> error to set *transaction.timeout.ms* when *two.phase.commit.enable* is set
> to 'true’.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)