This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c8dbc3c9c22 [fix][broker] PIP-442: Fix race condition in async 
semaphore permit updates that causes memory limits to become ineffective 
(#25066)
c8dbc3c9c22 is described below

commit c8dbc3c9c2276d1324adc53f33a2f409238d121f
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 12 13:40:10 2025 +0200

    [fix][broker] PIP-442: Fix race condition in async semaphore permit updates 
that causes memory limits to become ineffective (#25066)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 ...nConsumerBackPressureMultipleConsumersTest.java | 136 ++++++++++++---------
 .../semaphore/AsyncDualMemoryLimiterImpl.java      |   4 +-
 .../common/semaphore/AsyncSemaphoreImpl.java       |  68 +++++------
 .../semaphore/AsyncDualMemoryLimiterImplTest.java  |  24 ++++
 .../pulsar/proxy/server/LookupProxyHandler.java    |   7 +-
 ...nConsumerBackPressureMultipleConsumersTest.java |  20 +++
 7 files changed, 162 insertions(+), 101 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a539f437e7f..098c5a9090d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2593,10 +2593,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
                         return 
getBrokerService().pulsar().getNamespaceService()
                                 .getListOfUserTopics(namespaceName, mode)
-                                .thenAccept(topics -> {
+                                .thenCompose(topics -> {
                                     long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(topics);
                                     listSizeHolder.updateSize(actualSize);
-                                    
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
                                             isPermitRequestCancelled, permits 
-> {
                                                 boolean filterTopics = false;
                                                 // filter system topic
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
index d91106ab683..15db34823e1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
@@ -41,6 +42,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
 import org.apache.pulsar.common.stats.JvmMetrics;
 import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.testng.annotations.AfterMethod;
@@ -96,69 +99,86 @@ public class 
PatternConsumerBackPressureMultipleConsumersTest extends MockedPuls
             }
         };
 
-        @Cleanup("shutdownNow")
-        final ExecutorService executorService = 
Executors.newFixedThreadPool(Runtime.getRuntime()
-                .availableProcessors());
-
-        @Cleanup
-        PulsarClientSharedResources sharedResources =
-                PulsarClientSharedResources.builder().build();
-        List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
-        @Cleanup
-        Closeable closeClients = () -> {
-            for (PulsarClient client : clients) {
-                try {
-                    client.close();
-                } catch (PulsarClientException e) {
-                    log.error("Failed to close client {}", client, e);
+        {
+            @Cleanup("shutdownNow") final ExecutorService executorService =
+                    Executors.newFixedThreadPool(Runtime.getRuntime()
+                            .availableProcessors());
+
+            @Cleanup
+            PulsarClientSharedResources sharedResources =
+                    PulsarClientSharedResources.builder().build();
+            List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+            @Cleanup
+            Closeable closeClients = () -> {
+                for (PulsarClient client : clients) {
+                    try {
+                        client.close();
+                    } catch (PulsarClientException e) {
+                        log.error("Failed to close client {}", client, e);
+                    }
                 }
+            };
+            for (int i = 0; i < numberOfClients; i++) {
+                PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder()
+                        .serviceUrl(getClientServiceUrl())
+                        .sharedResources(sharedResources)
+                        .build();
+                clients.add(client);
             }
-        };
-        for (int i = 0; i < numberOfClients; i++) {
-            PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
-                    .serviceUrl(getClientServiceUrl())
-                    .sharedResources(sharedResources)
-                    .build();
-            clients.add(client);
-        }
 
-        final AtomicInteger success = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(requests);
-        final Semaphore semaphore = new Semaphore(maxRequestsInFlight);
-        for (int i = 0; i < requests; i++) {
-            PulsarClientImpl pulsarClientImpl = clients.get(i % 
numberOfClients);
-            executorService.execute(() -> {
-                semaphore.acquireUninterruptibly();
-                try {
-                    pulsarClientImpl.getLookup()
-                            
.getTopicsUnderNamespace(NamespaceName.get("public", "default"),
-                                    
CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "")
-                            .whenComplete((result, ex) -> {
-                                semaphore.release();
-                                if (ex == null) {
-                                    success.incrementAndGet();
-                                } else {
-                                    log.error("Failed to get topic list.", ex);
-                                }
-                                log.info(
-                                        "latch-count: {}, succeed: {}, 
available direct mem: {} MB, free heap mem: {}"
-                                                + " MB",
-                                        latch.getCount(), success.get(),
-                                        
(DirectMemoryUtils.jvmMaxDirectMemory() - JvmMetrics.getJvmDirectMemoryUsed())
-                                                / (1024 * 1024), 
Runtime.getRuntime().freeMemory() / (1024 * 1024));
-                                latch.countDown();
-                            });
-                } catch (Exception e) {
-                    semaphore.release();
-                    latch.countDown();
-                    log.error("Failed to execute getTopicsUnderNamespace 
request.", e);
-                }
-            });
+            final AtomicInteger success = new AtomicInteger(0);
+            final CountDownLatch latch = new CountDownLatch(requests);
+            final Semaphore semaphore = new Semaphore(maxRequestsInFlight);
+            for (int i = 0; i < requests; i++) {
+                PulsarClientImpl pulsarClientImpl = clients.get(i % 
numberOfClients);
+                executorService.execute(() -> {
+                    semaphore.acquireUninterruptibly();
+                    try {
+                        pulsarClientImpl.getLookup()
+                                
.getTopicsUnderNamespace(NamespaceName.get("public", "default"),
+                                        
CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "")
+                                .whenComplete((result, ex) -> {
+                                    semaphore.release();
+                                    if (ex == null) {
+                                        success.incrementAndGet();
+                                    } else {
+                                        log.error("Failed to get topic list.", 
ex);
+                                    }
+                                    log.info("latch-count: {}, succeed: {}, 
available direct mem: {} MB, "
+                                                    + "free heap mem: {} MB",
+                                            latch.getCount(), success.get(),
+                                            
(DirectMemoryUtils.jvmMaxDirectMemory()
+                                                    - 
JvmMetrics.getJvmDirectMemoryUsed())
+                                                    / (1024 * 1024), 
Runtime.getRuntime().freeMemory() / (1024 * 1024));
+                                    latch.countDown();
+                                });
+                    } catch (Exception e) {
+                        semaphore.release();
+                        latch.countDown();
+                        log.error("Failed to execute getTopicsUnderNamespace 
request.", e);
+                    }
+                });
+            }
+            latch.await();
+            assertEquals(success.get(), requests);
+
+            validateTopiclistPrometheusMetrics();
         }
-        latch.await();
-        assertEquals(success.get(), requests);
 
-        validateTopiclistPrometheusMetrics();
+        validateThatTokensHaventLeakedOrIncreased();
+    }
+
+    protected void validateThatTokensHaventLeakedOrIncreased() {
+        AsyncDualMemoryLimiterImpl limiter =
+                pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+                
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 
1024 * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+                
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 
1024 * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
     }
 
     protected int getNumberOfClients() {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index d6b31aa72ee..1b55d56d201 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.semaphore;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -93,7 +94,8 @@ public class AsyncDualMemoryLimiterImpl implements 
AsyncDualMemoryLimiter, AutoC
                 new DualMemoryLimiterPermit(limitType, result));
     }
 
-    protected AsyncSemaphore getLimiter(LimitType limitType) {
+    @VisibleForTesting
+    public AsyncSemaphore getLimiter(LimitType limitType) {
         switch (limitType) {
         case HEAP_MEMORY:
             return heapLimiter;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index 7fc506753a8..bd5f5912921 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.common.semaphore;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -33,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BooleanSupplier;
 import java.util.function.LongConsumer;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Runnables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,10 +101,10 @@ public class AsyncSemaphoreImpl implements 
AsyncSemaphore, AutoCloseable {
 
     @Override
     public CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled) {
-        return internalAcquire(permits, permits, isCancelled);
+        return internalAcquire(permits, null, isCancelled);
     }
 
-    private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long 
permits, long acquirePermits,
+    private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long 
permits, SemaphorePermit previousPermit,
                                                                     
BooleanSupplier isCancelled) {
         validatePermits(permits);
 
@@ -122,7 +120,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, 
AutoCloseable {
             return future;
         }
 
-        PendingRequest request = new PendingRequest(permits, acquirePermits, 
future, isCancelled);
+        PendingRequest request = new PendingRequest(permits, previousPermit, 
future, isCancelled);
         if (!queue.offer(request)) {
             future.completeExceptionally(new PermitAcquireQueueFullException(
                     "Semaphore queue is full"));
@@ -177,34 +175,21 @@ public class AsyncSemaphoreImpl implements 
AsyncSemaphore, AutoCloseable {
         long oldPermits = permit.getPermits();
         long additionalPermits = newPermits - oldPermits;
         if (additionalPermits > 0) {
-            CompletableFuture<AsyncSemaphorePermit> acquireFuture =
-                    internalAcquire(newPermits, additionalPermits, 
isCancelled);
-            // return a future that completes after original permits have been 
released when the acquisition
-            // has been successfully completed
-            CompletableFuture<AsyncSemaphorePermit> returnedFuture =
-                    acquireFuture.thenApply(p -> {
-                                // mark the old permits as released without 
adding the permits to availablePermits
-                                castToImplementation(permit).releasePermits();
-                                return p;
-                            });
-            // add cancellation support for returned future, so that it 
cancels the acquireFuture if the returnedFuture
-            // is cancelled
-            returnedFuture.whenComplete((p, t) -> {
-                if (t != null && FutureUtil.unwrapCompletionException(t) 
instanceof CancellationException) {
-                    acquireFuture.cancel(false);
-                }
-            });
-            return returnedFuture;
-        }
-        if (additionalPermits < 0) {
-            // new permits are less than the old ones, so we return the 
difference
-            availablePermits.addAndGet(-additionalPermits);
-            processQueue();
+            return internalAcquire(newPermits, castToImplementation(permit), 
isCancelled);
         }
         // mark the old permits as released without adding the permits to 
availablePermits
-        castToImplementation(permit).releasePermits();
-        // return the new permits immediately
-        return CompletableFuture.completedFuture(new 
SemaphorePermit(newPermits));
+        long leftoverPermits = castToImplementation(permit).releasePermits() - 
newPermits;
+        if (leftoverPermits >= 0) {
+            if (leftoverPermits > 0) {
+                // new permits are less than the old ones, so we return the 
difference
+                availablePermits.addAndGet(leftoverPermits);
+                processQueue();
+            }
+            // return the new permits immediately
+            return CompletableFuture.completedFuture(new 
SemaphorePermit(newPermits));
+        } else {
+            return acquire(newPermits, isCancelled);
+        }
     }
 
     @Override
@@ -282,8 +267,9 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, 
AutoCloseable {
                 continue;
             }
 
-            if (request.acquirePermits <= current) {
-                availablePermits.addAndGet(-request.acquirePermits);
+            if (request.getRequiredPermits() <= current) {
+                long requiredPermitsReusingPrevious = 
request.getRequiredPermitsReusingPrevious();
+                availablePermits.addAndGet(-requiredPermitsReusingPrevious);
                 request.cancelTimeoutTask();
                 queue.remove(request);
                 SemaphorePermit permit = new SemaphorePermit(request.permits);
@@ -291,7 +277,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, 
AutoCloseable {
                 boolean futureCompleted = request.future.complete(permit);
                 if (!futureCompleted) {
                     // request was cancelled by user code, return permits
-                    availablePermits.addAndGet(request.acquirePermits);
+                    availablePermits.addAndGet(requiredPermitsReusingPrevious);
                 }
             } else {
                 break;
@@ -318,16 +304,16 @@ public class AsyncSemaphoreImpl implements 
AsyncSemaphore, AutoCloseable {
 
     private static class PendingRequest {
         final long permits;
-        private final long acquirePermits;
+        private final SemaphorePermit previousPermit;
         final CompletableFuture<AsyncSemaphorePermit> future;
         private final BooleanSupplier isCancelled;
         private volatile ScheduledFuture<?> timeoutTask;
         private final long requestCreatedNanos = System.nanoTime();
 
-        PendingRequest(long permits, long acquirePermits, 
CompletableFuture<AsyncSemaphorePermit> future,
+        PendingRequest(long permits, SemaphorePermit previousPermit, 
CompletableFuture<AsyncSemaphorePermit> future,
                        BooleanSupplier isCancelled) {
             this.permits = permits;
-            this.acquirePermits = acquirePermits;
+            this.previousPermit = previousPermit;
             this.future = future;
             this.isCancelled = isCancelled;
         }
@@ -346,6 +332,14 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, 
AutoCloseable {
         long getAgeNanos() {
             return System.nanoTime() - requestCreatedNanos;
         }
+
+        long getRequiredPermits() {
+            return previousPermit == null ? permits : permits - 
previousPermit.getPermits();
+        }
+
+        long getRequiredPermitsReusingPrevious() {
+            return previousPermit == null ? permits : permits - 
previousPermit.releasePermits();
+        }
     }
 
     private static class SemaphorePermit implements AsyncSemaphorePermit {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
index fd23d95151c..3c972ce1640 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.semaphore;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -32,6 +33,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
 import 
org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit;
 import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.LimitType;
 import 
org.apache.pulsar.common.semaphore.AsyncSemaphore.PermitAcquireAlreadyClosedException;
@@ -718,4 +721,25 @@ public class AsyncDualMemoryLimiterImplTest {
             limiter.release(permit);
         }
     }
+
+    @Test(invocationCount = 100)
+    public void testUpdateHeapPermitsIncreaseWithReleasedPermits() throws 
Exception {
+        limiter = new AsyncDualMemoryLimiterImpl(100000, 10, 5000, 1000, 100, 
5000);
+
+        BooleanSupplier cancelled = () -> false;
+        AtomicReference<CompletableFuture<Void>> future2Ref = new 
AtomicReference<>();
+
+        CompletableFuture<Void> future =
+                limiter.withAcquiredPermits(100, LimitType.HEAP_MEMORY, 
cancelled, permit -> {
+                    future2Ref.set(limiter.withUpdatedPermits(permit, 200, 
cancelled, updatedPermit -> {
+                                return CompletableFuture.supplyAsync(() -> 
null);
+                            }, CompletableFuture::failedFuture));
+                    return CompletableFuture.completedFuture(null);
+                }, CompletableFuture::failedFuture);
+
+        assertThat(future).succeedsWithin(1, TimeUnit.SECONDS);
+        assertThat(future2Ref.get()).succeedsWithin(1, TimeUnit.SECONDS);
+        
assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAvailablePermits()).isEqualTo(100000);
+        
assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAcquiredPermits()).isEqualTo(0);
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 49a59444b62..d276c7996a6 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -368,17 +368,18 @@ public class LookupProxyHandler {
         listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
             maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
                     AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
-                        return clientCnx.newGetTopicsOfNamespace(command, 
requestId).whenComplete((r, t) -> {
+                        return clientCnx.newGetTopicsOfNamespace(command, 
requestId).handle((r, t) -> {
                             if (t != null) {
                                 log.warn("[{}] Failed to get TopicsOfNamespace 
{}: {}", clientAddress, namespaceName,
                                         t.getMessage());
                                 listSizeHolder.resetIfInitializing();
                                 
writeAndFlush(Commands.newError(clientRequestId, getServerError(t), 
t.getMessage()));
+                                return CompletableFuture.completedFuture(null);
                             } else {
                                 long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(
                                         
r.getNonPartitionedOrPartitionTopics());
                                 listSizeHolder.updateSize(actualSize);
-                                
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
                                         isPermitRequestCancelled, permits -> {
                                             return 
handleWritingGetTopicsResponse(clientRequestId, r,
                                                     isPermitRequestCancelled);
@@ -392,7 +393,7 @@ public class LookupProxyHandler {
                                             return 
CompletableFuture.completedFuture(null);
                                         });
                             }
-                        }).thenApply(__ -> null);
+                        });
                     }, t -> {
                         log.warn("[{}] Failed to acquire initial heap memory 
permits for GetTopicsOfNamespace: {}",
                                 clientAddress, t.getMessage());
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
index 3263cc2985c..e8550db40b9 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
@@ -26,6 +27,8 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import 
org.apache.pulsar.client.api.PatternConsumerBackPressureMultipleConsumersTest;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
@@ -75,6 +78,23 @@ public class 
ProxyPatternConsumerBackPressureMultipleConsumersTest extends
         super.cleanup();
     }
 
+    @Override
+    protected void validateThatTokensHaventLeakedOrIncreased() {
+        // validate broker's limiter
+        super.validateThatTokensHaventLeakedOrIncreased();
+        // validate proxy's limiter
+        AsyncDualMemoryLimiterImpl limiter =
+                proxyService.getMaxTopicListInFlightLimiter();
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+                .isEqualTo(proxyConfig.getMaxTopicListInFlightHeapMemSizeMB() 
* 1024 * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+                
.isEqualTo(proxyConfig.getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024);
+        
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+                .isEqualTo(0);
+    }
+
     @Override
     protected String getClientServiceUrl() {
         return proxyService.getServiceUrl();

Reply via email to