Copilot commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2620330095
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java:
##########
@@ -366,27 +366,30 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
+ * @return
Review Comment:
The @return tag in the Javadoc comment is empty and does not describe what
the method returns. It should describe that the method returns a
CompletableFuture<Void> that completes when the operation finishes.
```suggestion
* @return a {@code CompletableFuture<Void>} that completes when the operation finishes
```
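As a rough illustration of the point above, here is a minimal, self-contained sketch of a documented `@return` tag on a method that returns `CompletableFuture<Void>`. The class `ReturnTagExample` and the method `sendTopics` are hypothetical stand-ins and are not part of the Pulsar code base; the generic type is wrapped in `{@code ...}` so that Javadoc does not try to parse `<Void>` as an HTML tag.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReturnTagExample {

    /**
     * Sends a list of topic names (hypothetical stand-in for the method under review).
     *
     * @param topics topic names to send; must not be null
     * @return a {@code CompletableFuture<Void>} that completes when the send finishes,
     *         or completes exceptionally if the send cannot be started
     */
    static CompletableFuture<Void> sendTopics(List<String> topics) {
        if (topics == null) {
            // Report the failure through the future instead of throwing synchronously.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("topics must not be null"));
        }
        // Simulated asynchronous write; a real implementation would write to a channel.
        return CompletableFuture.runAsync(() -> { });
    }

    public static void main(String[] args) {
        // The caller can chain on the returned future or block until it completes.
        sendTopics(List.of("persistent://tenant/ns/topic1")).join();
        System.out.println("send completed");
    }
}
```

Documenting both the normal and the exceptional completion tells callers whether they need to attach an error handler to the returned future.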
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java:
##########
@@ -366,27 +366,30 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
+ * @return
*/
@Override
- public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
+ public CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
+ List<String> topics,
+ Consumer<Throwable> permitAcquireErrorHandler) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
- interceptAndWriteCommand(command);
+ safeIntercept(command, cnx);
+ return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+ command, permitAcquireErrorHandler);
}
/***
* {@inheritDoc}
+ * @return
Review Comment:
The @return tag in the Javadoc comment is empty and does not describe what
the method returns. It should describe that the method returns a
CompletableFuture<Void> that completes when the operation finishes.
```suggestion
* @return a CompletableFuture that completes when the watch topic list update operation finishes
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java:
##########
@@ -88,10 +178,37 @@ public void testCommandWatchSuccessResponse() {
List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1");
String hash = TopicList.calculateHash(topics);
topicListFuture.complete(topics);
- Assert.assertEquals(1, lookupSemaphore.availablePermits());
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(1, lookupSemaphore.availablePermits()));
verify(topicResources).registerPersistentTopicListener(
eq(NamespaceName.get("tenant/ns")),
any(TopicListService.TopicListWatcher.class));
- verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13, hash, topics);
+ verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L), eq(13L), eq(hash), eq(topics), any());
+ }
+
+ @Test
+ public void testCommandWatchSuccessResponseWhenOutOfPermits() throws ExecutionException, InterruptedException {
+ // acquire all permits
+ AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permit =
+ memoryLimiter.acquire(1_000_000, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
+ Boolean.FALSE::booleanValue)
+ .get();
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13,
+ 7,
+ "persistent://tenant/ns/topic\\d",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1");
+ String hash = TopicList.calculateHash(topics);
+ topicListFuture.complete(topics);
+ // wait for acquisition to timeout a few times
+ Thread.sleep(2000);
Review Comment:
A fixed Thread.sleep(2000) makes this test slower and more brittle: it always
costs two seconds, and it can still fail on a slow machine. Consider waiting on
a specific condition with Awaitility, which this test class already uses,
instead of sleeping for an arbitrary duration.
```suggestion
Awaitility.await().atMost(Duration.ofSeconds(2))
.untilAsserted(() -> Assert.assertEquals(0, lookupSemaphore.availablePermits()));
```
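To make the difference concrete, the following is a minimal sketch of condition-based waiting that uses only the JDK. `PollingExample` and `pollUntil` are hypothetical names and only approximate what Awaitility does internally; the semaphore stands in for `lookupSemaphore`, and which condition the real test should wait on is an assumption that the PR author would need to confirm.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class PollingExample {

    // Hypothetical helper: re-checks the condition at a fixed interval and returns as
    // soon as it holds, instead of always sleeping for the full timeout.
    static void pollUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for lookupSemaphore: starts with no permits available.
        Semaphore semaphore = new Semaphore(0);

        // Simulate asynchronous work that releases a permit after a short delay.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            semaphore.release();
        });
        releaser.start();

        // Returns shortly after the permit is released, well before the 2 s limit.
        pollUntil(() -> semaphore.availablePermits() == 1,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("permits=" + semaphore.availablePermits());
        releaser.join();
    }
}
```

The timeout bounds the worst case, while the common case finishes as soon as the condition holds. If the test's intent is to let a fixed amount of time pass so that permit acquisition times out, a condition that observes that timeout directly would be needed instead.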