This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53b26f7ad9708bab86db4c836fcef1821bfead03
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 9 14:36:44 2025 +0300

    [fix] Fix mixed lookup/partition metadata requests causing reliability issues and incorrect responses (#24832)

    (cherry picked from commit 4457b089daf815d009f89c88c8e4bc35a8233d54)
---
 .../org/apache/pulsar/broker/service/ServerCnx.java   | 19 ++++++++++++++++---
 .../java/org/apache/pulsar/client/impl/ClientCnx.java |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnxTest.java  |  5 ++++-
 .../apache/pulsar/common/protocol/PulsarDecoder.java  |  5 +++++
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1c61d4c467f..d50b10fd03b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -507,8 +507,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     @Override
-    protected void handleLookup(CommandLookupTopic lookup) {
+    protected void handleLookup(CommandLookupTopic lookupParam) {
         checkArgument(state == State.Connected);
+
+        // Make a copy since the command is handled asynchronously
+        CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam);
+
         final long requestId = lookup.getRequestId();
         final boolean authoritative = lookup.isAuthoritative();
 
@@ -595,8 +599,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     @Override
-    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
+    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) {
         checkArgument(state == State.Connected);
+
+        // Make a copy since the command is handled asynchronously
+        CommandPartitionedTopicMetadata partitionMetadata =
+                new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam);
+
         final long requestId = partitionMetadata.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
@@ -3114,8 +3123,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     @Override
-    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) {
         checkArgument(state == State.Connected);
+
+        // make a copy since command is handled asynchronously
+        CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam);
+
         final long requestId = commandWatchTopicList.getRequestId();
         final long watcherId = commandWatchTopicList.getWatcherId();
         final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 43be0072a8c..9cccbecf3aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1242,7 +1242,7 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
                 (CompletableFuture<CommandWatchTopicListSuccess>) pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            requestFuture.complete(commandWatchTopicListSuccess);
+            requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
         } else {
             duplicatedResponseCounter.incrementAndGet();
             log.warn("{} Received unknown request id from server: {}",
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index e0721ffe905..c0a75b09cce 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -334,7 +335,9 @@ public class ClientCnxTest {
                     .setRequestId(7)
                     .setWatcherId(5).setTopicsHash("f00");
             cnx.handleCommandWatchTopicListSuccess(success);
-            assertEquals(result.getNow(null), success);
+            assertThat(result.getNow(null))
+                    .usingRecursiveComparison()
+                    .comparingOnlyFields("requestId", "watcherId", "topicsHash");
         });
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c05b1d796df..41658e62f1b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -483,6 +483,11 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
             }
         } finally {
             buffer.release();
+            // Clear the fields in cmd to release memory.
+            // The clear() call below also helps prevent misuse of holding references to command objects after
+            // handle* methods complete, as per the class javadoc requirement.
+            // While this doesn't completely prevent such misuse, it makes tests more likely to catch violations.
+            cmd.clear();
         }
     }
