This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d8bf601749 [fix][client] Fix lookup request semaphore not release 
problem (#25038)
9d8bf601749 is described below

commit 9d8bf601749d465e2394a1a0db96bfe6b70d13a5
Author: congbo <[email protected]>
AuthorDate: Fri Dec 5 15:40:50 2025 +0800

    [fix][client] Fix lookup request semaphore not release problem (#25038)
    
    ### Motivation
    fix bugs
    ### Bug
    if lookup request timeout, `pendingLookupRequestSemaphore` will not be 
released, so fix it
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/congbobo184/pulsar/pull/23
---
 .../apache/pulsar/client/impl/LookupRetryTest.java | 34 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
index 77966713071..f8ebebd53e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import static 
org.apache.pulsar.common.protocol.Commands.newPartitionMetadataResponse;
+import static org.testng.AssertJUnit.fail;
 import com.google.common.collect.Sets;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,13 +37,16 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -51,6 +57,7 @@ public class LookupRetryTest extends 
MockedPulsarServiceBaseTest {
     private static final String subscription = "reader-sub";
     private final AtomicInteger connectionsCreated = new AtomicInteger(0);
     private final ConcurrentHashMap<String, Queue<LookupError>> failureMap = 
new ConcurrentHashMap<>();
+    private static final String TEST_TIME_OUT_SEMAPHORE_RELEASE = 
"testTimeoutReleasePendingLookupRequestSemaphore";
 
     @BeforeClass
     @Override
@@ -110,6 +117,30 @@ public class LookupRetryTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testTimeoutReleasePendingLookupRequestSemaphore() throws 
Exception {
+        PulsarClientImpl client = (PulsarClientImpl) newClient();
+
+        LookupService lookup = client.getLookup();
+
+        CompletableFuture<PartitionedTopicMetadata> future =
+                
lookup.getPartitionedTopicMetadata(TopicName.get(TEST_TIME_OUT_SEMAPHORE_RELEASE),
 false);
+        try {
+            future.get();
+            fail();
+        } catch (Exception e) {
+            Assert.assertTrue(FutureUtil.unwrapCompletionException(e)
+                    instanceof PulsarClientException.TimeoutException);
+        }
+
+        Set<CompletableFuture<ClientCnx>> clientCnxs = 
client.getCnxPool().getConnections();
+        Assert.assertEquals(clientCnxs.size(), 1);
+        ClientCnx clientCnx = ((CompletableFuture<ClientCnx>) 
clientCnxs.toArray()[0]).get();
+        
Assert.assertEquals(clientCnx.getPendingLookupRequestSemaphore().availablePermits(),
+                client.conf.getConcurrentLookupRequest());
+        client.close();
+    }
+
     @Test
     public void testTimeoutRetriesOnPartitionMetadata() throws Exception {
         try (PulsarClient client = newClient();
@@ -282,6 +313,9 @@ public class LookupRetryTest extends 
MockedPulsarServiceBaseTest {
         @Override
         protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata 
partitionMetadata) {
             TopicName t = TopicName.get(partitionMetadata.getTopic());
+            if (t.getLocalName().equals(TEST_TIME_OUT_SEMAPHORE_RELEASE)) {
+                return;
+            }
             LookupError error = errorList(t.getLocalName()).poll();
             if (error == LookupError.TOO_MANY) {
                 final long requestId = partitionMetadata.getRequestId();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 07df92f501e..184b210e2b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -922,7 +922,8 @@ public class ClientCnx extends PulsarHandler {
         if (pendingLookupRequestSemaphore.tryAcquire()) {
             future.whenComplete((lookupDataResult, throwable) -> {
                 if (throwable instanceof ConnectException
-                        || throwable instanceof 
PulsarClientException.LookupException) {
+                        || throwable instanceof 
PulsarClientException.LookupException
+                        || FutureUtil.unwrapCompletionException(throwable) 
instanceof TimeoutException) {
                     pendingLookupRequestSemaphore.release();
                 }
             });

Reply via email to