Copilot commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2620330095


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java:
##########
@@ -366,27 +366,30 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
 
     /***
      * @param topics topic names which are matching, the topic name contains the partition suffix.
+     * @return

Review Comment:
   The @return tag in the Javadoc comment is empty and does not describe what 
the method returns. It should describe that the method returns a 
CompletableFuture<Void> that completes when the operation finishes.
   ```suggestion
        * @return a CompletableFuture&lt;Void&gt; that completes when the operation finishes
   ```
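
As a hedged illustration of the kind of `@return` description this comment asks for, the sketch below documents a method that returns a `CompletableFuture<Void>`. The class and method names (`ReturnTagSketch`, `sendAsync`) are hypothetical and not part of the PR. It uses the Javadoc `{@code ...}` inline tag as one alternative to HTML-escaping the angle brackets.

```java
import java.util.concurrent.CompletableFuture;

class ReturnTagSketch {
    /**
     * Pretends to send a payload asynchronously (hypothetical helper, not Pulsar code).
     *
     * @param payload the text to send
     * @return a {@code CompletableFuture<Void>} that completes when the send finishes
     */
    static CompletableFuture<Void> sendAsync(String payload) {
        // runAsync runs the task on the common ForkJoinPool and completes the future afterwards.
        return CompletableFuture.runAsync(() -> payload.length());
    }
}
```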



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java:
##########
@@ -366,27 +366,30 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
 
     /***
      * @param topics topic names which are matching, the topic name contains the partition suffix.
+     * @return
      */
     @Override
-    public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
+    public CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
+                                                             List<String> topics,
+                                                             Consumer<Throwable> permitAcquireErrorHandler) {
         BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
-        interceptAndWriteCommand(command);
+        safeIntercept(command, cnx);
+        return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+                command, permitAcquireErrorHandler);
     }
 
     /***
      * {@inheritDoc}
+     * @return

Review Comment:
   The @return tag in the Javadoc comment is empty and does not describe what 
the method returns. It should describe that the method returns a 
CompletableFuture<Void> that completes when the operation finishes.
   ```suggestion
        * @return a CompletableFuture that completes when the watch topic list update operation finishes
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java:
##########
@@ -88,10 +178,37 @@ public void testCommandWatchSuccessResponse() {
         List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1");
         String hash = TopicList.calculateHash(topics);
         topicListFuture.complete(topics);
-        Assert.assertEquals(1, lookupSemaphore.availablePermits());
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(1, lookupSemaphore.availablePermits()));
         verify(topicResources).registerPersistentTopicListener(
                 eq(NamespaceName.get("tenant/ns")), any(TopicListService.TopicListWatcher.class));
-        verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13, hash, topics);
+        verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L), eq(13L), eq(hash), eq(topics), any());
+    }
+
+    @Test
+    public void testCommandWatchSuccessResponseWhenOutOfPermits() throws ExecutionException, InterruptedException {
+        // acquire all permits
+        AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permit =
+                memoryLimiter.acquire(1_000_000, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
+                                Boolean.FALSE::booleanValue)
+                        .get();
+        topicListService.handleWatchTopicList(
+                NamespaceName.get("tenant/ns"),
+                13,
+                7,
+                "persistent://tenant/ns/topic\\d",
+                topicsPatternImplementation, null,
+                lookupSemaphore);
+        List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1");
+        String hash = TopicList.calculateHash(topics);
+        topicListFuture.complete(topics);
+        // wait for acquisition to timeout a few times
+        Thread.sleep(2000);

Review Comment:
   Using Thread.sleep in tests is a code smell that makes tests brittle and 
slower. Consider using Awaitility or similar mechanisms to wait for specific 
conditions instead of arbitrary sleep durations.
   ```suggestion
           Awaitility.await().atMost(Duration.ofSeconds(2))
                   .untilAsserted(() -> Assert.assertEquals(0, lookupSemaphore.availablePermits()));
   ```
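
To make the difference from a fixed sleep concrete, here is a minimal plain-JDK sketch of condition-based waiting, the idea that Awaitility packages up. `PollingWaitSketch` and `waitUntil` are hypothetical names used only for illustration; this is not Awaitility's implementation and not code from the PR.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollingWaitSketch {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true as soon as the condition is observed, false on timeout or interruption.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the condition
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

Unlike `Thread.sleep(2000)`, a wait of this shape returns as soon as the condition holds, so the timeout is only an upper bound on how long the test waits.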



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

