lhotari commented on code in PR #24370:
URL: https://github.com/apache/pulsar/pull/24370#discussion_r2425491675


##########
pip/pip-423.md:
##########
@@ -0,0 +1,254 @@
+# PIP-423: Add a new admin API to acknowledge a single message
+
+# Background knowledge
+
+*   **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends.
+*   **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered.
+*   **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism.
+*   **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it.
+*   **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`.
+
+# Motivation
+
+Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:
+
+*   **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively.
+*   **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability.
+*   **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue.
+
+The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific `MessageId`, providing a robust and reliable way to remove any individual message, delayed or not, from a subscription's backlog.
+
+# Goals
+
+## In Scope
+
+*   Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI command to support skipping specific messages for a subscription.
+*   The target message(s) will be identified by their `ledgerId` and `entryId`.
+*   The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability.
+*   This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+*   Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription.
+*   Support for both partitioned and non-partitioned topics.
+
+## Out of Scope
+
+*   Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated endpoint will be created for clarity.
+*   Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation that must be run on each cluster where the skip is needed.
+
+# High Level Design
+
+The proposed solution introduces a new admin API that triggers Pulsar's individual acknowledgement capability on demand for specific messages.
+
+1.  **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a list of message IDs (`ledgerId:entryId`) to be skipped.
+
+2.  **Broker Receives Request:** The Pulsar broker receives the admin request for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule.
+
+3.  **Delegate to Subscription:** The broker, after validating topic ownership and permissions, invokes a new method `skipMessages(Map<String, String> messageIds)` on the target `PersistentSubscription` object. For partitioned topics, the request is scattered to all partitions, and each partition broker performs this action.
+
+4.  **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs:
+    *   It verifies that the subscription's type supports individual acknowledgements.
+    *   It constructs `Position` objects from the provided ledger and entry IDs.
+    *   It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to what would happen if a consumer individually acknowledged the message.
+
+5.  **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted to metadata storage.
+    *   For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered.
+    *   For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while providing a clean, dedicated administrative API for this precise operational task.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The core of the implementation involves adding a new method to the `Subscription` interface and implementing it in `PersistentSubscription` to leverage the existing individual acknowledgment mechanism.
+
+1.  **Subscription Interface Extension:**
+    The `Subscription` interface is extended with a new method to handle skipping by message IDs.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
+    // ... existing methods
+}
+```
+
+2.  **PersistentSubscription Implementation:**
+
+The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
+    if (log.isDebugEnabled()) {
+        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
+                cursor.getNumberOfEntriesInBacklog(false));
+    }
+
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
+    }
+
+    List<Position> positions = new ArrayList<>();
+    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
+        try {
+            long ledgerId = Long.parseLong(entry.getKey());
+            long entryId = Long.parseLong(entry.getValue());
+            Position position = PositionFactory.create(ledgerId, entryId);
+            positions.add(position);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID."));
+        }
+    }
+
+    Map<String, Long> properties = Collections.emptyMap();
+    acknowledgeMessage(positions, AckType.Individual, properties);

Review Comment:
   This logic should also handle the case where a batchIndex is passed. If no batchIndex is passed, it should acknowledge the complete entry.
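The rule in this comment can be sketched as a planning step that runs before acknowledgement. If a request has no batch index, the whole entry is acknowledged. Otherwise only the listed batch indexes are acknowledged. This is a minimal, self-contained sketch under stated assumptions. `SkipTarget`, `EntryAck` and `SkipPlanner` are hypothetical names, not Pulsar classes. The sketch does not call the broker's `acknowledgeMessage()`, and it does not show how a partial batch ack would be handed to the `ManagedCursor`.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical request element: an entry position plus an optional batch index.
record SkipTarget(long ledgerId, long entryId, OptionalInt batchIndex) {}

// Hypothetical per-entry outcome. An empty batchIndexes means "acknowledge the
// complete entry"; otherwise only the set batch indexes are acknowledged.
record EntryAck(long ledgerId, long entryId, Optional<BitSet> batchIndexes) {
    boolean isWholeEntry() {
        return batchIndexes.isEmpty();
    }
}

final class SkipPlanner {
    private record EntryKey(long ledgerId, long entryId) {}

    private SkipPlanner() {}

    static List<EntryAck> plan(List<SkipTarget> targets) {
        // LinkedHashMap keeps the first-seen order of entries stable.
        Map<EntryKey, EntryAck> byEntry = new LinkedHashMap<>();
        for (SkipTarget t : targets) {
            EntryKey key = new EntryKey(t.ledgerId(), t.entryId());
            EntryAck current = byEntry.get(key);
            if (t.batchIndex().isEmpty()) {
                // No batch index: acknowledge the complete entry. This also
                // supersedes any partial (batch-index) ack collected so far.
                byEntry.put(key, new EntryAck(t.ledgerId(), t.entryId(), Optional.empty()));
            } else if (current == null) {
                // First batch index seen for this entry.
                BitSet bits = new BitSet();
                bits.set(t.batchIndex().getAsInt());
                byEntry.put(key, new EntryAck(t.ledgerId(), t.entryId(), Optional.of(bits)));
            } else if (!current.isWholeEntry()) {
                // Another batch index for an entry that is only partially acked.
                current.batchIndexes().get().set(t.batchIndex().getAsInt());
            }
            // Otherwise the whole entry is already covered; nothing to add.
        }
        return List.copyOf(byEntry.values());
    }
}
```

In this sketch, a whole-entry request for an entry takes precedence over batch-index requests for the same entry, regardless of the order in which they arrive.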



##########
pip/pip-423.md:
##########
@@ -0,0 +1,254 @@
+2.  **PersistentSubscription Implementation:**
+
+The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {

Review Comment:
   This should be a List of message IDs, each carrying ledgerId, entryId and batchIndex information.
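The quoted code also shows a concrete limitation of the map. Its key is parsed as the `ledgerId` (`Long.parseLong(entry.getKey())`), so the map can hold only one `entryId` per ledger. Below is a minimal sketch of accepting a list instead. It assumes a hypothetical `ledgerId:entryId[:batchIndex]` string format; the PIP text only specifies `ledgerId:entryId`. `SkipMessageId` and `SkipMessageIdParser` are also hypothetical names. Malformed input is reported as `IllegalArgumentException` rather than `NotAllowedException`, which is a design assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical element type: one message to skip.
record SkipMessageId(long ledgerId, long entryId, OptionalInt batchIndex) {}

final class SkipMessageIdParser {
    private SkipMessageIdParser() {}

    // Parses "ledgerId:entryId" or "ledgerId:entryId:batchIndex" strings.
    static List<SkipMessageId> parse(List<String> raw) {
        List<SkipMessageId> ids = new ArrayList<>(raw.size());
        for (String s : raw) {
            // Limit -1 keeps trailing empty parts, so "1:2:" is rejected.
            String[] parts = s.trim().split(":", -1);
            if (parts.length != 2 && parts.length != 3) {
                throw new IllegalArgumentException("Invalid message ID: " + s);
            }
            try {
                long ledgerId = Long.parseLong(parts[0]);
                long entryId = Long.parseLong(parts[1]);
                OptionalInt batchIndex = parts.length == 3
                        ? OptionalInt.of(Integer.parseInt(parts[2]))
                        : OptionalInt.empty();
                if (ledgerId < 0 || entryId < 0
                        || (batchIndex.isPresent() && batchIndex.getAsInt() < 0)) {
                    throw new IllegalArgumentException("Negative component in message ID: " + s);
                }
                ids.add(new SkipMessageId(ledgerId, entryId, batchIndex));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid message ID: " + s, e);
            }
        }
        return ids;
    }
}
```

Because the result is a list, two IDs in the same ledger (for example `10:5` and `10:6:2`) are both kept, which a ledger-keyed map cannot represent.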



##########
pip/pip-423.md:
##########
@@ -0,0 +1,254 @@
+1.  **Subscription Interface Extension:**
+    The `Subscription` interface is extended with a new method to handle skipping by message IDs.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(Map<String, String> messageIds);

Review Comment:
   I'm not actually sure whether `List<MessageIdAdv>` is the best option here. I guess we'd need a new type (class) that captures the Position and batchIndex.
   The client's MessageId and MessageIdAdv are slightly different and contain additional state that isn't relevant here.
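A dedicated broker-side type, as suggested here, could look like the following minimal sketch. `SkipPosition` and its factory methods are hypothetical. The sketch stores plain `ledgerId`/`entryId` fields rather than Pulsar's `Position` so that it compiles on its own. It represents a missing batch index with `OptionalInt`; that is an assumption, and a sentinel value would also work. A natural ordering lets a caller sort and de-duplicate requests, for example with a `TreeSet`.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical broker-side identifier of a message to skip: the entry position
// (ledgerId, entryId) plus an optional batch index. Unlike the client's
// MessageId/MessageIdAdv it carries no partition index or other client state.
record SkipPosition(long ledgerId, long entryId, OptionalInt batchIndex)
        implements Comparable<SkipPosition> {

    // Ledger, then entry; a whole-entry id (no batch index) sorts before the
    // batch ids of the same entry because it compares as -1.
    private static final Comparator<SkipPosition> ORDER =
            Comparator.comparingLong(SkipPosition::ledgerId)
                    .thenComparingLong(SkipPosition::entryId)
                    .thenComparingInt(p -> p.batchIndex().orElse(-1));

    SkipPosition {
        Objects.requireNonNull(batchIndex, "batchIndex");
        if (ledgerId < 0 || entryId < 0) {
            throw new IllegalArgumentException("ledgerId and entryId must be non-negative");
        }
        if (batchIndex.isPresent() && batchIndex.getAsInt() < 0) {
            throw new IllegalArgumentException("batchIndex must be non-negative");
        }
    }

    // Identifies the complete entry (all messages in it, if batched).
    static SkipPosition ofEntry(long ledgerId, long entryId) {
        return new SkipPosition(ledgerId, entryId, OptionalInt.empty());
    }

    // Identifies a single message inside a batched entry.
    static SkipPosition ofBatchMessage(long ledgerId, long entryId, int batchIndex) {
        return new SkipPosition(ledgerId, entryId, OptionalInt.of(batchIndex));
    }

    @Override
    public int compareTo(SkipPosition other) {
        return ORDER.compare(this, other);
    }
}
```

Negative batch indexes are rejected, so the ordering stays consistent with the record's generated `equals`. Duplicate requests therefore collapse in a sorted set.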


