[ https://issues.apache.org/jira/browse/KAFKA-19082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ritika Reddy resolved KAFKA-19082. ---------------------------------- Resolution: Fixed > Client side changes to enable 2PC > --------------------------------- > > Key: KAFKA-19082 > URL: https://issues.apache.org/jira/browse/KAFKA-19082 > Project: Kafka > Issue Type: Sub-task > Reporter: Ritika Reddy > Assignee: Ritika Reddy > Priority: Major > > h2. KafkaProducer API Changes > New {{KafkaProducer.PreparedTxnState}} class is going to be defined as > following: > {panel} > {panel} > |{{static}} {{public}} {{class}} {{PreparedTxnState {}} > {{ }}{{public}} {{String toString();}} > {{ }}{{public}} {{PreparedTxnState(String serializedState);}} > {{ }}{{public}} {{PreparedTxnState();}} > {{}}}| > The objects of this class can serialize to / deserialize from a string value > and can be written to / read from a database. The implementation is going to > store {{producerId}} and {{{}epoch{}}}. > New overloaded method will be added to {{{}KafkaProducer{}}}: > {{public void initTransactions(boolean keepPreparedTxn)}} > If the value is 'true' then the corresponding field is set in the > {{InitProducerIdRequest}} and the {{KafkaProducer}} object is set into a > state which only allows calling {{{}.commitTransaction{}}}, > {{.abortTransaction,}} or {{.completeTransaction.}} > New method will be added to {{{}KafkaProducer{}}}: > {{public PreparedTxnState prepareTransaction()}} > This would flush all the pending messages and transition the producer into a > mode where only {{{}.commitTransaction{}}}, {{.abortTransaction,}} or > {{.completeTransaction}} could be called (calling other methods, e.g. > {{.send}} , in that mode would result in {{IllegalStateException}} being > thrown). If the call is successful (all messages successfully got flushed to > all partitions) the transaction is prepared. If the 2PC is not enabled, we > return the {{INVALID_TXN_STATE}} error. > New method would be added to {{{}KafkaProducer{}}}: > {{public void completeTransaction(PreparedTxnState preparedTxnState)}} > The method would compare the currently prepared transaction state and the > state passed in the argument and either commit or abort the transaction. If > the producer is not in prepared state (i.e. neither prepareTransaction was > called nor initTransaction(true) was called) we return an INVALID_TXN_STATE > error. > h2. Client Configuration Changes > *transaction.two.phase.commit.enable* The default would be ‘false’. If set > to ‘true’, then the broker is informed that the client is participating in > two phase commit protocol and transactions that this client starts never > expire. > *transaction.timeout.ms* The semantics is not changed, but it would be an > error to set *transaction.timeout.ms* when *two.phase.commit.enable* is set > to 'true’. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)