wang-jiahua commented on code in PR #10564:
URL: https://github.com/apache/rocketmq/pull/10564#discussion_r3510574617


##########
broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java:
##########
@@ -316,6 +316,30 @@ public Channel getAvailableChannel(String groupId) {
         return lastActiveChannel;
     }
 
+    /**
+     * Get an available channel for the given group, preferring the producer 
that originally sent the message.
+     * Falls back to round-robin if the preferred producer is not found or not 
available.
+     *
+     * @param groupId producer group
+     * @param preferredClientId the clientId of the original producer (from 
half message properties), may be null
+     * @return an available channel, or null if none found
+     */
+    public Channel getAvailableChannel(String groupId, String 
preferredClientId) {
+        if (preferredClientId != null) {
+            ConcurrentMap<Channel, ClientChannelInfo> channelMap = 
groupChannelTable.get(groupId);
+            if (channelMap != null) {
+                for (Map.Entry<Channel, ClientChannelInfo> entry : 
channelMap.entrySet()) {
+                    if 
(preferredClientId.equals(entry.getValue().getClientId())
+                        && entry.getKey().isActive() && 
entry.getKey().isWritable()) {
+                        return entry.getKey();
+                    }
+                }
+            }
+        }
+        // Fall back to round-robin selection
+        return getAvailableChannel(groupId);
+    }

Review Comment:
   Fixed — added `if (groupId == null) return null;` guard at the beginning of 
the new `getAvailableChannel(String, String)` overload.



##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -1444,6 +1444,7 @@ public TransactionSendResult 
sendMessageInTransaction(final Message msg,
         SendResult sendResult = null;
         MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
this.defaultMQProducer.getProducerGroup());
+        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID, 
this.mQClientFactory.getClientId());

Review Comment:
   The method already calls `this.makeSureStateOK()` at line 1438 before 
dereferencing `mQClientFactory` at line 1448. This guarantees the producer is 
in RUNNING state and `mQClientFactory` is not null.



##########
common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java:
##########
@@ -52,6 +52,7 @@ public class MessageConst {
     public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = 
"TRAN_PREPARED_QUEUE_OFFSET";
     public static final String PROPERTY_TRANSACTION_ID = "__transactionId__";
     public static final String PROPERTY_TRANSACTION_CHECK_TIMES = 
"TRANSACTION_CHECK_TIMES";
+    public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID = 
"__TXN_PRODUCER_CID__";

Review Comment:
   Fixed — added `STRING_HASH_SET.add(PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID)` 
in the static initializer block of `MessageConst`.



##########
broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java:
##########
@@ -225,4 +225,37 @@ public void testGetAvailableChannel() {
         assertThat(c).isNull();
     }
 
+    @Test
+    public void testGetAvailableChannelWithPreferredClientId() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // Match: preferred clientId matches registered producer
+        Channel c = producerManager.getAvailableChannel(group, "clientId");
+        assertThat(c).isSameAs(channel);
+    }
+
+    @Test
+    public void testGetAvailableChannelWithPreferredClientIdNotFound() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // No match: falls back to round-robin (returns some channel from 
group)
+        Channel c = producerManager.getAvailableChannel(group, 
"nonExistentClientId");
+        assertThat(c).isNotNull(); // should fall back to round-robin
+    }
+
+    @Test
+    public void testGetAvailableChannelWithNullPreferredClientId() {
+        producerManager.registerProducer(group, clientInfo);
+        when(channel.isActive()).thenReturn(true);
+        when(channel.isWritable()).thenReturn(true);
+
+        // null clientId: should behave exactly like original 
getAvailableChannel
+        Channel c = producerManager.getAvailableChannel(group, null);
+        assertThat(c).isNotNull();
+    }
+
 }

Review Comment:
   Fixed — added `testGetAvailableChannelWithNullGroupId` test in 
`ProducerManagerTest` that verifies null groupId returns null without throwing 
NPE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to