This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c8dbc3c9c22 [fix][broker] PIP-442: Fix race condition in async
semaphore permit updates that causes memory limits to become ineffective
(#25066)
c8dbc3c9c22 is described below
commit c8dbc3c9c2276d1324adc53f33a2f409238d121f
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 12 13:40:10 2025 +0200
[fix][broker] PIP-442: Fix race condition in async semaphore permit updates
that causes memory limits to become ineffective (#25066)
---
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
...nConsumerBackPressureMultipleConsumersTest.java | 136 ++++++++++++---------
.../semaphore/AsyncDualMemoryLimiterImpl.java | 4 +-
.../common/semaphore/AsyncSemaphoreImpl.java | 68 +++++------
.../semaphore/AsyncDualMemoryLimiterImplTest.java | 24 ++++
.../pulsar/proxy/server/LookupProxyHandler.java | 7 +-
...nConsumerBackPressureMultipleConsumersTest.java | 20 +++
7 files changed, 162 insertions(+), 101 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a539f437e7f..098c5a9090d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2593,10 +2593,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
return
getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopics(namespaceName, mode)
- .thenAccept(topics -> {
+ .thenCompose(topics -> {
long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topics);
listSizeHolder.updateSize(actualSize);
-
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits
-> {
boolean filterTopics = false;
// filter system topic
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
index d91106ab683..15db34823e1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import java.io.Closeable;
@@ -41,6 +42,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
import org.apache.pulsar.common.stats.JvmMetrics;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.testng.annotations.AfterMethod;
@@ -96,69 +99,86 @@ public class
PatternConsumerBackPressureMultipleConsumersTest extends MockedPuls
}
};
- @Cleanup("shutdownNow")
- final ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime()
- .availableProcessors());
-
- @Cleanup
- PulsarClientSharedResources sharedResources =
- PulsarClientSharedResources.builder().build();
- List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
- @Cleanup
- Closeable closeClients = () -> {
- for (PulsarClient client : clients) {
- try {
- client.close();
- } catch (PulsarClientException e) {
- log.error("Failed to close client {}", client, e);
+ {
+ @Cleanup("shutdownNow") final ExecutorService executorService =
+ Executors.newFixedThreadPool(Runtime.getRuntime()
+ .availableProcessors());
+
+ @Cleanup
+ PulsarClientSharedResources sharedResources =
+ PulsarClientSharedResources.builder().build();
+ List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+ @Cleanup
+ Closeable closeClients = () -> {
+ for (PulsarClient client : clients) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close client {}", client, e);
+ }
}
+ };
+ for (int i = 0; i < numberOfClients; i++) {
+ PulsarClientImpl client = (PulsarClientImpl)
PulsarClient.builder()
+ .serviceUrl(getClientServiceUrl())
+ .sharedResources(sharedResources)
+ .build();
+ clients.add(client);
}
- };
- for (int i = 0; i < numberOfClients; i++) {
- PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
- .serviceUrl(getClientServiceUrl())
- .sharedResources(sharedResources)
- .build();
- clients.add(client);
- }
- final AtomicInteger success = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(requests);
- final Semaphore semaphore = new Semaphore(maxRequestsInFlight);
- for (int i = 0; i < requests; i++) {
- PulsarClientImpl pulsarClientImpl = clients.get(i %
numberOfClients);
- executorService.execute(() -> {
- semaphore.acquireUninterruptibly();
- try {
- pulsarClientImpl.getLookup()
-
.getTopicsUnderNamespace(NamespaceName.get("public", "default"),
-
CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "")
- .whenComplete((result, ex) -> {
- semaphore.release();
- if (ex == null) {
- success.incrementAndGet();
- } else {
- log.error("Failed to get topic list.", ex);
- }
- log.info(
- "latch-count: {}, succeed: {},
available direct mem: {} MB, free heap mem: {}"
- + " MB",
- latch.getCount(), success.get(),
-
(DirectMemoryUtils.jvmMaxDirectMemory() - JvmMetrics.getJvmDirectMemoryUsed())
- / (1024 * 1024),
Runtime.getRuntime().freeMemory() / (1024 * 1024));
- latch.countDown();
- });
- } catch (Exception e) {
- semaphore.release();
- latch.countDown();
- log.error("Failed to execute getTopicsUnderNamespace
request.", e);
- }
- });
+ final AtomicInteger success = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(requests);
+ final Semaphore semaphore = new Semaphore(maxRequestsInFlight);
+ for (int i = 0; i < requests; i++) {
+ PulsarClientImpl pulsarClientImpl = clients.get(i %
numberOfClients);
+ executorService.execute(() -> {
+ semaphore.acquireUninterruptibly();
+ try {
+ pulsarClientImpl.getLookup()
+
.getTopicsUnderNamespace(NamespaceName.get("public", "default"),
+
CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "")
+ .whenComplete((result, ex) -> {
+ semaphore.release();
+ if (ex == null) {
+ success.incrementAndGet();
+ } else {
+ log.error("Failed to get topic list.",
ex);
+ }
+ log.info("latch-count: {}, succeed: {},
available direct mem: {} MB, "
+ + "free heap mem: {} MB",
+ latch.getCount(), success.get(),
+
(DirectMemoryUtils.jvmMaxDirectMemory()
+ -
JvmMetrics.getJvmDirectMemoryUsed())
+ / (1024 * 1024),
Runtime.getRuntime().freeMemory() / (1024 * 1024));
+ latch.countDown();
+ });
+ } catch (Exception e) {
+ semaphore.release();
+ latch.countDown();
+ log.error("Failed to execute getTopicsUnderNamespace
request.", e);
+ }
+ });
+ }
+ latch.await();
+ assertEquals(success.get(), requests);
+
+ validateTopiclistPrometheusMetrics();
}
- latch.await();
- assertEquals(success.get(), requests);
- validateTopiclistPrometheusMetrics();
+ validateThatTokensHaventLeakedOrIncreased();
+ }
+
+ protected void validateThatTokensHaventLeakedOrIncreased() {
+ AsyncDualMemoryLimiterImpl limiter =
+ pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() *
1024 * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+ .isEqualTo(0);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() *
1024 * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+ .isEqualTo(0);
}
protected int getNumberOfClients() {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index d6b31aa72ee..1b55d56d201 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.semaphore;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -93,7 +94,8 @@ public class AsyncDualMemoryLimiterImpl implements
AsyncDualMemoryLimiter, AutoC
new DualMemoryLimiterPermit(limitType, result));
}
- protected AsyncSemaphore getLimiter(LimitType limitType) {
+ @VisibleForTesting
+ public AsyncSemaphore getLimiter(LimitType limitType) {
switch (limitType) {
case HEAP_MEMORY:
return heapLimiter;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index 7fc506753a8..bd5f5912921 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.common.semaphore;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -33,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,10 +101,10 @@ public class AsyncSemaphoreImpl implements
AsyncSemaphore, AutoCloseable {
@Override
public CompletableFuture<AsyncSemaphorePermit> acquire(long permits,
BooleanSupplier isCancelled) {
- return internalAcquire(permits, permits, isCancelled);
+ return internalAcquire(permits, null, isCancelled);
}
- private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long
permits, long acquirePermits,
+ private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long
permits, SemaphorePermit previousPermit,
BooleanSupplier isCancelled) {
validatePermits(permits);
@@ -122,7 +120,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore,
AutoCloseable {
return future;
}
- PendingRequest request = new PendingRequest(permits, acquirePermits,
future, isCancelled);
+ PendingRequest request = new PendingRequest(permits, previousPermit,
future, isCancelled);
if (!queue.offer(request)) {
future.completeExceptionally(new PermitAcquireQueueFullException(
"Semaphore queue is full"));
@@ -177,34 +175,21 @@ public class AsyncSemaphoreImpl implements
AsyncSemaphore, AutoCloseable {
long oldPermits = permit.getPermits();
long additionalPermits = newPermits - oldPermits;
if (additionalPermits > 0) {
- CompletableFuture<AsyncSemaphorePermit> acquireFuture =
- internalAcquire(newPermits, additionalPermits,
isCancelled);
- // return a future that completes after original permits have been
released when the acquisition
- // has been successfully completed
- CompletableFuture<AsyncSemaphorePermit> returnedFuture =
- acquireFuture.thenApply(p -> {
- // mark the old permits as released without
adding the permits to availablePermits
- castToImplementation(permit).releasePermits();
- return p;
- });
- // add cancellation support for returned future, so that it
cancels the acquireFuture if the returnedFuture
- // is cancelled
- returnedFuture.whenComplete((p, t) -> {
- if (t != null && FutureUtil.unwrapCompletionException(t)
instanceof CancellationException) {
- acquireFuture.cancel(false);
- }
- });
- return returnedFuture;
- }
- if (additionalPermits < 0) {
- // new permits are less than the old ones, so we return the
difference
- availablePermits.addAndGet(-additionalPermits);
- processQueue();
+ return internalAcquire(newPermits, castToImplementation(permit),
isCancelled);
}
// mark the old permits as released without adding the permits to
availablePermits
- castToImplementation(permit).releasePermits();
- // return the new permits immediately
- return CompletableFuture.completedFuture(new
SemaphorePermit(newPermits));
+ long leftoverPermits = castToImplementation(permit).releasePermits() -
newPermits;
+ if (leftoverPermits >= 0) {
+ if (leftoverPermits > 0) {
+ // new permits are less than the old ones, so we return the
difference
+ availablePermits.addAndGet(leftoverPermits);
+ processQueue();
+ }
+ // return the new permits immediately
+ return CompletableFuture.completedFuture(new
SemaphorePermit(newPermits));
+ } else {
+ return acquire(newPermits, isCancelled);
+ }
}
@Override
@@ -282,8 +267,9 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore,
AutoCloseable {
continue;
}
- if (request.acquirePermits <= current) {
- availablePermits.addAndGet(-request.acquirePermits);
+ if (request.getRequiredPermits() <= current) {
+ long requiredPermitsReusingPrevious =
request.getRequiredPermitsReusingPrevious();
+ availablePermits.addAndGet(-requiredPermitsReusingPrevious);
request.cancelTimeoutTask();
queue.remove(request);
SemaphorePermit permit = new SemaphorePermit(request.permits);
@@ -291,7 +277,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore,
AutoCloseable {
boolean futureCompleted = request.future.complete(permit);
if (!futureCompleted) {
// request was cancelled by user code, return permits
- availablePermits.addAndGet(request.acquirePermits);
+ availablePermits.addAndGet(requiredPermitsReusingPrevious);
}
} else {
break;
@@ -318,16 +304,16 @@ public class AsyncSemaphoreImpl implements
AsyncSemaphore, AutoCloseable {
private static class PendingRequest {
final long permits;
- private final long acquirePermits;
+ private final SemaphorePermit previousPermit;
final CompletableFuture<AsyncSemaphorePermit> future;
private final BooleanSupplier isCancelled;
private volatile ScheduledFuture<?> timeoutTask;
private final long requestCreatedNanos = System.nanoTime();
- PendingRequest(long permits, long acquirePermits,
CompletableFuture<AsyncSemaphorePermit> future,
+ PendingRequest(long permits, SemaphorePermit previousPermit,
CompletableFuture<AsyncSemaphorePermit> future,
BooleanSupplier isCancelled) {
this.permits = permits;
- this.acquirePermits = acquirePermits;
+ this.previousPermit = previousPermit;
this.future = future;
this.isCancelled = isCancelled;
}
@@ -346,6 +332,14 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore,
AutoCloseable {
long getAgeNanos() {
return System.nanoTime() - requestCreatedNanos;
}
+
+ long getRequiredPermits() {
+ return previousPermit == null ? permits : permits -
previousPermit.getPermits();
+ }
+
+ long getRequiredPermitsReusingPrevious() {
+ return previousPermit == null ? permits : permits -
previousPermit.releasePermits();
+ }
}
private static class SemaphorePermit implements AsyncSemaphorePermit {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
index fd23d95151c..3c972ce1640 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.semaphore;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -32,6 +33,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BooleanSupplier;
import
org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit;
import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.LimitType;
import
org.apache.pulsar.common.semaphore.AsyncSemaphore.PermitAcquireAlreadyClosedException;
@@ -718,4 +721,25 @@ public class AsyncDualMemoryLimiterImplTest {
limiter.release(permit);
}
}
+
+ @Test(invocationCount = 100)
+ public void testUpdateHeapPermitsIncreaseWithReleasedPermits() throws
Exception {
+ limiter = new AsyncDualMemoryLimiterImpl(100000, 10, 5000, 1000, 100,
5000);
+
+ BooleanSupplier cancelled = () -> false;
+ AtomicReference<CompletableFuture<Void>> future2Ref = new
AtomicReference<>();
+
+ CompletableFuture<Void> future =
+ limiter.withAcquiredPermits(100, LimitType.HEAP_MEMORY,
cancelled, permit -> {
+ future2Ref.set(limiter.withUpdatedPermits(permit, 200,
cancelled, updatedPermit -> {
+ return CompletableFuture.supplyAsync(() ->
null);
+ }, CompletableFuture::failedFuture));
+ return CompletableFuture.completedFuture(null);
+ }, CompletableFuture::failedFuture);
+
+ assertThat(future).succeedsWithin(1, TimeUnit.SECONDS);
+ assertThat(future2Ref.get()).succeedsWithin(1, TimeUnit.SECONDS);
+
assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAvailablePermits()).isEqualTo(100000);
+
assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAcquiredPermits()).isEqualTo(0);
+ }
}
\ No newline at end of file
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 49a59444b62..d276c7996a6 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -368,17 +368,18 @@ public class LookupProxyHandler {
listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
- return clientCnx.newGetTopicsOfNamespace(command,
requestId).whenComplete((r, t) -> {
+ return clientCnx.newGetTopicsOfNamespace(command,
requestId).handle((r, t) -> {
if (t != null) {
log.warn("[{}] Failed to get TopicsOfNamespace
{}: {}", clientAddress, namespaceName,
t.getMessage());
listSizeHolder.resetIfInitializing();
writeAndFlush(Commands.newError(clientRequestId, getServerError(t),
t.getMessage()));
+ return CompletableFuture.completedFuture(null);
} else {
long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(
r.getNonPartitionedOrPartitionTopics());
listSizeHolder.updateSize(actualSize);
-
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
return
handleWritingGetTopicsResponse(clientRequestId, r,
isPermitRequestCancelled);
@@ -392,7 +393,7 @@ public class LookupProxyHandler {
return
CompletableFuture.completedFuture(null);
});
}
- }).thenApply(__ -> null);
+ });
}, t -> {
log.warn("[{}] Failed to acquire initial heap memory
permits for GetTopicsOfNamespace: {}",
clientAddress, t.getMessage());
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
index 3263cc2985c..e8550db40b9 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +27,8 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import
org.apache.pulsar.client.api.PatternConsumerBackPressureMultipleConsumersTest;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
@@ -75,6 +78,23 @@ public class
ProxyPatternConsumerBackPressureMultipleConsumersTest extends
super.cleanup();
}
+ @Override
+ protected void validateThatTokensHaventLeakedOrIncreased() {
+ // validate broker's limiter
+ super.validateThatTokensHaventLeakedOrIncreased();
+ // validate proxy's limiter
+ AsyncDualMemoryLimiterImpl limiter =
+ proxyService.getMaxTopicListInFlightLimiter();
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+ .isEqualTo(proxyConfig.getMaxTopicListInFlightHeapMemSizeMB()
* 1024 * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+ .isEqualTo(0);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+
.isEqualTo(proxyConfig.getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+ .isEqualTo(0);
+ }
+
@Override
protected String getClientServiceUrl() {
return proxyService.getServiceUrl();