*Status*
- Current State: Discussing
- Authors: [complone](https://github.com/complone)
- Shepherds:
- Mailing List discussion: dev@rocketmq.apache.org
- Pull Request:
- Released: no

*Background & Motivation*

*What do we need to do*

- Will we add a new module? Yes.
- Will we add new APIs? Yes.
- Will we add a new feature? Yes.

*Why should we do that*

- Are there any problems with our current project?

At present, when RocketMQ produces a message, the network call itself is uncertain: a request can end in an unknown state, so the producer may retry and the same message may be delivered more than once. Many other MQ products have the same problem. The usual approach is to ask consumers to de-duplicate messages when consuming them. The basis for idempotency should be generated by the message producer: when sending a message, the producer can set a business key on it, and the corresponding API is ```org.apache.rocketmq.common.message.Message#setKeys(String keys)```.

- What can we benefit from the proposed changes?

We introduce the notion of a TransactionalId, which lets users uniquely identify a producer in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions instantiated by the previous instance. We also introduce the notion of a producer epoch, which ensures that there is only one legitimate active instance of a producer with a given TransactionalId, and hence lets us maintain transaction guarantees in the event of failures.

*Goals*

- What problem is this proposal designed to solve?

When tuned for reliability, users can guarantee that every message write is persisted at least once without data loss. Duplicates may still appear in the stream because of producer retries. For example, a broker might crash between committing a message and sending an ack to the producer; the producer then retries, and the stream ends up with duplicate messages.

However, idempotent producers do not provide guarantees for writes across multiple MessageQueues. For this, one needs stronger transactional guarantees, i.e. the ability to write to several MessageQueues atomically. By atomically, we mean the ability to commit a set of messages across MessageQueues as a unit: either all messages are committed, or none of them are.

- To what degree should we solve the problem?

Within a transaction, we also need to make sure that the producer generates no duplicate messages. To achieve this, we add sequence numbers to messages so that brokers can de-duplicate messages per producer and topic partition. For each topic partition that is written to, the producer maintains a sequence number counter and assigns the next number in the sequence to each new message. The broker verifies that the next message produced carries the expected next number and otherwise returns an error.

In addition, since the sequence number is per producer and topic partition, we also need to uniquely identify a producer across multiple sessions (i.e. when the producer fails and is recreated). Hence we introduce a new TransactionalId to distinguish producers, along with an epoch number so that zombie writers with the same TransactionalId can be fenced.

At any given point in time, a producer can only have one ongoing transaction, so we can distinguish messages that belong to different transactions by their respective TransactionalId. Producers with the same TransactionalId will talk to the same transaction coordinator, which keeps track of their TransactionalIds in addition to managing their transaction status.
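For comparison, the key-based de-duplication mentioned in the Background section leaves the actual work to consumers: the producer only attaches a business key to each message, and every consumer has to check it. A minimal sketch using the existing client API (the producer group, name server address, topic, tag, and key below are placeholder values):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class KeyedProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // The business key (e.g. an order id) is the basis for idempotency:
        // consumers look it up and drop messages whose key they have already processed.
        Message msg = new Message("ExampleTopic", "TagA", "order payload".getBytes());
        msg.setKeys("ORDER-20230101-0001");

        SendResult result = producer.send(msg);
        System.out.printf("sent, msgId=%s%n", result.getMsgId());

        producer.shutdown();
    }
}
```

This works, but it pushes the de-duplication cost onto every consumer, which is exactly what the broker-side scheme described above is meant to avoid.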
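A rough illustration of that broker-side check, kept per producer and per message queue, could look like the following. This is only a sketch of the idea, not code from this proposal: the `ProducerStateTable` class, its fields, and the error handling are hypothetical names invented for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical broker-side bookkeeping: the last sequence number accepted from
// each PID on one message queue, plus the producer epoch used for fencing.
public class ProducerStateTable {

    private static class ProducerState {
        int epoch = -1;
        long lastSequence = -1;
    }

    private final Map<Long, ProducerState> statesByPid = new ConcurrentHashMap<>();

    /** Returns true if the message should be appended, false if it is a duplicate. */
    public synchronized boolean checkAndAdvance(long pid, int epoch, long sequence) {
        ProducerState state = statesByPid.computeIfAbsent(pid, k -> new ProducerState());

        if (epoch < state.epoch) {
            // A zombie writer holding an old epoch for the same TransactionalId: fence it.
            throw new IllegalStateException("fenced: stale producer epoch " + epoch);
        }
        if (epoch > state.epoch) {
            // A new producer instance took over: reset the expected sequence.
            state.epoch = epoch;
            state.lastSequence = -1;
        }
        if (sequence <= state.lastSequence) {
            // Already appended (a retry of an earlier send): drop it as a duplicate.
            return false;
        }
        if (sequence != state.lastSequence + 1) {
            // A gap means a message was lost in between: reject so the producer can resync.
            throw new IllegalStateException(
                "out-of-order sequence " + sequence + ", expected " + (state.lastSequence + 1));
        }
        state.lastSequence = sequence;
        return true;
    }
}
```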
*Non-Goals*

- What problem is this proposal NOT designed to solve?
- Are there any limits of this proposal?

*Changes*

*Architecture*

*Interface Design/Change*

- Method signature changes

This proposal adds a set of new public APIs to the TransactionMQProducer class. The following describes how these APIs will be implemented; a usage sketch of the whole flow follows at the end of this section.

/* initialize the producer as a transactional producer */
initTransactions()

The following steps are taken when initTransactions() is called:

1. If no TransactionalId has been provided in the configuration, skip to step 3.
2. Send a FindCoordinatorRequest with the configured TransactionalId and with CoordinatorType encoded as "transaction" to a random broker. Block for the corresponding response, which returns the transaction coordinator assigned to this producer.
3. Send an InitPidRequest to the transaction coordinator, or to a random broker if no TransactionalId was provided in the configuration. Block for the corresponding response to get the returned PID.

/* start a transaction to produce messages */
beginTransaction()

The following steps are executed on the producer when beginTransaction() is called:

1. Check that the producer is transactional (i.e. initTransactions() has been called); if not, throw an exception. (We omit this step in the rest of the APIs, but they all need to execute it.)
2. Check whether a transaction has already been started. If so, raise an exception.

/* send offsets for a given consumer group within this transaction */
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)

The following steps are executed on the producer when sendOffsetsToTransaction() is called:

1. Check that the producer is currently within a transaction; if not, throw an exception. Otherwise proceed to the next step.
2. Check whether this function has already been called for the given groupId within this transaction. If not, send an AddOffsetsToTxnRequest to the transaction coordinator and block until the corresponding response is received. Otherwise proceed to the next step.
3. Send a TxnOffsetCommitRequest to the coordinator returned in the response from the previous step, and block until the corresponding response is received.

/* commit the transaction with its produced messages */
commitTransaction()

The following steps are executed on the producer when commitTransaction() is called:

1. Check that there is an active transaction; if not, throw an exception. Otherwise proceed to the next step.
2. Call flush to make sure all messages sent in this transaction are acknowledged.
3. Send an EndTxnRequest with the COMMIT command to the transaction coordinator, and block until the corresponding response is received.

- Method behavior changes
- CLI command changes
- Log format or content changes
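To show how these pieces fit together, here is a minimal usage sketch of the proposed flow. It assumes initTransactions(), beginTransaction(), sendOffsetsToTransaction(), and commitTransaction() are added to TransactionMQProducer as described above; the producer group, name server address, topics, and payloads are placeholder values, and the way the TransactionalId is configured is an assumption, since this proposal does not define it.

```java
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionalSendSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder producer group and name server address.
        TransactionMQProducer producer = new TransactionMQProducer("example_txn_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Assumed: the TransactionalId is supplied through producer configuration before
        // start(); the proposal does not specify the exact configuration API.
        producer.start();

        // Find the transaction coordinator and obtain the PID (and epoch).
        producer.initTransactions();

        // Open a transaction and produce into several MessageQueues within it.
        producer.beginTransaction();
        producer.send(new Message("OutputTopicA", "result-a".getBytes()));
        producer.send(new Message("OutputTopicB", "result-b".getBytes()));

        // In a consume-transform-produce pipeline, the consumed offsets of the input
        // queues would be added to the same transaction, e.g.:
        //   producer.sendOffsetsToTransaction(offsets, "example_consumer_group");
        // (TopicPartition/OffsetAndMetadata come from the proposed signature; how the
        // offsets map is built is not specified in this proposal.)

        // Commit: flush pending sends, then send an EndTxnRequest with the COMMIT
        // command to the transaction coordinator.
        producer.commitTransaction();

        producer.shutdown();
    }
}
```

An abort path is not shown; presumably it would send an EndTxnRequest with an ABORT command instead of COMMIT.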
*Compatibility, Deprecation, and Migration Plan*

- Are backward and forward compatibility taken into consideration?
- Are there deprecated APIs?
- How do we do migration?

*Implementation Outline*

*Phase 1*

First, before sending data, a transaction coordinator has to be created manually to control the one-to-many relationship between PID and TransactionalId. To this end, we need to ensure that the offset consumption progress is reported to the transaction coordinator in a timely manner, and that messages belonging to committed but not yet settled transactions are not deleted by mistake while the scheduled deletion task runs concurrently.

*Phase 2*

Decide whether PullLiteProducer needs to compare the current offset with the transaction offset visible to the global PID, so that the PID -> TransactionalId mapping can achieve cross-partition transaction consistency.

*Phase 3*

When downstream consumers consume, we need to ensure that messages marked with a TransactionalId upstream are perceived by the downstream consumers.

*Rejected Alternatives*

- How do the alternatives solve the issue you proposed?
- Pros and cons of the alternatives
- Why should we reject the above alternatives?
