This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 4d756d11fab [fix] Fix mixed lookup/partition metadata requests causing
reliability issues and incorrect responses (#24832)
4d756d11fab is described below
commit 4d756d11fabcd66823ad5159b7be91e29995e5b8
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Oct 9 14:36:44 2025 +0300
[fix] Fix mixed lookup/partition metadata requests causing reliability
issues and incorrect responses (#24832)
(cherry picked from commit 4457b089daf815d009f89c88c8e4bc35a8233d54)
---
.../org/apache/pulsar/broker/service/ServerCnx.java | 19 ++++++++++++++++---
.../java/org/apache/pulsar/client/impl/ClientCnx.java | 2 +-
.../org/apache/pulsar/client/impl/ClientCnxTest.java | 5 ++++-
.../apache/pulsar/common/protocol/PulsarDecoder.java | 5 +++++
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 43497692d0f..af48cd8a381 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -514,8 +514,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
@Override
- protected void handleLookup(CommandLookupTopic lookup) {
+ protected void handleLookup(CommandLookupTopic lookupParam) {
checkArgument(state == State.Connected);
+
+ // Make a copy since the command is handled asynchronously
+ CommandLookupTopic lookup = new
CommandLookupTopic().copyFrom(lookupParam);
+
final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.isAuthoritative();
@@ -591,8 +595,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
@Override
- protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata
partitionMetadata) {
+ protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata
partitionMetadataParam) {
checkArgument(state == State.Connected);
+
+ // Make a copy since the command is handled asynchronously
+ CommandPartitionedTopicMetadata partitionMetadata =
+ new
CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam);
+
final long requestId = partitionMetadata.getRequestId();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}",
partitionMetadata.getTopic(),
@@ -3153,8 +3162,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
@Override
- protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicList) {
+ protected void handleCommandWatchTopicList(CommandWatchTopicList
commandWatchTopicListParam) {
checkArgument(state == State.Connected);
+
+ // make a copy since command is handled asynchronously
+ CommandWatchTopicList commandWatchTopicList = new
CommandWatchTopicList().copyFrom(commandWatchTopicListParam);
+
final long requestId = commandWatchTopicList.getRequestId();
final long watcherId = commandWatchTopicList.getWatcherId();
final NamespaceName namespaceName =
NamespaceName.get(commandWatchTopicList.getNamespace());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index effd5a85e64..dc2cc8d1bea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1242,7 +1242,7 @@ public class ClientCnx extends PulsarHandler {
CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
(CompletableFuture<CommandWatchTopicListSuccess>)
pendingRequests.remove(requestId);
if (requestFuture != null) {
- requestFuture.complete(commandWatchTopicListSuccess);
+ requestFuture.complete(new
CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
} else {
duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 86644708eba..7c0e9f1492a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -330,7 +331,9 @@ public class ClientCnxTest {
.setRequestId(7)
.setWatcherId(5).setTopicsHash("f00");
cnx.handleCommandWatchTopicListSuccess(success);
- assertEquals(result.getNow(null), success);
+ assertThat(result.getNow(null))
+ .usingRecursiveComparison()
+ .comparingOnlyFields("requestId", "watcherId",
"topicsHash");
});
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c05b1d796df..41658e62f1b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -483,6 +483,11 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
}
} finally {
buffer.release();
+ // Clear the fields in cmd to release memory.
+ // The clear() call below also helps prevent misuse of holding
references to command objects after
+ // handle* methods complete, as per the class javadoc requirement.
+ // While this doesn't completely prevent such misuse, it makes
tests more likely to catch violations.
+ cmd.clear();
}
}