Copilot commented on code in PR #23907:
URL: https://github.com/apache/pulsar/pull/23907#discussion_r2498915149
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java:
##########
@@ -468,6 +472,71 @@ public void
testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType sub
admin.topics().deletePartitionedTopic(topic);
}
+ @Test
+ public void testDelayedMessageCancel() throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedMessageCancel");
+ final String subName = "shared-sub";
+ CountDownLatch latch = new CountDownLatch(99);
+ admin.topics().createPartitionedTopic(topic, 2);
+ Set<String> receivedMessages1 = ConcurrentHashMap.newKeySet();
+ Set<String> receivedMessages2 = ConcurrentHashMap.newKeySet();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName + "-1")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((Consumer<String> c, Message<String> msg) -> {
+ receivedMessages1.add(msg.getValue());
+ c.acknowledgeAsync(msg);
+ latch.countDown();
+ })
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName + "-2")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((Consumer<String> c, Message<String> msg) -> {
+ receivedMessages2.add(msg.getValue());
+ c.acknowledgeAsync(msg);
+ latch.countDown();
+ })
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ List<MessageId> messageIds = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ final long deliverAtTime = System.currentTimeMillis() + 3000L;
+ MessageId messageId = producer.newMessage()
+ .key(String.valueOf(i))
+ .value("msg-" + i)
+ .deliverAt(deliverAtTime)
+ .send();
+ messageIds.add(i, messageId);
+ }
+
+ final int cancelMessage = 50;
+ MessageIdImpl messageId = (MessageIdImpl)
messageIds.get(cancelMessage);
+
+ Map<String,String> ackMessageIds = new HashMap<>();
+ ackMessageIds.put(String.valueOf(messageId.getLedgerId()),
String.valueOf(messageId.getEntryId()));
+
+ admin.topics().skipMessages(topic + "-partition-0", subName + "-1",
ackMessageIds);
+ admin.topics().skipMessages(topic + "-partition-1", subName + "-1",
ackMessageIds);
+
+ assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were
received in time");
+ assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
+ || receivedMessages2.contains("msg-" + cancelMessage))
Review Comment:
The assertion logic is incorrect. The condition `assertFalse((A || B) && C)`
will pass (return true for assertFalse) when either: (1) the message WAS
received, OR (2) the total count is NOT 99. This is the opposite of the
intended behavior. The assertion should verify that the message was NOT
received AND the count is 99. Consider:
`assertTrue(!receivedMessages1.contains(\"msg-\" + cancelMessage) &&
!receivedMessages2.contains(\"msg-\" + cancelMessage) &&
(receivedMessages1.size() + receivedMessages2.size() == 99))`
```suggestion
assertTrue(
!receivedMessages1.contains("msg-" + cancelMessage)
&& !receivedMessages2.contains("msg-" +
cancelMessage)
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -23,6 +23,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
Review Comment:
The import `java.io.IOException` is not used in this file and should be
removed.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1595,6 +1595,42 @@ public void skipMessages(
}
}
+ @POST
+
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds")
+ @ApiOperation(value = "Skipping messages on a topic subscription.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace or topic or
subscription does not exist"),
+ @ApiResponse(code = 405, message = "Skipping messages on a
partitioned topic is not allowed"),
Review Comment:
The API response documentation at line 1607 states 'Skipping messages on a
partitioned topic is not allowed', but the implementation in
`internalSkipByMessageIds` (lines 1962-1972) actually supports partitioned
topics by iterating through partitions. This documentation is misleading and
should either be removed or updated to reflect that partitioned topics are
supported.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1947,109 @@ protected void internalSkipMessages(AsyncResponse
asyncResponse, String subName,
});
}
+ protected void internalSkipByMessageIds(AsyncResponse asyncResponse,
String subName, boolean authoritative,
+ Map<String, String> messageIds) {
+ CompletableFuture<Void> validationFuture =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+ validationFuture = validationFuture.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ validationFuture.thenCompose(__ ->
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+ messageIds, subName);
+ } else {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ }
+ }
+ }).exceptionally(ex -> {
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to ack messages on topic {}:
{}", clientAppId(), topicName, ex);
Review Comment:
The log message says 'Failed to ack messages' but this is in the context of
skipping messages by message IDs. The message should be updated to 'Failed to
skip messages by message IDs on topic {}: {}' to accurately reflect the
operation being performed.
```suggestion
log.error("[{}] Failed to skip messages by message
IDs on topic {}: {}", clientAppId(), topicName, ex);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1947,109 @@ protected void internalSkipMessages(AsyncResponse
asyncResponse, String subName,
});
}
+ protected void internalSkipByMessageIds(AsyncResponse asyncResponse,
String subName, boolean authoritative,
+ Map<String, String> messageIds) {
+ CompletableFuture<Void> validationFuture =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+ validationFuture = validationFuture.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ validationFuture.thenCompose(__ ->
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+ messageIds, subName);
+ } else {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ }
+ }
+ }).exceptionally(ex -> {
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to ack messages on topic {}:
{}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse
asyncResponse,
+
PartitionedTopicMetadata partitionMetadata,
+ Map<String,
String> messageIds,
+ String subName) {
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>(partitionMetadata.partitions);
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ futures.add(admin
+ .topics()
+ .skipMessagesAsync(topicNamePartition.toString(), subName,
messageIds));
+ }
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = FutureUtil.unwrapCompletionException(exception);
+ log.warn("[{}] Failed to ack messages on some partitions of
{}: {}",
Review Comment:
The log message says 'Failed to ack messages' but should say 'Failed to skip
messages by message IDs' to match the actual operation being performed in this
context.
```suggestion
log.warn("[{}] Failed to skip messages by message IDs on
some partitions of {}: {}",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]