Denovo1998 commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2426451067


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2606,6 +2577,77 @@ protected void 
handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
         }
     }
 
+    private void internalHandleGetTopicsOfNamespace(String namespace, 
NamespaceName namespaceName, long requestId,
+                                                    
CommandGetTopicsOfNamespace.Mode mode,
+                                                    Optional<String> 
topicsPattern, Optional<String> topicsHash,
+                                                    Semaphore lookupSemaphore) 
{
+        BooleanSupplier isPermitRequestCancelled = () -> 
!ctx().channel().isActive();
+        
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
+                AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                    return 
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
 mode)
+                            .thenAccept(topics -> {
+                                long actualSize = 
topics.stream().mapToInt(String::length).sum();
+                                
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                        isPermitRequestCancelled, permits -> {
+                                    boolean filterTopics = false;
+                                    // filter system topic
+                                    List<String> filteredTopics = topics;
+
+                                    if (enableSubscriptionPatternEvaluation && 
topicsPattern.isPresent()) {
+                                        if (topicsPattern.get().length() <= 
maxSubscriptionPatternLength) {
+                                            filterTopics = true;
+                                            filteredTopics = 
TopicList.filterTopics(filteredTopics, topicsPattern.get(),
+                                                    
topicsPatternImplementation);
+                                        } else {
+                                            log.info("[{}] Subscription 
pattern provided [{}] was longer than "
+                                                            + "maximum {}.", 
remoteAddress, topicsPattern.get(),
+                                                    
maxSubscriptionPatternLength);
+                                        }
+                                    }
+                                    String hash = 
TopicList.calculateHash(filteredTopics);
+                                    boolean hashUnchanged = 
topicsHash.isPresent() && topicsHash.get().equals(hash);
+                                    if (hashUnchanged) {
+                                        filteredTopics = 
Collections.emptyList();
+                                    }
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("[{}] Received 
CommandGetTopicsOfNamespace for namespace "
+                                                        + "[//{}] by {}, 
size:{}", remoteAddress, namespace,
+                                                requestId,
+                                                topics.size());
+                                    }
+                                    
commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, 
filterTopics,
+                                            !hashUnchanged, requestId, ex -> {
+                                                log.warn("[{}] Failed to 
acquire direct memory permits for "
+                                                        + 
"GetTopicsOfNamespace: {}", remoteAddress, ex.getMessage());
+                                                
commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+                                                        "Cannot acquire 
permits for direct memory");
+                                            });
+                                    return 
CompletableFuture.completedFuture(null);
+                                }, t -> {
+                                    log.warn("[{}] Failed to acquire heap 
memory permits for "
+                                            + "GetTopicsOfNamespace: {}", 
remoteAddress, t.getMessage());
+                                    writeAndFlush(Commands.newError(requestId, 
ServerError.TooManyRequests,
+                                            "Failed due to heap memory limit 
exceeded"));
+                                    return 
CompletableFuture.completedFuture(null);
+                                });
+                            }).whenComplete((__, ___) -> {
+                                lookupSemaphore.release();
+                            }).exceptionally(ex -> {
+                                log.warn("[{}] Error GetTopicsOfNamespace for 
namespace [//{}] by {}", remoteAddress,
+                                        namespace, requestId);
+                                commandSender.sendErrorResponse(requestId,
+                                        
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+                                        ex.getMessage());
+                                return null;
+                            });
+                }, t -> {
+                    log.warn("[{}] Failed to acquire initial heap memory 
permits for GetTopicsOfNamespace: {}",
+                            remoteAddress, t.getMessage());
+                    writeAndFlush(Commands.newError(requestId, 
ServerError.TooManyRequests,
+                            "Failed due to heap memory limit exceeded"));
+                    return CompletableFuture.completedFuture(null);

Review Comment:
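   Should we call `lookupSemaphore.release();` here? In this hunk the only release is in the `whenComplete` callback chained onto the `getListOfUserTopics(...)` future, and that future is only created inside the callback that runs after the initial heap memory permits have been acquired. If acquiring the initial permits fails, only this error handler runs, so the lookup semaphore permit appears to never be released on this path (unless the caller releases it elsewhere).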



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to