This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d8bf601749 [fix][client] Fix lookup request semaphore not release
problem (#25038)
9d8bf601749 is described below
commit 9d8bf601749d465e2394a1a0db96bfe6b70d13a5
Author: congbo <[email protected]>
AuthorDate: Fri Dec 5 15:40:50 2025 +0800
[fix][client] Fix lookup request semaphore not release problem (#25038)
### Motivation
Fix a permit leak in the client's handling of pending lookup requests.
### Bug
If a lookup request times out, the permit acquired from
`pendingLookupRequestSemaphore` is never released. This change releases
the permit when the request fails with a `TimeoutException`.
### Matching PR in forked repository
PR in forked repository: https://github.com/congbobo184/pulsar/pull/23
---
.../apache/pulsar/client/impl/LookupRetryTest.java | 34 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 3 +-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
index 77966713071..f8ebebd53e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static
org.apache.pulsar.common.protocol.Commands.newPartitionMetadataResponse;
+import static org.testng.AssertJUnit.fail;
import com.google.common.collect.Sets;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,13 +37,16 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -51,6 +57,7 @@ public class LookupRetryTest extends
MockedPulsarServiceBaseTest {
private static final String subscription = "reader-sub";
private final AtomicInteger connectionsCreated = new AtomicInteger(0);
private final ConcurrentHashMap<String, Queue<LookupError>> failureMap =
new ConcurrentHashMap<>();
+ private static final String TEST_TIME_OUT_SEMAPHORE_RELEASE =
"testTimeoutReleasePendingLookupRequestSemaphore";
@BeforeClass
@Override
@@ -110,6 +117,30 @@ public class LookupRetryTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testTimeoutReleasePendingLookupRequestSemaphore() throws
Exception {
+ PulsarClientImpl client = (PulsarClientImpl) newClient();
+
+ LookupService lookup = client.getLookup();
+
+ CompletableFuture<PartitionedTopicMetadata> future =
+
lookup.getPartitionedTopicMetadata(TopicName.get(TEST_TIME_OUT_SEMAPHORE_RELEASE),
false);
+ try {
+ future.get();
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(FutureUtil.unwrapCompletionException(e)
+ instanceof PulsarClientException.TimeoutException);
+ }
+
+ Set<CompletableFuture<ClientCnx>> clientCnxs =
client.getCnxPool().getConnections();
+ Assert.assertEquals(clientCnxs.size(), 1);
+ ClientCnx clientCnx = ((CompletableFuture<ClientCnx>)
clientCnxs.toArray()[0]).get();
+
Assert.assertEquals(clientCnx.getPendingLookupRequestSemaphore().availablePermits(),
+ client.conf.getConcurrentLookupRequest());
+ client.close();
+ }
+
@Test
public void testTimeoutRetriesOnPartitionMetadata() throws Exception {
try (PulsarClient client = newClient();
@@ -282,6 +313,9 @@ public class LookupRetryTest extends
MockedPulsarServiceBaseTest {
@Override
protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata
partitionMetadata) {
TopicName t = TopicName.get(partitionMetadata.getTopic());
+ if (t.getLocalName().equals(TEST_TIME_OUT_SEMAPHORE_RELEASE)) {
+ return;
+ }
LookupError error = errorList(t.getLocalName()).poll();
if (error == LookupError.TOO_MANY) {
final long requestId = partitionMetadata.getRequestId();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 07df92f501e..184b210e2b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -922,7 +922,8 @@ public class ClientCnx extends PulsarHandler {
if (pendingLookupRequestSemaphore.tryAcquire()) {
future.whenComplete((lookupDataResult, throwable) -> {
if (throwable instanceof ConnectException
- || throwable instanceof
PulsarClientException.LookupException) {
+ || throwable instanceof
PulsarClientException.LookupException
+ || FutureUtil.unwrapCompletionException(throwable)
instanceof TimeoutException) {
pendingLookupRequestSemaphore.release();
}
});