Copilot commented on code in PR #24370:
URL: https://github.com/apache/pulsar/pull/24370#discussion_r2507807596


##########
pip/pip-423.md:
##########
@@ -0,0 +1,278 @@
+# PIP-423: Add a new admin API to acknowledge a single message
+
+# Background knowledge
+
+*   **Message Identification (MessageId):** In Pulsar, every message is 
uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an 
`entryId`, and optionally a partition index. The combination of `ledgerId` and 
`entryId` (often represented as a `Position`) uniquely identifies a message's 
physical storage location within a topic. A producer receives a `MessageId` for 
each message it successfully sends.
+*   **Batch Messages:** To improve throughput, Pulsar producers can batch 
multiple individual messages into a single entry that is written to BookKeeper. 
In this case, the `MessageId` also contains a `batchIndex` to identify a 
specific message within the batch. The entry's metadata stores the total number 
of messages in the batch.
+*   **Subscriptions and Cursors:** Each subscription on a topic maintains its 
own "cursor" which tracks consumption progress. For subscription types like 
`Shared` or `Key_Shared`, the cursor can track individually acknowledged 
messages, even if they are out of order relative to the main consumption 
progress marker (`mark-delete position`). The cursor is responsible for 
ensuring that acknowledged messages are not redelivered.
+*   **Individual Acknowledgement:** Pulsar subscriptions support different 
acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on 
**individual acknowledgement**. This allows a consumer to acknowledge a single 
message, or even a single message within a batch. When a message is 
acknowledged individually, the broker's `ManagedCursor` persistently tracks 
this "acknowledgement hole" to ensure that acknowledged messages are not 
redelivered after a broker or consumer restart. This proposal leverages this 
existing, robust mechanism.
+*   **Delayed Messages:** Pulsar supports scheduling messages for future 
delivery. A primary use case for this proposal is to allow a scheduled message 
to be effectively "cancelled" by acknowledging it before its delivery time. 
Since the message is marked as consumed by the cursor, the 
`DelayedDeliveryTracker` will not dispatch it.
+*   **Existing `skip` API:** Pulsar has an admin API to `skip` a specified 
*number* of messages for a subscription. This is useful for bulk-skipping, but 
because it always skips from the head of the backlog, it cannot target a single, 
specific message deep in a large backlog without also skipping every message in 
front of it. This proposal provides a more precise way to skip messages by their 
specific `MessageId`.
+
+# Motivation
+
+Operators and SREs occasionally need to intervene in a topic's backlog to 
handle problematic messages or adapt to changing business requirements. For 
instance:
+
+*   **Cancelling Scheduled Actions:** A delayed message representing a future 
task (e.g., a scheduled report or a notification) may become obsolete. The most 
efficient way to handle this is to prevent its delivery entirely by 
acknowledging it pre-emptively.
+*   **Removing Poison-Pill Messages:** A specific message in a backlog might 
have a malformed payload that causes consumer applications to crash repeatedly. 
Removing this single "poison pill" message without affecting valid messages 
around it is a critical operational capability. This also applies to removing a 
single bad message from within a larger batch.
+*   **Manual Business Logic Correction:** An event may have been sent that is 
later determined to be invalid due to external factors. An administrator needs 
a precise tool to remove this specific event from a subscription's delivery 
queue.
+
+The existing `skip(numMessages)` API is a blunt instrument, ill-suited for 
these precise, targeted operations. This proposal introduces an administrative 
API to skip messages by their specific `MessageId` (including `ledgerId`, 
`entryId`, and optional `batchIndex`), providing a robust and reliable way to 
remove any individual message—delayed or not—from a subscription's backlog.
+
+# Goals
+
+## In Scope
+
+*   Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI 
command to support skipping specific messages for a subscription.
+*   The target message(s) will be identified by their `ledgerId`, `entryId`, 
and an optional `batchIndex` for messages within a batch.
+*   The implementation will leverage Pulsar's existing, robust 
`AckType.Individual` mechanism for persistence and reliability.
+*   This feature will only be supported for subscription types that allow 
individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+*   Ensure that once a message is successfully skipped via this API, it will 
not be delivered to any consumer on the targeted subscription.
+*   Support for both partitioned and non-partitioned topics.
+
+## Out of Scope
+
+*   Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated 
endpoint will be created for clarity.
+*   Automatic skipping of messages across geo-replicated clusters. The command 
is a per-cluster administrative operation that must be run on each cluster 
where the skip is needed.
+
+# High Level Design
+
+The proposed solution introduces a new admin API that triggers Pulsar's 
individual acknowledgement capability on demand for specific messages.
+
+1.  **Initiate Skip-by-ID:** An administrator initiates the action via the new 
`pulsar-admin topics skip-messages` command or its corresponding REST API call. 
The request specifies the topic, target subscription, and a list of message 
identifiers. These identifiers can be provided as a triplet of 
`ledgerId:entryId:batchIndex` or as Base64-encoded `MessageId` byte arrays.
+
+2.  **Broker Receives Request:** The Pulsar broker receives the admin request 
for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses 
the flexible JSON payload and validates the administrator's permissions for the 
topic, re-using the existing `TopicOperation.SKIP` authorization rule.
+
+3.  **Delegate to Subscription:** The broker, after validating topic ownership 
and permissions, invokes a new method `skipMessages(List<SkipEntry> entries)` 
on the target `PersistentSubscription` object. For partitioned topics, the 
request is scattered to all partition brokers, and each partition broker 
performs this action.
+
+4.  **Perform Individual Acknowledgement:** Inside the 
`PersistentSubscription`, the following occurs:
+    *   It verifies that the subscription's type supports individual 
acknowledgements.
+    *   For messages specified without a `batchIndex`, it constructs a 
`Position` object for the entire entry.
+    *   For messages specified with a `batchIndex`, it first reads the entry 
from BookKeeper to get the batch metadata (e.g., batch size). It then 
constructs a `Position` object that includes an "ack set" (a bitset) indicating 
which messages within the batch are being acknowledged.
+    *   It calls its internal `acknowledgeMessage()` method with 
`AckType.Individual` for all the constructed `Position` objects.
+
+5.  **Persistence and Effect:** The `ManagedCursor` for the subscription 
records these individual acknowledgements, which are persisted to metadata 
storage.
+    *   For a **regular message** in the backlog, it is marked as consumed for 
that subscription and will not be delivered.
+    *   For a **delayed message**, it is marked as consumed before the 
`DelayedDeliveryTracker` attempts to schedule it. The message is thus 
effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message 
acknowledgement foundation while providing a clean, dedicated administrative 
API for this precise operational task.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The implementation introduces a new flexible request DTO, extends the 
`Subscription` interface, and implements the core logic in 
`PersistentSubscription`.
+
+1.  **New Request DTO:** A new class `SkipMessageIdsRequest` is created to 
handle polymorphic JSON deserialization on the broker. This allows the API to 
accept multiple formats for specifying message IDs.
+
+2.  **Subscription Interface Extension:** The `Subscription` interface is 
extended with a new method. `SkipEntry` is an internal record holding the 
`ledgerId`, `entryId`, and an optional list of `batchIndexes`.
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(List<SkipEntry> entries);

Review Comment:
   The code snippet shows two overloaded `skipMessages` methods but doesn't 
define what `SkipEntry` is within this code block. While line 68 mentions that 
'SkipEntry is an internal record holding the ledgerId, entryId, and an optional 
list of batchIndexes', the actual definition should be included in the code 
example for clarity. Consider adding a code comment or the record definition to 
make the interface extension example self-contained.
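
To make the interface snippet self-contained, `SkipEntry` could be declared roughly as below. This is a sketch, not the PIP's definition: the field names follow the prose (`ledgerId`, `entryId`, optional `batchIndexes`), while the validation and the `parse` helper for the `ledgerId:entryId[:batchIndex]` format from the High Level Design are hypothetical additions.

```java
import java.util.List;
import java.util.Objects;

/**
 * Sketch of the internal SkipEntry record described in PIP-423 (assumed shape).
 * A null batchIndexes list means "acknowledge the whole entry".
 */
record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {

    SkipEntry {
        if (ledgerId < 0 || entryId < 0) {
            throw new IllegalArgumentException(
                    "ledgerId and entryId must be non-negative: " + ledgerId + ":" + entryId);
        }
        // Defensive, immutable copy; List.copyOf also rejects null elements.
        batchIndexes = batchIndexes == null ? null : List.copyOf(batchIndexes);
    }

    /** True when only selected messages inside a batched entry are targeted. */
    boolean isPartial() {
        return batchIndexes != null && !batchIndexes.isEmpty();
    }

    /**
     * Hypothetical helper for the "ledgerId:entryId[:batchIndex]" format;
     * the PIP does not define this method.
     */
    static SkipEntry parse(String id) {
        Objects.requireNonNull(id, "id");
        // Limit -1 keeps trailing empty fields, so "12:34:" is rejected instead of silently accepted.
        String[] parts = id.trim().split(":", -1);
        if (parts.length != 2 && parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected ledgerId:entryId[:batchIndex] but got: " + id);
        }
        long ledgerId = Long.parseLong(parts[0]);
        long entryId = Long.parseLong(parts[1]);
        if (parts.length == 2) {
            return new SkipEntry(ledgerId, entryId, null);
        }
        int batchIndex = Integer.parseInt(parts[2]);
        if (batchIndex < 0) {
            throw new IllegalArgumentException("batchIndex must be non-negative: " + batchIndex);
        }
        return new SkipEntry(ledgerId, entryId, List.of(batchIndex));
    }
}
```

With this shape, `SkipEntry.parse("12:34")` targets the whole entry, while `SkipEntry.parse("12:34:5")` targets only batch index 5 of that entry.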



##########
pip/pip-423.md:
##########
@@ -0,0 +1,278 @@
+2.  **Subscription Interface Extension:** The `Subscription` interface is 
extended with a new method. `SkipEntry` is an internal record holding the 
`ledgerId`, `entryId`, and an optional list of `batchIndexes`.
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(List<SkipEntry> entries);
+    // ... existing methods
+}
+```
+
+3.  **PersistentSubscription Implementation:** The `PersistentSubscription` 
class provides the concrete implementation. It differentiates between 
full-entry acknowledgements and partial (batch) acknowledgements.
+
+```java
+// in 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(List<SkipEntry> entries) {
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new 
NotAllowedException("Unsupported subscription type."));
+    }
+
+    // Separate full-entry acks from partial (batchIndex) acks
+    List<Position> fullEntryPositions = new ArrayList<>();

Review Comment:
   The variable name `partialAckIndexByPos` is unclear about what the Map key 
represents. The key type is `String` but the name suggests it's a position. 
Based on context, it appears to be a string representation of a Position, but 
this should be clarified in a comment or use a more descriptive variable name 
like `partialAckIndexByPosString` or add an inline comment explaining the key 
format.
   ```suggestion
       List<Position> fullEntryPositions = new ArrayList<>();
       // Key is Position.toString(), representing the ledgerId:entryId of the entry.
   ```
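
The elided `// ... logic to populate these collections from 'entries'` step might look like the following. It is a runnable sketch under stated assumptions: positions are modelled as `ledgerId:entryId` strings (the key format proposed in the suggestion above) instead of real `Position` objects, and `SkipEntry` is a minimal stand-in with an assumed shape. `SkipEntryGrouper` and `Grouped` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Sketch of the elided "populate these collections" step. Positions are modelled as
 * "ledgerId:entryId" strings so the example runs without Pulsar on the classpath.
 */
final class SkipEntryGrouper {

    /** Minimal stand-in for the PIP's SkipEntry (assumed shape). */
    record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {}

    /** Full-entry keys plus sorted, de-duplicated batch indexes per partially acked entry. */
    record Grouped(List<String> fullEntryPositions, Map<String, List<Integer>> partialAckIndexByPos) {}

    private SkipEntryGrouper() {}

    static Grouped group(List<SkipEntry> entries) {
        Set<String> full = new LinkedHashSet<>();
        Map<String, TreeSet<Integer>> partial = new LinkedHashMap<>();
        for (SkipEntry e : entries) {
            // Assumed to match the "ledgerId:entryId" form of Position.toString().
            String key = e.ledgerId() + ":" + e.entryId();
            if (e.batchIndexes() == null || e.batchIndexes().isEmpty()) {
                full.add(key);
            } else {
                partial.computeIfAbsent(key, k -> new TreeSet<>()).addAll(e.batchIndexes());
            }
        }
        // A full-entry ack already covers every message in the batch,
        // so partial requests for the same entry are dropped.
        Map<String, List<Integer>> partialAckIndexByPos = new LinkedHashMap<>();
        partial.forEach((key, indexes) -> {
            if (!full.contains(key)) {
                partialAckIndexByPos.put(key, new ArrayList<>(indexes));
            }
        });
        return new Grouped(new ArrayList<>(full), partialAckIndexByPos);
    }
}
```

Keeping batch indexes in a `TreeSet` de-duplicates repeated IDs from the request, and dropping partial requests for entries that are also fully acknowledged means those entries never need to be read back to obtain batch metadata.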



##########
pip/pip-423.md:
##########
@@ -0,0 +1,278 @@
+3.  **PersistentSubscription Implementation:** The `PersistentSubscription` 
class provides the concrete implementation. It differentiates between 
full-entry acknowledgements and partial (batch) acknowledgements.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(List<SkipEntry> entries) {
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
+    }
+
+    // Separate full-entry acks from partial (batchIndex) acks
+    List<Position> fullEntryPositions = new ArrayList<>();
+    Map<String, List<Integer>> partialAckIndexByPos = new HashMap<>();
+    // ... logic to populate these collections from 'entries'
+
+    // If there are partial acks, read the corresponding entries to get batch metadata
+    if (!partialAckIndexByPos.isEmpty()) {
+        Set<Position> positionsToLoad = ...; // positions for entries with batch acks
+        cursor.asyncReplayEntries(positionsToLoad, new AsyncCallbacks.ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> readEntries, Object ctx) {
+                // ... logic for each entry:
+                // 1. Parse MessageMetadata to get batch size.
+                // 2. Validate batch indexes.
+                // 3. Create a BitSet representing the ack state.
+                // 4. Create a Position with the ack set using AckSetStateUtil.createPositionWithAckSet().
+                // 5. Add this special position to a final list.
+
+                // Finally, acknowledge all positions (full and partial)
+                acknowledgeMessage(finalPositionsList, AckType.Individual, properties);
+                result.complete(null);
+            }
+            // ... handle failures
+        });
+    } else {
+        // Only full-entry acks are present, no need to read entries
+        acknowledgeMessage(fullEntryPositions, AckType.Individual, properties);

Review Comment:
   The variable `properties` is used in the `acknowledgeMessage` call but is 
never defined or declared in the provided code example. This makes the code 
snippet incomplete and potentially confusing. Either define the `properties` 
variable earlier in the example or add a comment explaining where it comes from 
(e.g., `// properties defined elsewhere` or use a more concrete value like 
`Collections.emptyMap()`).
   ```suggestion
                   acknowledgeMessage(finalPositionsList, AckType.Individual, Collections.emptyMap());
                   result.complete(null);
               }
               // ... handle failures
           });
       } else {
           // Only full-entry acks are present, no need to read entries
               acknowledgeMessage(fullEntryPositions, AckType.Individual, Collections.emptyMap());
   ```
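
Steps 2 and 3 of the `readEntriesComplete` outline (validate batch indexes, build the ack-state `BitSet`) can be written without any broker classes. The sketch below assumes the convention used by Pulsar's batch-index acknowledgement, where a set bit marks a message in the batch that is still unacknowledged and a cleared bit marks one that is acknowledged. That convention, and the idea of handing the resulting `long[]` on to `AckSetStateUtil.createPositionWithAckSet()`, should be checked against the broker code. `BatchAckSets` and `buildAckSet` are hypothetical names.

```java
import java.util.BitSet;
import java.util.List;

/** Hypothetical helper: builds the ack set for a partially skipped batched entry. */
final class BatchAckSets {

    private BatchAckSets() {}

    /**
     * Returns the ack set as a long[] in which a set bit means "still unacknowledged"
     * (assumed convention). An empty array means every message in the batch is
     * acknowledged, so the whole entry could be acked instead.
     */
    static long[] buildAckSet(int batchSize, List<Integer> batchIndexesToAck) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        BitSet unacked = new BitSet(batchSize);
        unacked.set(0, batchSize);          // start with every message in the batch unacknowledged
        for (int index : batchIndexesToAck) {
            if (index < 0 || index >= batchSize) {
                throw new IllegalArgumentException(
                        "batchIndex " + index + " out of range [0, " + batchSize + ")");
            }
            unacked.clear(index);           // clear the bit of each message being skipped
        }
        return unacked.toLongArray();       // little-endian words; empty when no bits remain set
    }
}
```

For example, skipping index 1 of a 4-message batch leaves bits 0, 2 and 3 set. The sketch ignores batch indexes the cursor may already have acknowledged; merging with any existing ack state is assumed to happen in the cursor.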



##########
pip/pip-423.md:
##########
@@ -0,0 +1,278 @@
+*   **Cancelling Scheduled Actions:** A delayed message representing a future 
task (e.g., a scheduled report or a notification) may become obsolete. The most 
efficient way to handle this is to prevent its delivery entirely by 
acknowledging it pre-emptively.
+*   **Removing Backlogs:** A specific message in a backlog might have a 
malformed payload that causes consumer applications to crash repeatedly. 
Removing this single "poison pill" message without affecting valid messages 
around it is a critical operational capability. This also applies to removing a 
single bad message from within a larger batch.
+*   **Manual Business Logic Correction:** An event may have been sent that is 
later determined to be invalid due to external factors. An administrator needs 
a precise tool to remove this specific event from a subscription's delivery 
queue.
+
+The existing `skip(numMessages)` API is a blunt instrument, ill-suited for 
these precise, targeted operations. This proposal introduces an administrative 
API to skip messages by their specific `MessageId` (including `ledgerId`, 
`entryId`, and optional `batchIndex`), providing a robust and reliable way to 
remove any individual message—delayed or not—from a subscription's backlog.
+
+# Goals
+
+## In Scope
+
+*   Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI 
command to support skipping specific messages for a subscription.
+*   The target message(s) will be identified by their `ledgerId`, `entryId`, 
and an optional `batchIndex` for messages within a batch.
+*   The implementation will leverage Pulsar's existing, robust 
`AckType.Individual` mechanism for persistence and reliability.
+*   This feature will only be supported for subscription types that allow 
individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+*   Ensure that once a message is successfully skipped via this API, it will 
not be delivered to any consumer on the targeted subscription.
+*   Support for both partitioned and non-partitioned topics.
+
+## Out of Scope
+
+*   Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated 
endpoint will be created for clarity.
+*   Automatic skipping of messages across geo-replicated clusters. The command 
is a per-cluster administrative operation that must be run on each cluster 
where the skip is needed.
+
+# High Level Design
+
+The proposed solution introduces a new admin API that triggers Pulsar's 
individual acknowledgement capability on demand for specific messages.
+
+1.  **Initiate Skip-by-ID:** An administrator initiates the action via the new 
`pulsar-admin topics skip-messages` command or its corresponding REST API call. 
The request specifies the topic, target subscription, and a list of message 
identifiers. These identifiers can be provided as a triplet of 
`ledgerId:entryId:batchIndex` or as Base64-encoded `MessageId` byte arrays.
+
+2.  **Broker Receives Request:** The Pulsar broker receives the admin request 
for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses 
the flexible JSON payload and validates the administrator's permissions for the 
topic, re-using the existing `TopicOperation.SKIP` authorization rule.
+
+3.  **Delegate to Subscription:** The broker, after validating topic ownership 
and permissions, invokes a new method `skipMessages(List<SkipEntry> entries)` 
on the target `PersistentSubscription` object. For partitioned topics, the 
request is scattered to all partition brokers, and each partition broker 
performs this action.
+
+4.  **Perform Individual Acknowledgement:** Inside the 
`PersistentSubscription`, the following occurs:
+    *   It verifies that the subscription's type supports individual 
acknowledgements.
+    *   For messages specified without a `batchIndex`, it constructs a 
`Position` object for the entire entry.
+    *   For messages specified with a `batchIndex`, it first reads the entry 
from BookKeeper to get the batch metadata (e.g., batch size). It then 
constructs a `Position` object that includes an "ack set" (a bitset) indicating 
which messages within the batch are being acknowledged.
+    *   It calls its internal `acknowledgeMessage()` method with 
`AckType.Individual` for all the constructed `Position` objects.
+
+5.  **Persistence and Effect:** The `ManagedCursor` for the subscription 
records these individual acknowledgements, which are persisted to metadata 
storage.
+    *   For a **regular message** in the backlog, it is marked as consumed for 
that subscription and will not be delivered.
+    *   For a **delayed message**, it is marked as consumed before the 
`DelayedDeliveryTracker` attempts to schedule it. The message is thus 
effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message 
acknowledgement foundation while providing a clean, dedicated administrative 
API for this precise operational task.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The implementation introduces a new flexible request DTO, extends the 
`Subscription` interface, and implements the core logic in 
`PersistentSubscription`.
+
+1.  **New Request DTO:** A new class `SkipMessageIdsRequest` is created to 
handle polymorphic JSON deserialization on the broker. This allows the API to 
accept multiple formats for specifying message IDs.
+
+2.  **Subscription Interface Extension:** The `Subscription` interface is 
extended with a new method. `SkipEntry` is an internal record holding the 
`ledgerId`, `entryId`, and an optional list of `batchIndexes`.
+```java
+// in 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(List<SkipEntry> entries);
+    // ... existing methods
+}
+```
+
+3.  **PersistentSubscription Implementation:** The `PersistentSubscription` 
class provides the concrete implementation. It differentiates between 
full-entry acknowledgements and partial (batch) acknowledgements.
+
+```java
+// in 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(List<SkipEntry> entries) {
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new 
NotAllowedException("Unsupported subscription type."));
+    }
+
+    // Separate full-entry acks from partial (batchIndex) acks
+    List<Position> fullEntryPositions = new ArrayList<>();
+    Map<String, List<Integer>> partialAckIndexByPos = new HashMap<>();
+    // ... logic to populate these collections from 'entries'
+
+    // If there are partial acks, read the corresponding entries to get batch 
metadata
+    if (!partialAckIndexByPos.isEmpty()) {
+        Set<Position> positionsToLoad = ...; // positions for entries with 
batch acks
+        cursor.asyncReplayEntries(positionsToLoad, new 
AsyncCallbacks.ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> readEntries, Object 
ctx) {
+                // ... logic for each entry:
+                // 1. Parse MessageMetadata to get batch size.
+                // 2. Validate batch indexes.
+                // 3. Create a BitSet representing the ack state.
+                // 4. Create a Position with the ack set using 
AckSetStateUtil.createPositionWithAckSet().
+                // 5. Add this special position to a final list.
+                
+                // Finally, acknowledge all positions (full and partial)
+                acknowledgeMessage(finalPositionsList, AckType.Individual, 
properties);
+                result.complete(null);
+            }
+            // ... handle failures
+        });
+    } else {
+        // Only full-entry acks are present, no need to read entries
+        acknowledgeMessage(fullEntryPositions, AckType.Individual, properties);

Review Comment:
   The variable `properties` is used here but is never defined in the code 
example. This makes the documentation code snippet incomplete. Consider 
defining the variable or adding a comment to clarify its source.



##########
pip/pip-423.md:
##########
@@ -0,0 +1,278 @@
+    } else {
+        // Only full-entry acks are present, no need to read entries
+        acknowledgeMessage(fullEntryPositions, AckType.Individual, properties);
+        return CompletableFuture.completedFuture(null);
+    }
+    return result;

Review Comment:
   The variable `result` is returned at the end of the method but is never 
declared or initialized in the provided code example. Based on the async 
callback pattern shown, it should be declared as something like 
`CompletableFuture<Void> result = new CompletableFuture<>()` at the beginning 
of the method. Add this declaration to make the code example complete and 
compilable.
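The fixes requested in the review comments above can be shown together in one self-contained model. `result` is declared before branching and completed on every path, including the read-failure path. `properties` is defined once and reused by both acknowledge calls. `ReadCallback`, `EntryReader`, and `Acknowledger` are hypothetical stand-ins for `AsyncCallbacks.ReadEntriesCallback`, `cursor.asyncReplayEntries(...)`, and `acknowledgeMessage(...)`. They are not Pulsar APIs, positions are modeled as plain strings, and the `Map<String, Long>` type for `properties` is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the skipMessages control flow; not the PIP's actual implementation.
public final class SkipFlowSketch {

    interface ReadCallback {
        void readEntriesComplete(List<String> entries);

        void readEntriesFailed(Exception cause);
    }

    interface EntryReader {
        void replay(List<String> positions, ReadCallback callback);
    }

    interface Acknowledger {
        void acknowledge(List<String> positions, Map<String, Long> properties);
    }

    static CompletableFuture<Void> skipMessages(List<String> fullEntryPositions,
                                                List<String> partialPositions,
                                                EntryReader reader,
                                                Acknowledger acker) {
        // Declared before branching so every path completes and returns the same future.
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Declared once and reused by both acknowledge calls; empty means no ack properties.
        Map<String, Long> properties = Collections.emptyMap();

        if (partialPositions.isEmpty()) {
            // Only full-entry acks: nothing to read, so complete immediately.
            acker.acknowledge(fullEntryPositions, properties);
            result.complete(null);
            return result;
        }

        reader.replay(partialPositions, new ReadCallback() {
            @Override
            public void readEntriesComplete(List<String> entries) {
                try {
                    List<String> finalPositions = new ArrayList<>(fullEntryPositions);
                    // The real callback would build an ack-set position per entry here.
                    finalPositions.addAll(entries);
                    acker.acknowledge(finalPositions, properties);
                    result.complete(null);
                } catch (RuntimeException e) {
                    result.completeExceptionally(e);
                }
            }

            @Override
            public void readEntriesFailed(Exception cause) {
                // Without this branch the caller would wait on the future forever.
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        skipMessages(List.of("10:1"), List.of("10:2"),
                (positions, callback) -> callback.readEntriesComplete(positions),
                (positions, props) -> acked.addAll(positions)).join();
        System.out.println(acked); // prints [10:1, 10:2]
    }
}
```

In this sketch a failed read acknowledges nothing, not even the full-entry positions. Whether the PIP should instead acknowledge those and fail only the batch part is a design choice the review does not settle.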



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
