*Status*

- Current State: Discussing
- Authors: complone (http://github.com/complone)
- Shepherds:
- Mailing List discussion: [email protected]
- Pull Request:
- Released: no

*Background & Motivation*

*What do we need to do*

- Will we add a new module? yes
- Will we add new APIs? yes
- Will we add new features? yes

*Why should we do that*
- Are there any problems with our current project?

At present, when RocketMQ produces messages, the network call itself is uncertain: a send can end in an indeterminate ("in-process") state, for example when the broker has stored the message but the acknowledgement never reaches the producer. Retrying such a send can therefore produce duplicates. Many other MQ products share this problem. The usual approach is to require consumers to deduplicate messages when consuming them, based on an idempotency key generated by the message producer. When sending a message, the producer can set a unique ID as the message key; the corresponding API is ```org.apache.rocketmq.common.message.Message.setKeys(String keys)```.

- What can we gain from the proposed changes?

We introduce the notion of a TransactionalId, which enables users to uniquely identify producers in a persistent way. Different instances of a producer with the same TransactionalId will be able to resume (or abort) any transactions started by the previous instance.

We also introduce the notion of a producer epoch, which ensures that there is only one legitimate active instance of a producer with a given TransactionalId, and hence allows us to maintain transactional guarantees in the event of failures.
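For reference, the current consumer-side workaround (deduplicating by message key) might look like the minimal sketch below. It assumes the producer stores a unique business ID in the message key, for example via Message.setKeys, and that it is acceptable to remember a bounded window of recent keys in memory. The class KeyDeduplicator and its LRU eviction are hypothetical and not part of RocketMQ.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a RocketMQ API: remembers a bounded window of
// recently seen message keys so that a re-sent message can be skipped.
final class KeyDeduplicator {
    private final Set<String> seenKeys;

    KeyDeduplicator(final int capacity) {
        // Access-ordered LinkedHashMap that drops its least recently used key
        // once more than `capacity` keys are stored; wrapped as a thread-safe Set.
        Map<String, Boolean> lru = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.seenKeys = Collections.synchronizedSet(Collections.newSetFromMap(lru));
    }

    /** Returns true the first time a key is seen and false for a duplicate. */
    boolean firstSeen(String messageKey) {
        return seenKeys.add(messageKey);
    }
}
```

A consumer listener would call firstSeen(message.getKeys()) and acknowledge the message without processing it when the call returns false. The window lives only in memory. It is therefore lost on restart, and a duplicate that arrives after its key has been evicted goes undetected. Broker-side deduplication avoids both limitations.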
*Goals*

- What problem is this proposal designed to solve?

When tuned for reliability, users can guarantee that every message write is persisted at least once without data loss. However, producer retries can introduce duplicates into the stream. For example, a broker might crash between committing a message and sending an ack to the producer, which causes the producer to retry and store the message twice.

However, idempotent producers do not provide guarantees for writes across multiple MessageQueues. Those require stronger transactional guarantees, i.e. the ability to write to several MessageQueues atomically. By atomically, we mean the ability to commit a set of messages across MessageQueues as a unit: either all messages are committed, or none of them are.

- To what degree should we solve the problem?

Within a transaction, we also need to make sure that the producer generates no duplicate messages. To achieve this, we will add sequence numbers to messages so that brokers can de-duplicate messages per producer and topic partition (MessageQueue). For each topic partition that is written to, the producer maintains a sequence number counter and assigns the next number in the sequence to each new message. The broker verifies that each incoming message carries the next expected number and otherwise returns an error.

In addition, since the sequence number is per producer and topic partition, we also need to uniquely identify a producer across multiple sessions (i.e. when the producer fails and is recreated). Hence we introduce a new TransactionalId to distinguish producers, along with an epoch number so that zombie writers with the same TransactionalId can be fenced.

At any given point in time, a producer can have only one ongoing transaction. We can therefore tell apart messages that belong to different transactions by their respective TransactionalId. Producers with the same TransactionalId talk to the same transaction coordinator. In addition to managing their transaction status, that coordinator keeps track of their TransactionalIds.
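The broker-side checks described above can be sketched as follows. This is a minimal in-memory illustration, and it makes several assumptions that the proposal does not fix:

- sequence numbers are tracked per (PID, epoch, MessageQueue), so a new epoch restarts at 0;
- the first message in each queue carries sequence 0;
- SequenceValidator and its Result values are hypothetical, not existing RocketMQ code.

A real broker would also have to persist this state and bound its size.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side state (not RocketMQ code): the latest epoch per
// TransactionalId and the last accepted sequence per (PID, epoch, queue).
final class SequenceValidator {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER, FENCED }

    private final Map<String, Integer> epochByTransactionalId = new HashMap<>();
    private final Map<String, Integer> lastSequence = new HashMap<>();

    synchronized Result validate(String transactionalId, int epoch, long pid, int queueId, int sequence) {
        Integer latestEpoch = epochByTransactionalId.get(transactionalId);
        if (latestEpoch != null && epoch < latestEpoch) {
            return Result.FENCED; // an older (zombie) instance of this TransactionalId
        }
        epochByTransactionalId.put(transactionalId, epoch);

        // Sequences are tracked per producer epoch, so a new epoch starts again at 0.
        String key = pid + ":" + epoch + ":" + queueId;
        Integer last = lastSequence.get(key);
        int expected = (last == null) ? 0 : last + 1;
        if (sequence < expected) {
            return Result.DUPLICATE; // already stored, e.g. a retry after a lost ack
        }
        if (sequence > expected) {
            return Result.OUT_OF_ORDER; // a gap: an earlier message has not arrived
        }
        lastSequence.put(key, sequence);
        return Result.ACCEPTED;
    }
}
```

In this sketch, a retry of an already stored message is reported as DUPLICATE, which a producer could treat as success. A gap is reported as OUT_OF_ORDER, which corresponds to the "otherwise returns an error" rule above.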
*Non-Goals*

- What problem is this proposal NOT designed to solve?
- Are there any limits of this proposal?

*Changes*

*Architecture*

*Interface Design/Change*
- Method signature changes

A set of new public APIs will be added to the TransactionMQProducer class. The following describes how each of them will be implemented.

/* initialize the producer as a transactional producer */
initTransactions()

The following steps will be taken when initTransactions() is called:

1. If no TransactionalId has been provided in the configuration, skip to step 3.
2. Send a FindCoordinatorRequest with the configured TransactionalId and with CoordinatorType encoded as “transaction” to a random broker. Block for the corresponding response, which returns the transaction coordinator assigned to this producer.
3. Send an InitPidRequest to the transaction coordinator, or to a random broker if no TransactionalId was provided in the configuration. Block for the corresponding response to obtain the returned PID.
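The three steps of initTransactions() can be sketched as below. TxnRpcClient is a hypothetical blocking facade over the FindCoordinatorRequest and InitPidRequest named in the proposal. Neither it nor TransactionalInit exists in RocketMQ, and the sketch omits error handling and retries.

```java
import java.util.Objects;

// Hypothetical blocking RPC facade; the request names follow the proposal.
interface TxnRpcClient {
    String randomBroker();
    /** FindCoordinatorRequest with CoordinatorType "transaction"; returns the coordinator address. */
    String findCoordinator(String broker, String transactionalId);
    /** InitPidRequest; returns the PID assigned by the target broker. */
    long initPid(String broker, String transactionalId);
}

final class TransactionalInit {
    private final TxnRpcClient rpc;
    private final String transactionalId; // null when none is configured
    private String coordinator;           // set only when a TransactionalId exists

    TransactionalInit(TxnRpcClient rpc, String transactionalId) {
        this.rpc = Objects.requireNonNull(rpc, "rpc");
        this.transactionalId = transactionalId;
    }

    /** Follows steps 1-3 above; every call blocks until its response arrives. */
    long initTransactions() {
        String target;
        if (transactionalId == null) {
            // Step 1: no TransactionalId configured, so skip the coordinator lookup.
            target = rpc.randomBroker();
        } else {
            // Step 2: ask a random broker which coordinator owns this TransactionalId.
            coordinator = rpc.findCoordinator(rpc.randomBroker(), transactionalId);
            target = coordinator;
        }
        // Step 3: obtain the PID from the coordinator (or from a random broker).
        return rpc.initPid(target, transactionalId);
    }

    String coordinator() {
        return coordinator;
    }
}
```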
/* start a transaction to produce messages */
beginTransaction()

The following steps are executed on the producer when beginTransaction() is called:

1. Check whether the producer is transactional (i.e. initTransactions() has been called); if not, throw an exception. (We omit this step in the rest of the APIs, but they all need to execute it.)
2. Check whether a transaction has already been started. If so, throw an exception.
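The checks in beginTransaction() amount to a small producer-side state machine. The sketch below assumes three states: UNINITIALIZED, READY and IN_TRANSACTION. The class TransactionState is hypothetical, and IllegalStateException is used only as one possible choice for "throw an exception".

```java
// Hypothetical client-side state holder (not an existing RocketMQ class)
// illustrating the two checks performed by beginTransaction().
final class TransactionState {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;

    /** Called once initTransactions() has obtained a PID. */
    synchronized void markInitialized() {
        state = State.READY;
    }

    synchronized void beginTransaction() {
        // Step 1: the producer must be transactional, i.e. initialized.
        if (state == State.UNINITIALIZED) {
            throw new IllegalStateException("initTransactions() has not been called");
        }
        // Step 2: a producer may have only one ongoing transaction.
        if (state == State.IN_TRANSACTION) {
            throw new IllegalStateException("a transaction has already been started");
        }
        state = State.IN_TRANSACTION;
    }

    synchronized State state() {
        return state;
    }
}
```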
/* send offsets for a given consumer group within this transaction */
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)

The following steps are executed on the producer when sendOffsetsToTransaction() is called:

1. Check whether the producer is currently within a transaction; if not, throw an exception. Otherwise proceed to the next step.
2. Check whether this method has already been called for the given consumerGroupId within this transaction. If not, send an AddOffsetsToTxnRequest to the transaction coordinator and block until the corresponding response is received; otherwise proceed to the next step.
3. Send a TxnOffsetCommitRequest to the coordinator returned in the response from the previous step, and block until the corresponding response is received.
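The rule "register each consumer group only once per transaction" from sendOffsetsToTransaction() can be sketched as follows. To keep the example self-contained, it replaces the proposed Map<TopicPartition, OffsetAndMetadata> with a hypothetical Map<String, Long> keyed by a "topic@queueId" string. OffsetTxnRpc and TxnOffsetSender are likewise hypothetical stand-ins for the AddOffsetsToTxnRequest and TxnOffsetCommitRequest calls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical blocking calls to the coordinator; request names follow the proposal.
interface OffsetTxnRpc {
    /** AddOffsetsToTxnRequest; returns the coordinator to which offsets are sent. */
    String addOffsetsToTxn(String consumerGroupId);
    /** TxnOffsetCommitRequest; blocks until acknowledged. */
    void txnOffsetCommit(String coordinator, String consumerGroupId, Map<String, Long> offsets);
}

final class TxnOffsetSender {
    private final OffsetTxnRpc rpc;
    // Groups already registered in the current transaction, with their coordinator.
    private final Map<String, String> coordinatorByGroup = new HashMap<>();
    private boolean inTransaction;

    TxnOffsetSender(OffsetTxnRpc rpc) {
        this.rpc = rpc;
    }

    void beginTransaction() {
        inTransaction = true;
        coordinatorByGroup.clear();
    }

    /** Offsets are keyed by a "topic@queueId" string in this sketch. */
    void sendOffsetsToTransaction(Map<String, Long> offsets, String consumerGroupId) {
        // Step 1: must be inside a transaction.
        if (!inTransaction) {
            throw new IllegalStateException("no ongoing transaction");
        }
        // Step 2: send AddOffsetsToTxnRequest only on the group's first use in this transaction.
        String coordinator = coordinatorByGroup.computeIfAbsent(consumerGroupId, rpc::addOffsetsToTxn);
        // Step 3: commit the offsets as part of the transaction.
        rpc.txnOffsetCommit(coordinator, consumerGroupId, offsets);
    }
}
```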
/* commit the transaction with its produced messages */
commitTransaction()

The following steps are executed on the producer when commitTransaction() is called:

1. Check whether there is an active transaction; if not, throw an exception. Otherwise proceed to the next step.
2. Call flush to make sure all messages sent in this transaction have been acknowledged.
3. Send an EndTxnRequest with the COMMIT command to the transaction coordinator, and block until the corresponding response is received.
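The commitTransaction() steps can be sketched as below. The sketch assumes that each send returns a CompletableFuture that completes when the broker acknowledges the message. EndTxnRpc and TxnCommitter are hypothetical, and "flush" is modeled as waiting on all pending acknowledgements before the EndTxnRequest is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the EndTxnRequest sent to the transaction coordinator.
interface EndTxnRpc {
    enum Command { COMMIT, ABORT }

    void endTxn(Command command); // blocks until the coordinator responds
}

final class TxnCommitter {
    private final EndTxnRpc rpc;
    private final List<CompletableFuture<Void>> pendingSends = new ArrayList<>();
    private boolean inTransaction;

    TxnCommitter(EndTxnRpc rpc) {
        this.rpc = rpc;
    }

    void beginTransaction() {
        inTransaction = true;
    }

    /** Records the acknowledgement future of an asynchronous send. */
    void trackSend(CompletableFuture<Void> ack) {
        pendingSends.add(ack);
    }

    void commitTransaction() {
        // Step 1: there must be an active transaction.
        if (!inTransaction) {
            throw new IllegalStateException("no active transaction");
        }
        // Step 2: "flush" by waiting for every acknowledgement; join() throws a
        // CompletionException if any send failed, and then no EndTxnRequest is sent.
        CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture[0])).join();
        pendingSends.clear();
        // Step 3: ask the coordinator to commit.
        rpc.endTxn(EndTxnRpc.Command.COMMIT);
        inTransaction = false;
    }
}
```

If a send fails, the exception from step 2 reaches the caller before any COMMIT is requested. The caller could then abort the transaction instead.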
- Method behavior changes
- CLI command changes
- Log format or content changes

*Compatibility, Deprecation, and Migration Plan*

- Is backward and forward compatibility taken into consideration?
- Are there deprecated APIs?
- How do we do migration?

*Implementation Outline*

*Phase 1*
First, before any data is sent, a transaction coordinator (TC) needs to be created manually to manage the one-to-many relationship between PID and TransactionalId. To this end, we need to do two things. First, consumption progress (offsets) must be reported to the TC in a timely manner. Second, messages of transactions that have been submitted but not yet resolved must not be deleted by mistake while the scheduled deletion task runs concurrently.
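One way to keep the scheduled deletion task from removing messages of unresolved transactions is to have it consult a guard like the sketch below. The sketch assumes that the broker records the first physical offset written by each open transaction and is notified when the transaction coordinator resolves it. TxnRetentionGuard and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical retention guard (not existing RocketMQ code): the scheduled
// deletion task asks it whether an expired file may really be removed.
final class TxnRetentionGuard {
    // First physical offset written by each unresolved transaction, keyed by TransactionalId.
    private final Map<String, Long> firstOffsetOfOpenTxn = new HashMap<>();

    synchronized void onTransactionalWrite(String transactionalId, long offset) {
        firstOffsetOfOpenTxn.merge(transactionalId, offset, Long::min);
    }

    /** Called when the coordinator has committed or aborted the transaction. */
    synchronized void onTransactionResolved(String transactionalId) {
        firstOffsetOfOpenTxn.remove(transactionalId);
    }

    /** A file may be deleted only if it is expired and ends before the earliest unresolved message. */
    synchronized boolean canDelete(long fileMaxOffset, boolean expired) {
        if (!expired) {
            return false;
        }
        long earliestOpen = firstOffsetOfOpenTxn.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MAX_VALUE);
        return fileMaxOffset < earliestOpen;
    }
}
```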
*Phase 2*

Determine whether the LitePullConsumer needs to compare the current consumption offset with the globally visible transactional offset of each PID. Such a comparison would let the PID -> TransactionalId mapping provide transactional consistency across partitions.
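As an illustration of what a "visible transaction offset" could mean for a single queue, the sketch below holds back every message at or after the first offset of a still-open transaction. It relies on the rule from the Goals section that a producer has at most one ongoing transaction, so it keys open transactions by PID. VisibleOffsetTracker is hypothetical, and whether the consumer needs such a bound at all is the open question of this phase.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-queue tracker (not existing RocketMQ code) of the offset up to
// which consumers may read: messages at or beyond the first offset of any
// still-open transaction are held back until that transaction is resolved.
final class VisibleOffsetTracker {
    private long nextOffset; // offset that the next appended message will receive
    private final Map<Long, Long> firstOffsetByOpenPid = new HashMap<>();

    synchronized long append(long pid, boolean transactional) {
        long offset = nextOffset++;
        if (transactional) {
            // At most one ongoing transaction per PID, so keep only its first offset.
            firstOffsetByOpenPid.putIfAbsent(pid, offset);
        }
        return offset;
    }

    /** Called once the transaction of this PID is committed or aborted. */
    synchronized void resolve(long pid) {
        firstOffsetByOpenPid.remove(pid);
    }

    /** Exclusive upper bound of offsets that may be delivered to consumers. */
    synchronized long visibleOffset() {
        return firstOffsetByOpenPid.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(nextOffset);
    }
}
```

This design has a trade-off: non-transactional messages appended after an open transaction are also held back until that transaction is resolved.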


*Phase 3*

When downstream consumers consume messages, we need to ensure that they correctly recognize messages that upstream producers have marked with a TransactionalId.

*Rejected Alternatives*

- How do the alternatives solve the issue you proposed?
- Pros and cons of the alternatives
- Why should we reject the above alternatives?
