lhotari commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2479071244
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2610,6 +2581,93 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
         }
     }
+    private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId,
+                                                    CommandGetTopicsOfNamespace.Mode mode,
+                                                    Optional<String> topicsPattern, Optional<String> topicsHash,
+                                                    Semaphore lookupSemaphore) {
+        BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive();
+        TopicListSizeResultCache.ResultHolder
+                listSizeHolder = service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(), mode);
+        listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+            maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
+                        return getBrokerService().pulsar().getNamespaceService()
+                                .getListOfUserTopics(namespaceName, mode)
+                                .thenAccept(topics -> {
+                                    long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topics);
+                                    listSizeHolder.updateSize(actualSize);
+                                    maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
Review Comment:
> No, the implementation is like follows, which never covered the case I said above
>
> * update
> * call `internalAcquire` if `realSize > estimatedSize`, see also https://github.com/apache/pulsar/pull/24833/files#diff-1daf74806e4d1f752d9576bd63d71678a7a46cbc7a1cf56f71ba88719f87d346R173-R175
> * `internalAcquire` will pending the current thread or throws errors, see also https://github.com/apache/pulsar/pull/24833/files#diff-1daf74806e4d1f752d9576bd63d71678a7a46cbc7a1cf56f71ba88719f87d346R117-R122

I'm not sure exactly what you are referring to. I made some modifications so
that the permits on the original instance are released only after the new
permits have been acquired successfully. If you have something else in mind,
could you share more details?

> Instead of updating and acquiring additional permits if the actual size is larger than estimated, we'd better improve as follows:

The current design is intentional. I explain why in the following paragraph.

> * The current request does not need to acquire permits again; only updating the permits that were borrowed out is enough, even though the permits were over-acquired

In the current design, the original permit instance is reset once the permits
have been moved to a new instance successfully (this is now fixed). This is
intended behavior: any later logic can always release the original permit
without side effects, which makes it easier to prevent permit leaks when this
pattern is applied. For example, the
`org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterUtil#withPermitsFuture`
and
`org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterUtil#acquireDirectMemoryPermitsAndWriteAndFlush`
helpers are simpler to implement when there are two separate permit instances.
You can also review the implementation code to see why I came to this design
decision.

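To illustrate the idea of two separate permit instances, here is a minimal,
synchronous sketch. It is not the `AsyncDualMemoryLimiter` code from this PR:
the names `PermitTransferSketch`, `Limiter`, `Permit`, `acquire`, `update` and
`release` are hypothetical, and the toy limiter throws where the real one
would queue the request asynchronously. It only shows that the original
permit is reset after the update succeeds, so releasing it later is harmless,
and that a failed update leaves the original permit intact.

```java
// Hypothetical sketch; none of these types exist in Pulsar.
class PermitTransferSketch {

    /** A permit handle; amount == 0 means it no longer holds anything. */
    static final class Permit {
        long amount;

        Permit(long amount) {
            this.amount = amount;
        }
    }

    /** Toy limiter that tracks permits in use against a fixed limit. */
    static final class Limiter {
        private final long limit;
        private long inUse;

        Limiter(long limit) {
            this.limit = limit;
        }

        synchronized Permit acquire(long amount) {
            if (inUse + amount > limit) {
                // Assumption: the real limiter would queue instead of throwing.
                throw new IllegalStateException("limit exceeded");
            }
            inUse += amount;
            return new Permit(amount);
        }

        /**
         * Moves the permits held by 'original' into a new Permit of size
         * 'newAmount'. The original is reset only after the adjustment
         * succeeded; on failure it still holds its permits.
         */
        synchronized Permit update(Permit original, long newAmount) {
            long delta = newAmount - original.amount;
            if (inUse + delta > limit) {
                throw new IllegalStateException("limit exceeded");
            }
            inUse += delta;
            original.amount = 0; // later release(original) becomes a no-op
            return new Permit(newAmount);
        }

        /** Safe to call more than once for the same permit. */
        synchronized void release(Permit permit) {
            inUse -= permit.amount;
            permit.amount = 0;
        }

        synchronized long inUse() {
            return inUse;
        }
    }

    public static void main(String[] args) {
        Limiter limiter = new Limiter(100);
        Permit original = limiter.acquire(10);   // estimated size
        Permit updated = limiter.update(original, 30); // actual size
        limiter.release(original);               // no effect, already moved
        limiter.release(updated);                // returns all 30 permits
        System.out.println("in use after release: " + limiter.inUse());
    }
}
```

With this shape, cleanup code can release both handles unconditionally
without tracking which one currently owns the permits.
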
> * Updates the permits that were borrowed out, to limit the following permits acquisition;

Did you mean to point out the earlier bug, where the original permits were
released before the new permit (with the updated amount) had been acquired
successfully?