Alaske opened a new issue, #10545:
URL: https://github.com/apache/rocketmq/issues/10545

   ### Before Creating the Enhancement Request
   
   - [x] I have confirmed that this should be classified as an enhancement 
rather than a bug/feature.
   
   
   ### Summary
   
   
   # Support `MessageQueueSelector` for transactional messages
   
   ## Background
   
   RocketMQ currently supports selecting a target queue for normal messages 
through APIs such as:
   
   ```java
   DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg)
   ```
   
   This is useful for scenarios that require deterministic queue routing, for 
example ordered messages, sharding by business key, traffic isolation, canary 
release, or custom queue lane routing.
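   
   As a minimal sketch of what such a selector typically computes, the
   self-contained example below hashes a business key onto a queue index.
   `KeyHashRouting` and `selectQueueIndex` are hypothetical names used only
   for illustration and are not part of RocketMQ. In a real
   `MessageQueueSelector` implementation, the same index would be used to
   pick an entry from the queue list passed to the selector.
   
   ```java
   public class KeyHashRouting {
       // Hypothetical helper: maps a business key to a stable queue index.
       // floorMod keeps the result non-negative even for negative hash codes.
       static int selectQueueIndex(Object businessKey, int queueCount) {
           if (queueCount <= 0) {
               throw new IllegalArgumentException("queueCount must be positive");
           }
           return Math.floorMod(businessKey.hashCode(), queueCount);
       }

       public static void main(String[] args) {
           int first = selectQueueIndex("order-1001", 8);
           int second = selectQueueIndex("order-1001", 8);
           // The same key always maps to the same queue index.
           System.out.println(first == second);
       }
   }
   ```
   
   Because the mapping depends only on the key and the queue count, messages
   with the same key stay on one queue as long as the queue count is stable.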
   
   However, transactional messages currently do not expose an equivalent queue 
selection API.
   
   The current transactional send API is:
   
   ```java
   TransactionMQProducer#sendMessageInTransaction(Message msg, Object arg)
   ```
   
   Internally, `TransactionMQProducer` delegates to 
`DefaultMQProducerImpl#sendMessageInTransaction(...)`, which marks the message 
as a prepared transaction message and then calls the default send path:
   
   ```java
   sendResult = this.send(msg);
   ```
   
   As a result, transactional messages go through `sendDefaultImpl(...)`, where 
the client chooses a queue by `selectOneMessageQueue(...)`. There is no way for 
users to provide a custom `MessageQueueSelector`.
   
   Relevant source references:
   
   - `TransactionMQProducer#sendMessageInTransaction(...)`
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java#L705-L719>
   - `DefaultMQProducerImpl#sendMessageInTransaction(...)`
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java#L1222-L1242>
   - Normal message selector path:
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java#L513-L526>
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java#L1105-L1113>
   
   ## Broker-side behavior
   
   On the broker side, the final queue of a transactional message is the queue 
that was selected when the half message was sent.
   
   When the broker receives a prepared transactional message, it stores the 
original topic and queue id into message properties:
   
   ```text
   PROPERTY_REAL_TOPIC
   PROPERTY_REAL_QUEUE_ID
   ```
   
   Then the message is rewritten to the internal half topic.
   
   During commit, the broker restores the final message topic and queue id from 
these properties.
   
   Relevant source references:
   
   - Store original topic and queue id before writing half message:
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java#L203-L211>
   - Restore original topic and queue id when committing:
     - <https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.7/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java#L194-L215>
   
   This means queue selection must happen before the half message is sent. 
After the half message is stored, the commit phase only restores the previously 
selected queue and does not perform routing again.
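   
   The store-then-restore behavior described in this section can be modeled
   with a small self-contained sketch. `HalfMessageModel` and `Msg` are
   hypothetical stand-ins, not broker classes. The property key strings, the
   half-topic name, and queue id 0 for the half message are assumptions made
   for illustration.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class HalfMessageModel {
       // Simplified stand-in for a stored message; not a RocketMQ class.
       static final class Msg {
           String topic;
           int queueId;
           final Map<String, String> properties = new HashMap<>();

           Msg(String topic, int queueId) {
               this.topic = topic;
               this.queueId = queueId;
           }
       }

       // Assumed names, used here only to illustrate the mechanism.
       static final String REAL_TOPIC = "REAL_TOPIC";
       static final String REAL_QUEUE_ID = "REAL_QID";
       static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

       // Prepare: remember the client-selected topic and queue id in the
       // properties, then redirect the message to the internal half topic.
       static Msg toHalf(Msg original) {
           Msg half = new Msg(HALF_TOPIC, 0);
           half.properties.putAll(original.properties);
           half.properties.put(REAL_TOPIC, original.topic);
           half.properties.put(REAL_QUEUE_ID, String.valueOf(original.queueId));
           return half;
       }

       // Commit: restore topic and queue id from the stored properties.
       // No queue selection happens at this point.
       static Msg commit(Msg half) {
           Msg restored = new Msg(
                   half.properties.get(REAL_TOPIC),
                   Integer.parseInt(half.properties.get(REAL_QUEUE_ID)));
           restored.properties.putAll(half.properties);
           return restored;
       }
   }
   ```
   
   In this model, whatever queue id the client chose before `toHalf` is the
   queue id that `commit` produces, which is why selection has to happen on
   the client before the half message is sent.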
   
   ## Problem
   
   For transactional messages, users cannot provide a `MessageQueueSelector`, 
so they cannot control which queue the half message is written to.
   
   This makes it difficult to support scenarios such as:
   
   - deterministic transaction message sharding by business key
   - canary / gray release queue lanes
   - traffic isolation by queue
   - keeping normal messages and transactional messages under the same queue 
routing strategy
   
   For example, an application may use a custom selector for normal messages to 
route gray traffic to a subset of queues. But when the same flow uses 
transactional messages, the routing strategy cannot be reused because 
transactional send always uses the default queue selection logic.
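   
   A gray-lane routing rule of this kind might look like the sketch below.
   `GrayLaneRouting`, `route`, and the queue id lists are hypothetical and
   chosen only for illustration. The point is that the rule itself is plain
   application logic that could be shared by normal and transactional sends
   if both accepted a selector.
   
   ```java
   import java.util.Arrays;
   import java.util.List;

   public class GrayLaneRouting {
       // Hypothetical rule: gray traffic is confined to grayQueueIds, all
       // other traffic to normalQueueIds. The key spreads load within a lane.
       static int route(boolean gray, String key,
                        List<Integer> grayQueueIds, List<Integer> normalQueueIds) {
           List<Integer> lane = gray ? grayQueueIds : normalQueueIds;
           if (lane.isEmpty()) {
               throw new IllegalArgumentException("lane has no queues");
           }
           return lane.get(Math.floorMod(key.hashCode(), lane.size()));
       }

       public static void main(String[] args) {
           List<Integer> grayLane = Arrays.asList(6, 7);
           List<Integer> normalLane = Arrays.asList(0, 1, 2, 3, 4, 5);
           // Gray traffic lands on queue 6 or 7, normal traffic on 0 to 5.
           System.out.println(route(true, "user-42", grayLane, normalLane));
           System.out.println(route(false, "user-42", grayLane, normalLane));
       }
   }
   ```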
   
   ## Proposal
   
   Add overloaded transactional send APIs that accept `MessageQueueSelector`, 
similar to normal message sending APIs.
   
   For example:
   
   ```java
   TransactionSendResult sendMessageInTransaction(
       Message msg,
       MessageQueueSelector selector,
       Object selectorArg,
       Object transactionArg
   ) throws MQClientException;
   ```
   
   Internally, the transactional send flow can keep the existing transaction 
lifecycle unchanged:
   
   1. validate transaction listener
   2. mark message with `PROPERTY_TRANSACTION_PREPARED`
   3. set producer group
   4. send the half message using selector-based queue selection
   5. execute local transaction
   6. end transaction with commit / rollback / unknown
   
   The only expected difference is step 4: instead of always calling the 
default send path, allow the half message to be sent through the selector-based 
path.
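   
   The six steps above can be sketched as a self-contained model. This is
   not the RocketMQ client implementation: `SelectorTxSendSketch`,
   `QueueSelector`, `LocalTx`, `TxState`, and `Result` are hypothetical
   types, the property keys are placeholders, and the network send and the
   end-transaction request are reduced to returning the chosen queue id and
   final state.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class SelectorTxSendSketch {
       enum TxState { COMMIT, ROLLBACK, UNKNOWN }

       // Hypothetical stand-ins for the selector and transaction listener.
       interface QueueSelector { int select(int queueCount, Object selectorArg); }
       interface LocalTx { TxState execute(Object transactionArg); }

       static final class Result {
           final int queueId;
           final TxState state;
           final Map<String, String> properties;

           Result(int queueId, TxState state, Map<String, String> properties) {
               this.queueId = queueId;
               this.state = state;
               this.properties = properties;
           }
       }

       static Result sendInTransaction(int queueCount, QueueSelector selector,
                                       Object selectorArg, LocalTx localTx,
                                       Object transactionArg) {
           // 1. Validate the transaction listener.
           if (localTx == null) {
               throw new IllegalStateException("transaction listener is null");
           }
           // 2-3. Mark the message as prepared and set the producer group
           //      (placeholder keys and values).
           Map<String, String> props = new HashMap<>();
           props.put("PROPERTY_TRANSACTION_PREPARED", "true");
           props.put("PROPERTY_PRODUCER_GROUP", "demo-group");
           // 4. Choose the half message queue with the user-supplied selector.
           int queueId = selector.select(queueCount, selectorArg);
           if (queueId < 0 || queueId >= queueCount) {
               throw new IllegalStateException("selector returned invalid queue");
           }
           // 5. Execute the local transaction; a failure leaves the state unknown.
           TxState state;
           try {
               state = localTx.execute(transactionArg);
               if (state == null) {
                   state = TxState.UNKNOWN;
               }
           } catch (RuntimeException e) {
               state = TxState.UNKNOWN;
           }
           // 6. End the transaction; here the outcome is simply reported.
           return new Result(queueId, state, props);
       }
   }
   ```
   
   Only step 4 differs from the default flow in this model. Keeping
   `selectorArg` and `transactionArg` as separate parameters avoids
   overloading one argument with both routing and transaction context.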
   
   ## Compatibility
   
   This can be added as an overload, so existing APIs and behavior remain 
unchanged.
   
   Existing users of:
   
   ```java
   sendMessageInTransaction(Message msg, Object arg)
   ```
   
   would continue to use the current default queue selection logic.
   
   Users who need deterministic transactional queue routing can opt in to the 
new overload.
   
   ## Expected behavior
   
   When a transactional message is sent with a `MessageQueueSelector`, the 
selected queue should become the original queue of the half message. The broker 
should then store this queue id as `PROPERTY_REAL_QUEUE_ID` and restore it when 
the transaction is committed, consistent with the existing broker transaction 
message flow.
   
   ## Additional context
   
   This request does not require broker-side transaction protocol changes. The 
broker already restores the final queue from the queue id selected before the 
half message is stored. The missing capability is mainly on the client API and 
transactional send path.
   
   
   ### Motivation
   
   Transactional messages currently do not provide an API to select the target 
message queue.
   
   For normal messages, RocketMQ supports `MessageQueueSelector`, which allows 
users to route messages deterministically by business key, shard key, 
canary/gray label, or other custom routing rules. However, transactional 
messages always go through the default queue selection path internally.
   
   From the broker-side transaction flow, the final queue of a transactional 
message is determined when the half message is sent. The broker stores the 
original queue id as `PROPERTY_REAL_QUEUE_ID` and restores it when the 
transaction is committed. Therefore, if users cannot select the queue before 
the half message is written, they cannot control the final queue of the 
committed transactional message.
   
   This makes it difficult to keep transactional messages and normal messages 
under the same routing strategy, especially in scenarios such as deterministic 
sharding, traffic isolation, ordered business lanes, and canary/gray release 
queue lanes.
   
   ### Describe the Solution You'd Like
   
   Add an overload of `TransactionMQProducer#sendMessageInTransaction` that 
accepts a `MessageQueueSelector` and a selector argument, as described in the 
Proposal section above. The half message would be sent through the 
selector-based send path, while the rest of the transaction lifecycle and the 
existing `sendMessageInTransaction(Message msg, Object arg)` behavior stay 
unchanged. No broker-side transaction protocol change is required.
   
   ### Describe Alternatives You've Considered
   
   I considered the following alternatives:
   
   1. Put the desired queue id into message properties or user headers.
   
   This does not work because the broker does not read the target queue id from 
message properties. The queue id is set in `SendMessageRequestHeader` by the 
client, based on the selected `MessageQueue`.
   
   2. Select or change the queue during transaction commit.
   
   This does not fit the current broker transaction flow. For transactional 
messages, the broker stores the original queue id as `PROPERTY_REAL_QUEUE_ID` 
when the half message is written. During commit, the broker only restores the 
original queue id from the half message. It does not perform queue selection 
again.
   
   3. Add custom broker-side routing logic based on message properties.
   
   This is possible in theory, but it would push application-level routing 
rules into the broker. It also introduces compatibility and maintenance risks 
around half messages, commit/rollback, transaction check, operation messages, 
and future upgrades.
   
   4. Use normal messages with an outbox pattern instead of RocketMQ 
transactional messages.
   
   This can be used as an application-level workaround, but it changes the 
programming model and does not solve the missing queue-selection capability for 
RocketMQ transactional messages.
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
