Copilot commented on code in PR #23907:
URL: https://github.com/apache/pulsar/pull/23907#discussion_r2507828061
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java:
##########
@@ -38,6 +38,7 @@
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.GetStatsOptions;
+import org.apache.pulsar.broker.service.SkipEntry;
Review Comment:
The `SkipEntry` class is imported but missing from the PR. This will cause a
compilation error.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -69,6 +76,7 @@
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.GetStatsOptions;
+import org.apache.pulsar.broker.service.SkipEntry;
Review Comment:
The `SkipEntry` class is imported but its definition is missing from the PR.
This will cause a compilation error. The class needs to be defined, likely as a
record or class with fields for ledgerId, entryId, and batchIndexes.
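For illustration, a minimal sketch of what such a type could look like, assuming
a Java 16+ record. The field names come from the comment above; the
`List<Integer>` type for `batchIndexes`, the convention that `null` means
"skip the whole entry", the helper `skipsWholeEntry()`, and the wrapper class
`SkipEntrySketch` are all assumptions, not the PR's actual definition.

```java
import java.util.List;

public class SkipEntrySketch {
    /**
     * Hypothetical shape for the missing type. A null batchIndexes is
     * assumed to mean "skip the whole entry".
     */
    public record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {
        public SkipEntry {
            // Defensive copy so callers cannot mutate the entry afterwards.
            batchIndexes = batchIndexes == null ? null : List.copyOf(batchIndexes);
        }

        /** True when the entry should be skipped in full (assumed convention). */
        public boolean skipsWholeEntry() {
            return batchIndexes == null;
        }
    }

    public static void main(String[] args) {
        SkipEntry whole = new SkipEntry(12L, 3L, null);
        SkipEntry partial = new SkipEntry(12L, 4L, List.of(0, 2));
        System.out.println(whole.skipsWholeEntry());
        System.out.println(partial.batchIndexes());
    }
}
```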
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java:
##########
@@ -80,6 +80,8 @@ default long getNumberOfEntriesDelayed() {
CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+ CompletableFuture<Void> skipMessages(List<SkipEntry> entries);
Review Comment:
The `SkipEntry` type is used but not defined in this PR. This interface
method references a missing class definition.
##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/SkipMessageIdsRequest.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Request DTO used by the admin client to submit a list of message IDs
+ * for skipping. It supports multiple formats and is serialized to JSON
+ * that the broker understands (polymorphic deserialization on server).
+ * <p>
+ * Supported types:
+ * - type = "byteArray": messageIds is List<String> of base64-encoded MessageId.toByteArray()
+ * - type = "messageId": messageIds is List<MessageIdItem> (supports batchIndex)
+ * - type = "map_of_ledgerId_entryId": messageIds is Map<String, String> (legacy map)
+ */
+@Setter
+@Getter
+public class SkipMessageIdsRequest {
+ // optional; default is byteArray on server when messageIds is an array of strings
+ private String type;
+ // List<String> | List<MessageIdItem> | Map<String, String>
+ private Object messageIds;
+
+ public SkipMessageIdsRequest() {
+ }
+
+ public static SkipMessageIdsRequest forByteArrays(List<String>
base64MessageIds) {
+ SkipMessageIdsRequest r = new SkipMessageIdsRequest();
+ r.setType("byteArray");
+ r.setMessageIds(base64MessageIds);
+ return r;
+ }
+
+ public static SkipMessageIdsRequest forMessageIds(List<MessageIdItem>
items) {
+ SkipMessageIdsRequest r = new SkipMessageIdsRequest();
+ r.setType("messageId");
+ r.setMessageIds(items);
+ return r;
+ }
+
+ /**
+ * Item representing a messageId as ledgerId, entryId and optional batchIndex.
+ */
+ @Setter
+ @Getter
+ public static class MessageIdItem {
Review Comment:
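[nitpick] Consider adding an explicit no-argument constructor to
`MessageIdItem`. Lombok's `@Setter` and `@Getter` do not generate constructors,
and Java drops the implicit default constructor once any other constructor is
declared, so serialization frameworks that instantiate the class through a
no-argument constructor may fail without one.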
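A small sketch of the Java rule behind this nitpick: once a class declares any
constructor, the compiler no longer supplies a default one, so a reflective
lookup of the no-argument constructor fails. The test in this PR calls a
three-argument `MessageIdItem` constructor, which suggests the rule may apply
here. The class names below are hypothetical stand-ins, not the PR's code.

```java
public class NoArgConstructorSketch {
    /** Only an args constructor: the implicit default constructor is gone. */
    static class ArgsOnly {
        private long ledgerId;

        ArgsOnly(long ledgerId) {
            this.ledgerId = ledgerId;
        }
    }

    /** Explicit no-arg constructor alongside the args constructor. */
    static class WithNoArg {
        private long ledgerId;

        WithNoArg() {
        }

        WithNoArg(long ledgerId) {
            this.ledgerId = ledgerId;
        }
    }

    /** Returns true if the class declares a no-argument constructor. */
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(ArgsOnly.class));  // false
        System.out.println(hasNoArgConstructor(WithNoArg.class)); // true
    }
}
```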
##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -855,6 +857,75 @@ void run() throws PulsarAdminException {
}
}
+ @Command(description = "Skip some messages for the subscription")
+ private class SkipMessages extends CliCommand {
+ @Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
+ private String topicName;
+
+ @Option(names = { "-s",
+ "--subscription" }, description = "Subscription to be skip messages on", required = true)
Review Comment:
Grammar: the description should read 'Subscription to skip messages on'
instead of 'Subscription to be skip messages on'.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -81,6 +84,7 @@
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.MessageExpirer;
+import org.apache.pulsar.broker.service.SkipEntry;
Review Comment:
The `SkipEntry` class is imported but its definition is not included in this
PR. This will prevent the code from compiling.
```suggestion
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java:
##########
@@ -468,6 +477,72 @@ public void
testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType sub
admin.topics().deletePartitionedTopic(topic);
}
+ @Test
+ public void testDelayedMessageCancel() throws Exception {
+ String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedMessageCancel");
+ final String subName = "shared-sub";
+ CountDownLatch latch = new CountDownLatch(99);
+ admin.topics().createPartitionedTopic(topic, 2);
+ Set<String> receivedMessages1 = ConcurrentHashMap.newKeySet();
+ Set<String> receivedMessages2 = ConcurrentHashMap.newKeySet();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName + "-1")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((Consumer<String> c, Message<String> msg) -> {
+ receivedMessages1.add(msg.getValue());
+ c.acknowledgeAsync(msg);
+ latch.countDown();
+ })
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName + "-2")
+ .subscriptionType(SubscriptionType.Shared)
+ .messageListener((Consumer<String> c, Message<String> msg) -> {
+ receivedMessages2.add(msg.getValue());
+ c.acknowledgeAsync(msg);
+ latch.countDown();
+ })
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ List<MessageId> messageIds = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ final long deliverAtTime = System.currentTimeMillis() + 3000L;
+ MessageId messageId = producer.newMessage()
+ .key(String.valueOf(i))
+ .value("msg-" + i)
+ .deliverAt(deliverAtTime)
+ .send();
+ messageIds.add(i, messageId);
+ }
+
+ final int cancelMessage = 50;
+ MessageIdImpl messageId = (MessageIdImpl)
messageIds.get(cancelMessage);
+
+ SkipMessageIdsRequest.MessageIdItem item0 = new
SkipMessageIdsRequest.MessageIdItem(
+ messageId.getLedgerId(), messageId.getEntryId(), null);
+ SkipMessageIdsRequest req =
SkipMessageIdsRequest.forMessageIds(Collections.singletonList(item0));
+
+ admin.topics().skipMessages(topic + "-partition-0", subName + "-1",
req);
+ admin.topics().skipMessages(topic + "-partition-1", subName + "-1",
req);
+
+ assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were received in time");
+ assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
+ || receivedMessages2.contains("msg-" + cancelMessage))
+ && (receivedMessages1.size() + receivedMessages2.size() == 99),
+ "msg-" + cancelMessage + " should have been cancelled but was received");
Review Comment:
The assertion logic is incorrect. The condition `(A || B) && C` is true only
when C is true and at least one of A or B is true, so `assertFalse` passes
whenever the message is absent from both sets or the total count is not 99. In
particular, it passes when the cancelled message was delivered but the total
differs from 99. The test should verify separately that the message is in
neither set and that exactly 99 messages were received, as in the suggestion
below.
```suggestion
assertFalse(receivedMessages1.contains("msg-" + cancelMessage)
|| receivedMessages2.contains("msg-" + cancelMessage),
"msg-" + cancelMessage + " should have been cancelled but was received");
assertEquals(receivedMessages1.size() + receivedMessages2.size(), 99);
```
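To make the failure mode concrete, here is a small standalone sketch that
evaluates both forms of the check for one scenario: the cancelled message was
delivered and 100 messages arrived. The class and method names are hypothetical
and the booleans stand in for the `contains(...)` calls in the test.

```java
public class CombinedAssertionSketch {
    /** The condition passed to assertFalse in the original test. */
    static boolean combined(boolean inSet1, boolean inSet2, int total) {
        return (inSet1 || inSet2) && (total == 99);
    }

    /** The two separate checks: message absent from both sets, and exactly 99 received. */
    static boolean separateChecksPass(boolean inSet1, boolean inSet2, int total) {
        return !(inSet1 || inSet2) && total == 99;
    }

    public static void main(String[] args) {
        // Cancelled message was delivered and 100 messages arrived in total:
        // the combined condition is false, so assertFalse would pass.
        System.out.println(!combined(true, false, 100));          // true
        // The separate checks reject the same scenario.
        System.out.println(separateChecksPass(true, false, 100)); // false
    }
}
```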
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1952,155 @@ protected void internalSkipMessages(AsyncResponse
asyncResponse, String subName,
});
}
+ protected void internalSkipByMessageIds(AsyncResponse asyncResponse,
String subName, boolean authoritative,
+ SkipMessageIdsRequest messageIds) {
+ CompletableFuture<Void> validationFuture =
validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+ validationFuture = validationFuture.thenCompose(__ -> {
+ if (topicName.isGlobal()) {
+ return validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ validationFuture.thenCompose(__ ->
getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ } else {
+ if (partitionMetadata.partitions > 0) {
+
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+ messageIds, subName);
+ } else {
+
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+ subName, authoritative);
+ }
+ }
+ }).exceptionally(ex -> {
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to ack messages on topic {}:
{}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse
asyncResponse,
+
PartitionedTopicMetadata partitionMetadata,
+
SkipMessageIdsRequest messageIds,
+ String subName) {
+ final List<CompletableFuture<Void>> futures = new
ArrayList<>(partitionMetadata.partitions);
+ PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ // Rebuild an Admin API request using the parsed items to avoid
legacy-map format
+
List<org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem> items
= new ArrayList<>();
+ for (SkipMessageIdsRequest.MessageIdItem it :
messageIds.getItems()) {
+ items.add(new
org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem(
+ it.getLedgerId(), it.getEntryId(),
it.getBatchIndex()));
+ }
+ org.apache.pulsar.client.admin.SkipMessageIdsRequest req =
+
org.apache.pulsar.client.admin.SkipMessageIdsRequest.forMessageIds(items);
+
+ futures.add(admin
+ .topics()
+ .skipMessagesAsync(topicNamePartition.toString(), subName,
req));
+ }
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = FutureUtil.unwrapCompletionException(exception);
+ log.warn("[{}] Failed to ack messages on some partitions of
{}: {}",
+ clientAppId(), topicName, t.getMessage());
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ } else {
+ log.info("[{}] Successfully requested cancellation for delayed
message on"
+ + " all partitions of topic {}",
clientAppId(), topicName);
+ asyncResponse.resume(Response.noContent().build());
+ }
+ return null;
+ });
+ }
+
+ private void internalSkipByMessageIdsForNonPartitionedTopic(AsyncResponse
asyncResponse,
+
SkipMessageIdsRequest messageIds,
+ String subName,
+ boolean
authoritative) {
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(optTopic -> {
+ if (!(optTopic instanceof PersistentTopic
persistentTopic)) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
"Cancel delayed message on a non-persistent"
+ + " topic is not allowed");
+ }
+ log.info("[{}] Cancelling delayed message for subscription
{} on topic {}", clientAppId(),
+ subName, topicName);
+ return
internalSkipByMessageIdsForSubscriptionAsync(persistentTopic, subName,
messageIds);
+ })
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (isNot307And404Exception(t)) {
+ log.error("[{}] Error in
internalSkipByMessageIdsForNonPartitionedTopic for {}: {}",
+ clientAppId(), topicName, t.getMessage(), t);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void>
internalSkipByMessageIdsForSubscriptionAsync(
+ PersistentTopic topic, String subName, SkipMessageIdsRequest
messageIds) {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topic.getName(), subName)));
+ }
+ // Build List<SkipEntry> from parsed items
+ Map<String, AggregatedSkip> aggregated = new LinkedHashMap<>();
+ for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
+ long ledgerId = it.getLedgerId();
+ long entryId = it.getEntryId();
+ Integer batchIndex = it.getBatchIndex();
+ String key = ledgerId + ":" + entryId;
+ AggregatedSkip agg = aggregated.computeIfAbsent(key, k -> new
AggregatedSkip(ledgerId, entryId));
+ if (batchIndex == null) {
+ agg.full = true;
+ } else {
+ agg.indexes.add(batchIndex);
+ }
+ }
+ List<SkipEntry> skipEntries = new ArrayList<>(aggregated.size());
+ for (AggregatedSkip v : aggregated.values()) {
+ if (v.full) {
+ skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, null));
+ } else {
+ // sort indexes to have deterministic order
+ List<Integer> idx = new ArrayList<>(v.indexes);
+ Collections.sort(idx);
Review Comment:
[nitpick] `Collections.sort(idx)` (or `idx.sort(null)`) is acceptable here. As
a more modern alternative, consider
`v.indexes.stream().sorted().collect(Collectors.toList())`, or
`v.indexes.stream().sorted().toList()` on Java 16+; note that the latter
returns an unmodifiable list.
```suggestion
List<Integer> idx = v.indexes.stream().sorted().collect(Collectors.toList());
```
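A short sketch comparing the variants mentioned above. It assumes `v.indexes`
is a `Set<Integer>`, which the diff does not show; the class and method names
are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SortIndexesSketch {
    /** Copy-then-sort, as in the code under review. */
    static List<Integer> sortedCopy(Set<Integer> indexes) {
        List<Integer> idx = new ArrayList<>(indexes);
        Collections.sort(idx);
        return idx;
    }

    /** Stream-based variant from the suggestion. */
    static List<Integer> sortedStream(Set<Integer> indexes) {
        return indexes.stream().sorted().collect(Collectors.toList());
    }

    /** Java 16+ variant; the returned list is unmodifiable. */
    static List<Integer> sortedToList(Set<Integer> indexes) {
        return indexes.stream().sorted().toList();
    }

    public static void main(String[] args) {
        Set<Integer> indexes = Set.of(5, 1, 3);
        System.out.println(sortedCopy(indexes));   // [1, 3, 5]
        System.out.println(sortedStream(indexes)); // [1, 3, 5]
        System.out.println(sortedToList(indexes)); // [1, 3, 5]
    }
}
```

All three produce the same ordering; they differ only in whether the result can
be modified afterwards.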
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.