Copilot commented on code in PR #23907:
URL: https://github.com/apache/pulsar/pull/23907#discussion_r2507828061


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java:
##########
@@ -38,6 +38,7 @@
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.GetStatsOptions;
+import org.apache.pulsar.broker.service.SkipEntry;

Review Comment:
   The `SkipEntry` class is imported but missing from the PR. This will cause a 
compilation error.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -69,6 +76,7 @@
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.GetStatsOptions;
+import org.apache.pulsar.broker.service.SkipEntry;

Review Comment:
   The `SkipEntry` class is imported but its definition is missing from the PR. 
This will cause a compilation error. The class needs to be defined, likely as a 
record or class with fields for ledgerId, entryId, and batchIndexes.
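   For reference, a minimal sketch of what such a type could look like, inferred only from how the diff constructs it (`new SkipEntry(ledgerId, entryId, null)`). The record shape, the `List<Integer>` type for `batchIndexes`, the `skipsWholeEntry()` helper, and the convention that `null` means "skip the whole entry" are assumptions, not the PR author's actual definition.

```java
import java.util.List;

// Hypothetical shape for the missing type; field names follow the review comment.
record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {
    SkipEntry {
        // Defensive copy so callers cannot mutate the indexes afterwards.
        // A null list is kept as-is and is assumed to mean "skip the whole entry".
        batchIndexes = batchIndexes == null ? null : List.copyOf(batchIndexes);
    }

    // Hypothetical convenience helper, not referenced anywhere in the diff.
    boolean skipsWholeEntry() {
        return batchIndexes == null;
    }
}
```

   A record keeps the type immutable and gives value-based `equals`/`hashCode`, which may be convenient if entries are later deduplicated; a plain class would work equally well.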



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java:
##########
@@ -80,6 +80,8 @@ default long getNumberOfEntriesDelayed() {
 
     CompletableFuture<Void> skipMessages(int numMessagesToSkip);
 
+    CompletableFuture<Void> skipMessages(List<SkipEntry> entries);

Review Comment:
   The `SkipEntry` type is used but not defined in this PR. This interface 
method references a missing class definition.



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/SkipMessageIdsRequest.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Request DTO used by the admin client to submit a list of message IDs
+ * for skipping. It supports multiple formats and is serialized to JSON
+ * that the broker understands (polymorphic deserialization on server).
+ * <p>
+ * Supported types:
+ * - type = "byteArray": messageIds is List<String> of base64-encoded MessageId.toByteArray()
+ * - type = "messageId": messageIds is List<MessageIdItem> (supports batchIndex)
+ * - type = "map_of_ledgerId_entryId": messageIds is Map<String, String> (legacy map)
+ */
+@Setter
+@Getter
+public class SkipMessageIdsRequest {
+    // optional; default is byteArray on server when messageIds is an array of strings
+    private String type;
+    // List<String> | List<MessageIdItem> | Map<String, String>
+    private Object messageIds;
+
+    public SkipMessageIdsRequest() {
+    }
+
+    public static SkipMessageIdsRequest forByteArrays(List<String> base64MessageIds) {
+        SkipMessageIdsRequest r = new SkipMessageIdsRequest();
+        r.setType("byteArray");
+        r.setMessageIds(base64MessageIds);
+        return r;
+    }
+
+    public static SkipMessageIdsRequest forMessageIds(List<MessageIdItem> items) {
+        SkipMessageIdsRequest r = new SkipMessageIdsRequest();
+        r.setType("messageId");
+        r.setMessageIds(items);
+        return r;
+    }
+
+    /**
+     * Item representing a messageId as ledgerId, entryId and optional batchIndex.
+     */
+    @Setter
+    @Getter
+    public static class MessageIdItem {

Review Comment:
   [nitpick] Consider adding an explicit no-argument constructor to `MessageIdItem`. Serialization frameworks such as Jackson typically need one for deserialization, and Lombok's `@Setter` and `@Getter` do not generate constructors.
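   A sketch of the suggestion without Lombok or a JSON library, so that it compiles on its own. The field names come from the Javadoc in the diff (`ledgerId`, `entryId`, optional `batchIndex`); the class name `MessageIdItemSketch` and the hand-written accessors are stand-ins for what Lombok would generate. The point it illustrates: once a three-argument constructor is declared, Java no longer supplies the implicit default constructor, so a setter-based deserializer needs the no-argument one declared explicitly.

```java
// Stand-in for SkipMessageIdsRequest.MessageIdItem (hypothetical name).
class MessageIdItemSketch {
    private long ledgerId;
    private long entryId;
    private Integer batchIndex; // null when the whole entry is targeted

    // Explicit no-argument constructor for bean-style (setter-based) deserialization.
    MessageIdItemSketch() {
    }

    // Convenience constructor matching the three-argument call used in the test diff.
    MessageIdItemSketch(long ledgerId, long entryId, Integer batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    long getLedgerId() { return ledgerId; }
    void setLedgerId(long ledgerId) { this.ledgerId = ledgerId; }
    long getEntryId() { return entryId; }
    void setEntryId(long entryId) { this.entryId = entryId; }
    Integer getBatchIndex() { return batchIndex; }
    void setBatchIndex(Integer batchIndex) { this.batchIndex = batchIndex; }
}
```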



##########
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java:
##########
@@ -855,6 +857,75 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Command(description = "Skip some messages for the subscription")
+    private class SkipMessages extends CliCommand {
+        @Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
+        private String topicName;
+
+        @Option(names = { "-s",
+                "--subscription" }, description = "Subscription to be skip messages on", required = true)

Review Comment:
   Grammar: the option description reads 'Subscription to be skip messages on'; it should be 'Subscription to skip messages on'.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -81,6 +84,7 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
 import org.apache.pulsar.broker.service.GetStatsOptions;
 import org.apache.pulsar.broker.service.MessageExpirer;
+import org.apache.pulsar.broker.service.SkipEntry;

Review Comment:
   The `SkipEntry` class is imported but its definition is not included in this 
PR. This will prevent the code from compiling.
   ```suggestion
   
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java:
##########
@@ -468,6 +477,72 @@ public void testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType sub
         admin.topics().deletePartitionedTopic(topic);
     }
 
+    @Test
+    public void testDelayedMessageCancel() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedMessageCancel");
+        final String subName = "shared-sub";
+        CountDownLatch latch = new CountDownLatch(99);
+        admin.topics().createPartitionedTopic(topic, 2);
+        Set<String> receivedMessages1 = ConcurrentHashMap.newKeySet();
+        Set<String> receivedMessages2 = ConcurrentHashMap.newKeySet();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName + "-1")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((Consumer<String> c, Message<String> msg) -> {
+                    receivedMessages1.add(msg.getValue());
+                    c.acknowledgeAsync(msg);
+                    latch.countDown();
+                })
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName + "-2")
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener((Consumer<String> c, Message<String> msg) -> {
+                    receivedMessages2.add(msg.getValue());
+                    c.acknowledgeAsync(msg);
+                    latch.countDown();
+                })
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        List<MessageId> messageIds = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            final long deliverAtTime = System.currentTimeMillis() + 3000L;
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(i))
+                    .value("msg-" + i)
+                    .deliverAt(deliverAtTime)
+                    .send();
+            messageIds.add(i, messageId);
+        }
+
+        final int cancelMessage = 50;
+        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(cancelMessage);
+
+        SkipMessageIdsRequest.MessageIdItem item0 = new SkipMessageIdsRequest.MessageIdItem(
+                messageId.getLedgerId(), messageId.getEntryId(), null);
+        SkipMessageIdsRequest req = SkipMessageIdsRequest.forMessageIds(Collections.singletonList(item0));
+
+        admin.topics().skipMessages(topic + "-partition-0", subName + "-1", req);
+        admin.topics().skipMessages(topic + "-partition-1", subName + "-1", req);
+
+        assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were received in time");
+        assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
+                        || receivedMessages2.contains("msg-" + cancelMessage))
+                        && (receivedMessages1.size() + receivedMessages2.size() == 99),
+                "msg-" + cancelMessage + " should have been cancelled but was received");

Review Comment:
   The assertion logic is incorrect. The condition `(A || B) && C` is true only when C is true and at least one of A or B is true, so `assertFalse` passes whenever either 1) the message is absent from both sets, or 2) the total count is not 99. A cancelled message that is still delivered therefore goes undetected unless the total happens to be exactly 99. The test should verify the two facts separately: that the message is in neither set, and that exactly 99 messages were received.
   ```suggestion
           assertFalse(receivedMessages1.contains("msg-" + cancelMessage)
                   || receivedMessages2.contains("msg-" + cancelMessage),
                   "msg-" + cancelMessage + " should have been cancelled but was received");
           assertEquals(receivedMessages1.size() + receivedMessages2.size(), 99);
   ```
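   To make the masking effect concrete, here is a small standalone sketch of the boolean expression alone, with no test framework involved. The class and method names are hypothetical; only the shape of the condition is taken from the diff.

```java
class CombinedAssertionDemo {
    // Mirrors the original condition passed to assertFalse: (A || B) && C.
    static boolean originalCondition(boolean inSet1, boolean inSet2, int total) {
        return (inSet1 || inSet2) && (total == 99);
    }

    public static void main(String[] args) {
        // The cancelled message WAS received, but the total is 100:
        // the condition is false, so assertFalse would still pass.
        System.out.println(originalCondition(true, false, 100)); // false
        // Only when the total is exactly 99 does the check detect the delivery.
        System.out.println(originalCondition(true, false, 99));  // true
    }
}
```

   Splitting the check into one assertion per fact removes this coupling, and a failure then points at the fact that was violated.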



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1948,6 +1952,155 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
         });
     }
 
+    protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative,
+                                            SkipMessageIdsRequest messageIds) {
+        CompletableFuture<Void> validationFuture = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
+        validationFuture = validationFuture.thenCompose(__ -> {
+            if (topicName.isGlobal()) {
+                return validateGlobalNamespaceOwnershipAsync(namespaceName);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        });
+        validationFuture.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                .thenAccept(partitionMetadata -> {
+                    if (topicName.isPartitioned()) {
+                        internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                subName, authoritative);
+                    } else {
+                        if (partitionMetadata.partitions > 0) {
+                            internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
+                                    messageIds, subName);
+                        } else {
+                            internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
+                                    subName, authoritative);
+                        }
+                    }
+                }).exceptionally(ex -> {
+                    if (isNot307And404Exception(ex)) {
+                        log.error("[{}] Failed to ack messages on topic {}: {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse asyncResponse,
+                                                             PartitionedTopicMetadata partitionMetadata,
+                                                             SkipMessageIdsRequest messageIds,
+                                                             String subName) {
+        final List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+        for (int i = 0; i < partitionMetadata.partitions; i++) {
+            TopicName topicNamePartition = topicName.getPartition(i);
+            // Rebuild an Admin API request using the parsed items to avoid legacy-map format
+            List<org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem> items = new ArrayList<>();
+            for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
+                items.add(new org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem(
+                        it.getLedgerId(), it.getEntryId(), it.getBatchIndex()));
+            }
+            org.apache.pulsar.client.admin.SkipMessageIdsRequest req =
+                    org.apache.pulsar.client.admin.SkipMessageIdsRequest.forMessageIds(items);
+
+            futures.add(admin
+                    .topics()
+                    .skipMessagesAsync(topicNamePartition.toString(), subName, req));
+        }
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                Throwable t = FutureUtil.unwrapCompletionException(exception);
+                log.warn("[{}] Failed to ack messages on some partitions of {}: {}",
+                        clientAppId(), topicName, t.getMessage());
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+            } else {
+                log.info("[{}] Successfully requested cancellation for delayed message on"
+                                + " all partitions of topic {}", clientAppId(), topicName);
+                asyncResponse.resume(Response.noContent().build());
+            }
+            return null;
+        });
+    }
+
+    private void internalSkipByMessageIdsForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                SkipMessageIdsRequest messageIds,
+                                                                String subName,
+                                                                boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(optTopic -> {
+                    if (!(optTopic instanceof PersistentTopic persistentTopic)) {
+                        throw new RestException(Status.METHOD_NOT_ALLOWED, "Cancel delayed message on a non-persistent"
+                                + " topic is not allowed");
+                    }
+                    log.info("[{}] Cancelling delayed message for subscription {} on topic {}", clientAppId(),
+                            subName, topicName);
+                    return internalSkipByMessageIdsForSubscriptionAsync(persistentTopic, subName, messageIds);
+                })
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable t = FutureUtil.unwrapCompletionException(ex);
+                    if (isNot307And404Exception(t)) {
+                        log.error("[{}] Error in internalSkipByMessageIdsForNonPartitionedTopic for {}: {}",
+                                clientAppId(), topicName, t.getMessage(), t);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                    return null;
+                });
+    }
+
+    private CompletableFuture<Void> internalSkipByMessageIdsForSubscriptionAsync(
+            PersistentTopic topic, String subName, SkipMessageIdsRequest messageIds) {
+        Subscription sub = topic.getSubscription(subName);
+        if (sub == null) {
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getSubNotFoundErrorMessage(topic.getName(), subName)));
+        }
+        // Build List<SkipEntry> from parsed items
+        Map<String, AggregatedSkip> aggregated = new LinkedHashMap<>();
+        for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
+            long ledgerId = it.getLedgerId();
+            long entryId = it.getEntryId();
+            Integer batchIndex = it.getBatchIndex();
+            String key = ledgerId + ":" + entryId;
+            AggregatedSkip agg = aggregated.computeIfAbsent(key, k -> new AggregatedSkip(ledgerId, entryId));
+            if (batchIndex == null) {
+                agg.full = true;
+            } else {
+                agg.indexes.add(batchIndex);
+            }
+        }
+        List<SkipEntry> skipEntries = new ArrayList<>(aggregated.size());
+        for (AggregatedSkip v : aggregated.values()) {
+            if (v.full) {
+                skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, null));
+            } else {
+              // sort indexes to have deterministic order
+              List<Integer> idx = new ArrayList<>(v.indexes);
+              Collections.sort(idx);

Review Comment:
   [nitpick] `Collections.sort(idx)` is acceptable as written. A more compact alternative is `v.indexes.stream().sorted().collect(Collectors.toList())`, or `v.indexes.stream().sorted().toList()` on Java 16+ (note that `Stream.toList()` returns an unmodifiable list).
   ```suggestion
                 List<Integer> idx = v.indexes.stream().sorted().collect(Collectors.toList());
   ```
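   The two variants can be compared side by side in a standalone sketch. The diff does not show the declared type of `v.indexes`; a `Set<Integer>` is assumed here, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SortIndexesDemo {
    // Current approach in the diff: copy into a list, then sort in place.
    static List<Integer> viaCollectionsSort(Set<Integer> indexes) {
        List<Integer> idx = new ArrayList<>(indexes);
        Collections.sort(idx);
        return idx;
    }

    // Suggested approach: sort in a stream and collect into a new list.
    static List<Integer> viaStream(Set<Integer> indexes) {
        return indexes.stream().sorted().collect(Collectors.toList());
    }
}
```

   Both produce the same ascending order, so the choice is stylistic. If the list is mutated later, `Collectors.toList()` or the explicit `ArrayList` copy is the safer pick over `Stream.toList()`.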


