Denovo1998 commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2426451067
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2606,6 +2577,77 @@ protected void
handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
}
}
+ private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Semaphore lookupSemaphore)
{
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ return
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
mode)
+ .thenAccept(topics -> {
+ long actualSize =
topics.stream().mapToInt(String::length).sum();
+
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, permits -> {
+ boolean filterTopics = false;
+ // filter system topic
+ List<String> filteredTopics = topics;
+
+ if (enableSubscriptionPatternEvaluation &&
topicsPattern.isPresent()) {
+ if (topicsPattern.get().length() <=
maxSubscriptionPatternLength) {
+ filterTopics = true;
+ filteredTopics =
TopicList.filterTopics(filteredTopics, topicsPattern.get(),
+
topicsPatternImplementation);
+ } else {
+ log.info("[{}] Subscription
pattern provided [{}] was longer than "
+ + "maximum {}.",
remoteAddress, topicsPattern.get(),
+
maxSubscriptionPatternLength);
+ }
+ }
+ String hash =
TopicList.calculateHash(filteredTopics);
+ boolean hashUnchanged =
topicsHash.isPresent() && topicsHash.get().equals(hash);
+ if (hashUnchanged) {
+ filteredTopics =
Collections.emptyList();
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received
CommandGetTopicsOfNamespace for namespace "
+ + "[//{}] by {},
size:{}", remoteAddress, namespace,
+ requestId,
+ topics.size());
+ }
+
commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash,
filterTopics,
+ !hashUnchanged, requestId, ex -> {
+ log.warn("[{}] Failed to
acquire direct memory permits for "
+ +
"GetTopicsOfNamespace: {}", remoteAddress, ex.getMessage());
+
commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+ "Cannot acquire
permits for direct memory");
+ });
+ return
CompletableFuture.completedFuture(null);
+ }, t -> {
+ log.warn("[{}] Failed to acquire heap
memory permits for "
+ + "GetTopicsOfNamespace: {}",
remoteAddress, t.getMessage());
+ writeAndFlush(Commands.newError(requestId,
ServerError.TooManyRequests,
+ "Failed due to heap memory limit
exceeded"));
+ return
CompletableFuture.completedFuture(null);
+ });
+ }).whenComplete((__, ___) -> {
+ lookupSemaphore.release();
+ }).exceptionally(ex -> {
+ log.warn("[{}] Error GetTopicsOfNamespace for
namespace [//{}] by {}", remoteAddress,
+ namespace, requestId);
+ commandSender.sendErrorResponse(requestId,
+
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+ ex.getMessage());
+ return null;
+ });
+ }, t -> {
+ log.warn("[{}] Failed to acquire initial heap memory
permits for GetTopicsOfNamespace: {}",
+ remoteAddress, t.getMessage());
+ writeAndFlush(Commands.newError(requestId,
ServerError.TooManyRequests,
+ "Failed due to heap memory limit exceeded"));
+ return CompletableFuture.completedFuture(null);
Review Comment:
Should we call `lookupSemaphore.release();` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]