This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53b26f7ad9708bab86db4c836fcef1821bfead03
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 9 14:36:44 2025 +0300

    [fix] Fix mixed lookup/partition metadata requests causing reliability 
issues and incorrect responses (#24832)
    
    (cherry picked from commit 4457b089daf815d009f89c88c8e4bc35a8233d54)
---
 .../org/apache/pulsar/broker/service/ServerCnx.java   | 19 ++++++++++++++++---
 .../java/org/apache/pulsar/client/impl/ClientCnx.java |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnxTest.java  |  5 ++++-
 .../apache/pulsar/common/protocol/PulsarDecoder.java  |  5 +++++
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1c61d4c467f..d50b10fd03b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -507,8 +507,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
-    protected void handleLookup(CommandLookupTopic lookup) {
+    protected void handleLookup(CommandLookupTopic lookupParam) {
         checkArgument(state == State.Connected);
+
+        // Make a copy since the command is handled asynchronously
+        CommandLookupTopic lookup = new 
CommandLookupTopic().copyFrom(lookupParam);
+
         final long requestId = lookup.getRequestId();
         final boolean authoritative = lookup.isAuthoritative();
 
@@ -595,8 +599,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
-    protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata 
partitionMetadata) {
+    protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata 
partitionMetadataParam) {
         checkArgument(state == State.Connected);
+
+        // Make a copy since the command is handled asynchronously
+        CommandPartitionedTopicMetadata partitionMetadata =
+                new 
CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam);
+
         final long requestId = partitionMetadata.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup from {} for {}", 
partitionMetadata.getTopic(),
@@ -3114,8 +3123,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
-    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicList) {
+    protected void handleCommandWatchTopicList(CommandWatchTopicList 
commandWatchTopicListParam) {
         checkArgument(state == State.Connected);
+
+        // make a copy since command is handled asynchronously
+        CommandWatchTopicList commandWatchTopicList = new 
CommandWatchTopicList().copyFrom(commandWatchTopicListParam);
+
         final long requestId = commandWatchTopicList.getRequestId();
         final long watcherId = commandWatchTopicList.getWatcherId();
         final NamespaceName namespaceName = 
NamespaceName.get(commandWatchTopicList.getNamespace());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 43be0072a8c..9cccbecf3aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1242,7 +1242,7 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
                 (CompletableFuture<CommandWatchTopicListSuccess>) 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            requestFuture.complete(commandWatchTopicListSuccess);
+            requestFuture.complete(new 
CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
         } else {
             duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}",
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index e0721ffe905..c0a75b09cce 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -334,7 +335,9 @@ public class ClientCnxTest {
                     .setRequestId(7)
                     .setWatcherId(5).setTopicsHash("f00");
             cnx.handleCommandWatchTopicListSuccess(success);
-            assertEquals(result.getNow(null), success);
+            assertThat(result.getNow(null))
+                    .usingRecursiveComparison()
+                    .comparingOnlyFields("requestId", "watcherId", 
"topicsHash");
         });
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c05b1d796df..41658e62f1b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -483,6 +483,11 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             }
         } finally {
             buffer.release();
+            // Clear the fields in cmd to release memory.
+            // The clear() call below also helps prevent misuse of holding 
references to command objects after
+            // handle* methods complete, as per the class javadoc requirement.
+            // While this doesn't completely prevent such misuse, it makes 
tests more likely to catch violations.
+            cmd.clear();
         }
     }
 

Reply via email to