OK, I have resent it.

Xinyu Zhou <[email protected]> wrote on Thu, Sep 29, 2022 at 20:10:

> Hi, thanks for starting this discussion.
>
> But the format is broken, it's hard to read, could you please resend a
> well-formatted version? Or, paste a doc link?
>
> Regards
>
> On Thu, Sep 29, 2022 at 7:23 PM 柳尘 <[email protected]> wrote:
>
> > Status
> >
> > - Current State: Discussing
> > - Authors: [complone](github.com/complone)
> > - Shepherds:
> > - Mailing List discussion: [email protected]
> > - Pull Request:
> > - Released: no
> >
> > Background & Motivation
> >
> > What do we need to do
> >
> > - Will we add a new module? yes
> > - Will we add new APIs? yes
> > - Will we add a new feature? yes
> >
> > Why should we do that
> >
> > - Are there any problems of our current project?
> >
> > At present, when RocketMQ produces messages, the outcome of the network
> > call itself is uncertain (the so-called "processing" state), so messages
> > can be duplicated. Many other MQ products have the same problem. The
> > usual approach is to ask consumers to deduplicate messages when
> > consuming them. The idempotency key should be generated by the message
> > producer: when sending a message, the producer can set this ID as the
> > message key. The corresponding API is
> > ```org.apache.rocketmq.common.message.Message.setKeys(String keys)```.
> >
> > - What can we benefit from the proposed changes?
> >
> > We introduce a notion of TransactionalId, to enable users to uniquely
> > identify producers in a persistent way.
> > Different instances of a producer with the same TransactionalId will be
> > able to resume (or abort) any transactions instantiated by the previous
> > instance.
> >
> > We introduce the notion of a producer epoch, which enables us to ensure
> > that there is only one legitimate active instance of a producer with a
> > given TransactionalId, and hence enables us to maintain transaction
> > guarantees in the event of failures.
> >
> > Goals
> >
> > - What problem is this proposal designed to solve?
> >
> > When tuned for reliability, users can guarantee that every message write
> > will be persisted at least once without data loss. Duplications may
> > appear in the stream due to producer retries. For example, a broker
> > might crash between committing a message and sending an ack to the
> > producer, causing the producer to retry, resulting in duplicate messages
> > in the stream.
> >
> > However, idempotent producers don't provide guarantees for writes across
> > multiple MessageQueues. For this, one needs stronger transactional
> > guarantees, i.e. the ability to write to several MessageQueues
> > atomically. By atomically, we mean the ability to commit a set of
> > messages across MessageQueues as a unit: either all messages are
> > committed, or none of them are.
> >
> > - To what degree should we solve the problem?
> >
> > Within a transaction, we also need to make sure that there are no
> > duplicate messages generated by the producer. To achieve this, we are
> > going to add sequence numbers to messages to allow the brokers to
> > de-duplicate messages per producer and topic partition. For each topic
> > partition that is written to, the producer maintains a sequence number
> > counter and assigns the next number in the sequence for each new
> > message. The broker verifies that the next message produced has been
> > assigned the next number and otherwise returns an error.
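
To make the sequence-number check above a bit more concrete, here is a rough sketch of what the broker side could look like. This is only an illustration under my own assumptions: `SequenceChecker`, its `Result` values, the string key format, and starting sequences at 0 are all hypothetical names and choices, and none of them exist in RocketMQ today. The proposal only says the broker returns an error when the number is not the next one; telling a duplicate apart from a gap is an extra assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical broker-side check; none of these names exist in RocketMQ. */
class SequenceChecker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number, keyed by "producerId/queue".
    private final Map<String, Long> lastSequence = new HashMap<>();

    Result check(long producerId, String queue, long sequence) {
        String key = producerId + "/" + queue;
        // Assumption: the first message of a producer on a queue has sequence 0.
        long last = lastSequence.getOrDefault(key, -1L);
        if (sequence <= last) {
            // Already stored once: most likely a producer retry.
            return Result.DUPLICATE;
        }
        if (sequence != last + 1) {
            // A gap in the sequence: reject so the producer sees an error.
            return Result.OUT_OF_ORDER;
        }
        lastSequence.put(key, sequence);
        return Result.ACCEPTED;
    }
}
```

The counter is kept per producer and per queue, so two producers (or one producer writing to two queues) never interfere with each other's numbering.
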
> >
> > In addition, since the sequence number is per producer and topic
> > partition, we also need to uniquely identify a producer across multiple
> > sessions (i.e. when the producer fails and is recreated, etc.). Hence we
> > introduce a new TransactionalId to distinguish producers, along with an
> > epoch number so that zombie writers with the same TransactionalId can be
> > fenced.
> >
> > At any given point in time, a producer can only have one ongoing
> > transaction, so we can distinguish messages that belong to different
> > transactions by their respective TransactionalId. Producers with the
> > same TransactionalId will talk to the same transaction coordinator,
> > which also keeps track of their TransactionalIds in addition to managing
> > their transaction status.
> >
> > Non-Goals
> >
> > - What problem is this proposal NOT designed to solve?
> > - Are there any limits of this proposal?
> >
> > Changes
> >
> > Architecture
> >
> > Interface Design/Change
> >
> > - Method signature changes
> >
> > A set of new public APIs will be added to the TransactionMQProducer
> > class. The following describes how these APIs will be implemented.
> >
> > /* initialize the producer as a transactional producer */
> > initTransactions()
> >
> > The following steps will be taken when initTransactions() is called:
> >
> > 1. If no TransactionalId has been provided in configuration, skip to
> >    step 3.
> > 2. Send a FindCoordinatorRequest with the configured TransactionalId and
> >    with CoordinatorType encoded as "transaction" to a random broker.
> >    Block for the corresponding response, which will return the assigned
> >    transaction coordinator for this producer.
> > 3. Send an InitPidRequest to the transaction coordinator or to a random
> >    broker if no TransactionalId was provided in configuration.
> >    Block for the corresponding response to get the returned PID.
> >
> > /* start a transaction to produce messages */
> > beginTransaction()
> >
> > The following steps are executed on the producer when beginTransaction
> > is called:
> >
> > 1. Check if the producer is transactional (i.e. init has been called),
> >    if not throw an exception (we omit this step in the rest of the APIs,
> >    but they all need to execute it).
> > 2. Check whether a transaction has already been started. If so, raise an
> >    exception.
> >
> > /* send offsets for a given consumer group within this transaction */
> > sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
> >                          String consumerGroupId)
> >
> > The following steps are executed on the producer when
> > sendOffsetsToTransaction is called:
> >
> > 1. Check if it is currently within a transaction, if not throw an
> >    exception; otherwise proceed to the next step.
> > 2. Check if this function has ever been called for the given groupId
> >    within this transaction. If not then send an AddOffsetsToTxnRequest
> >    to the transaction coordinator, block until the corresponding
> >    response is received; otherwise proceed to the next step.
> > 3. Send a TxnOffsetCommitRequest to the coordinator returned in the
> >    response from the previous step, block until the corresponding
> >    response is received.
> >
> > /* commit the transaction with its produced messages */
> > commitTransaction()
> >
> > The following steps are executed on the producer when commitTransaction
> > is called:
> >
> > 1. Check if there is an active transaction, if not throw an exception;
> >    otherwise proceed to the next step.
> > 2. Call flush to make sure all sent messages in this transaction are
> >    acknowledged.
> > 3. Send an EndTxnRequest with COMMIT command to the transaction
> >    coordinator, block until the corresponding response is received.
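
The client-side checks in the steps above could be sketched roughly as follows. This is a hypothetical illustration, not the real TransactionMQProducer: `TxnProducerSketch` is a made-up class, the network layer is replaced by a list that records which request would be sent, flush is left as a comment, and `Map<String, Long>` stands in for `Map<TopicPartition, OffsetAndMetadata>`, which is not defined here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the producer-side checks; not the real producer. */
class TxnProducerSketch {
    private boolean initialized = false;
    private boolean inTransaction = false;
    // Consumer groups already registered within the current transaction.
    private final Set<String> groupsAdded = new HashSet<>();
    // Stand-in for the network layer: records the requests that would be sent.
    final List<String> sentRequests = new ArrayList<>();

    void initTransactions(String transactionalId) {
        // Steps 1-2: only look up a coordinator when a TransactionalId is set.
        if (transactionalId != null) {
            sentRequests.add("FindCoordinatorRequest");
        }
        // Step 3: obtain a PID from the coordinator (or from a random broker).
        sentRequests.add("InitPidRequest");
        initialized = true;
    }

    void beginTransaction() {
        requireInitialized();
        if (inTransaction) {
            throw new IllegalStateException("a transaction has already been started");
        }
        inTransaction = true;
    }

    void sendOffsetsToTransaction(Map<String, Long> offsets, String consumerGroupId) {
        requireActiveTransaction();
        // Register the group with the coordinator only on its first use.
        if (groupsAdded.add(consumerGroupId)) {
            sentRequests.add("AddOffsetsToTxnRequest");
        }
        sentRequests.add("TxnOffsetCommitRequest");
    }

    void commitTransaction() {
        requireActiveTransaction();
        // A real producer would flush here and wait for all acks.
        sentRequests.add("EndTxnRequest(COMMIT)");
        inTransaction = false;
        groupsAdded.clear();
    }

    private void requireInitialized() {
        if (!initialized) {
            throw new IllegalStateException("initTransactions() has not been called");
        }
    }

    private void requireActiveTransaction() {
        requireInitialized();
        if (!inTransaction) {
            throw new IllegalStateException("no active transaction");
        }
    }
}
```

Calling `sendOffsetsToTransaction` twice for the same group in one transaction records only one AddOffsetsToTxnRequest, which matches step 2 of that method.
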
> >
> > - Method behavior changes
> > - CLI command changes
> > - Log format or content changes
> >
> > Compatibility, Deprecation, and Migration Plan
> >
> > - Are backward and forward compatibility taken into consideration?
> > - Are there deprecated APIs?
> > - How do we do migration?
> >
> > Implementation Outline
> >
> > Phase 1
> >
> > First, before sending data, you need to manually create a transaction
> > coordinator (TC) to control the one-to-many relationship between PID and
> > transactionId. To this end, you need to ensure that the consumption
> > progress (offset) is reported to the TC in a timely manner, and that
> > messages of submitted but unsettled transactions are not deleted by
> > mistake while the scheduled deletion task is running.
> >
> > Phase 2
> >
> > Decide whether it is necessary to judge the progress of the current
> > offset and the global PID-visible transaction offset in
> > PullLiteProducer, in order to enable PID -> transactionId to achieve
> > cross-partition transaction consistency.
> >
> > Phase 3
> >
> > When downstream consumers consume, they need to ensure that messages
> > marked with a transactionId upstream are perceived by downstream
> > consumers.
> >
> > Rejected Alternatives
> >
> > How do alternatives solve the issue you proposed?
> > Pros and Cons of alternatives
> > Why should we reject above alternatives
