codelipenghui commented on code in PR #24370:
URL: https://github.com/apache/pulsar/pull/24370#discussion_r2240231061


##########
pip/pip-423.md:
##########
@@ -0,0 +1,255 @@
+# PIP-423: Add a new admin API to acknowledge a single message
+
+# Background knowledge
+
+*   **Message Identification (MessageId):** In Pulsar, every message is 
uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an 
`entryId`, and optionally a partition index. The combination of `ledgerId` and 
`entryId` (often represented as a `Position`) uniquely identifies a message's 
physical storage location within a topic. A producer receives a `MessageId` for 
each message it successfully sends.
+*   **Subscriptions and Cursors:** Each subscription on a topic maintains its 
own "cursor" which tracks consumption progress. For subscription types like 
`Shared` or `Key_Shared`, the cursor can track individually acknowledged 
messages, even if they are out of order relative to the main consumption 
progress marker (`mark-delete position`).
+*   **Individual Acknowledgement:** Pulsar subscriptions support different 
acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on 
**individual acknowledgement**. This allows a consumer to acknowledge a single 
message. When a message is acknowledged individually ahead of the `mark-delete 
position`, the broker's `ManagedCursor` persistently tracks this to ensure that 
acknowledged messages are not redelivered after a broker or consumer restart.
+*   **Delayed Messages:** Pulsar supports scheduling messages for future 
delivery. A primary use case for this proposal is to allow a scheduled message 
to be effectively "cancelled" by acknowledging it before its delivery time.
+*   **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a 
specified *number* of messages for a subscription. This is useful for 
bulk-skipping but lacks the precision to target a single, specific message 
within a large backlog, especially if its exact sequence is unknown. This 
proposal extends this existing API to add the ability to skip messages by their 
specific `MessageId`.
+
+# Motivation
+
+Operators and SREs occasionally need to intervene in a topic's backlog to 
handle problematic messages or adapt to changing business requirements. For 
instance:
+
+*   **Cancelling Scheduled Actions:** A delayed message representing a future 
task (e.g., a scheduled report or a notification) may become obsolete. The most 
efficient way to handle this is to prevent its delivery entirely.
+*   **Removing Backlogs:** A specific message in a backlog might have a 
malformed payload that causes consumer applications to crash repeatedly. 
Removing this single message without affecting valid messages around it is a 
critical operational capability.
+*   **Manual Business Logic Correction:** An event may have been sent that is 
later determined to be invalid due to external factors. An administrator needs 
a precise tool to remove this specific event from a subscription's queue.
+
+The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited 
for these precise, targeted operations. This proposal enhances the 
`skipMessages` API to accept specific message IDs, providing a robust and 
reliable way to remove any individual message—delayed or not—from a 
subscription's backlog.
+
+# Goals
+
+## In Scope
+
+*   Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support 
skipping specific messages for a subscription.
+*   Introduce a new CLI command `pulsar-admin topics skip-messages` for this 
purpose.
+*   The target message(s) will be identified by their `ledgerId` and `entryId`.
+*   The implementation will leverage Pulsar's existing, robust 
`AckType.Individual` mechanism for persistence and reliability.
+*   This feature will only be supported for subscription types that allow 
individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+*   Ensure that once a message is successfully skipped via this API, it will 
not be delivered to any consumer on the targeted subscription.
+
+## Out of Scope
+*   Adding a new, separate Admin API endpoint. This feature enhances the 
existing `skip` endpoint.
+*   Automatic skipping of messages across geo-replicated clusters. The command 
is a per-cluster administrative operation.
+
+# High Level Design
+
+The proposed solution extends the existing administrative `skipMessages` API 
to trigger Pulsar's individual acknowledgement capability on demand.
+
+1.  **Initiate Skip-by-ID:** An administrator initiates the action via the new 
`pulsar-admin topics skip-messages` command or its corresponding REST API call. 
The request must specify the topic, the target subscription, and a map of 
`ledgerId` to `entryId` for the messages to be skipped.
+
+2.  **Broker Receives Request:** The Pulsar broker that owns the topic 
partition receives the admin request. It validates the parameters and the 
administrator's permissions for the topic, re-using the existing 
`TopicOperation.SKIP` permission. The API call is an overload of the existing 
skip endpoint, where the number of messages to skip is specified as `0` in the 
URL path, and the message IDs are passed in the POST body.
+
+3.  **Delegate to Subscription:** The broker invokes a new 
`skipMessages(Map<String, String> messageIds)` method on the target 
`PersistentSubscription` object.
+
+4.  **Perform Individual Acknowledgement:** Inside the 
`PersistentSubscription`, the following occurs:
+    *   It verifies that the subscription's type is compatible with individual 
acknowledgement (i.e., not cumulative).
+    *   It constructs `Position` objects from the provided ledger and entry 
IDs.
+    *   It calls its internal `acknowledgeMessage()` method with 
`AckType.Individual` for the specified positions. This is functionally 
identical to a consumer individually acknowledging the messages.
+
+5.  **Persistence and Effect:** The `ManagedCursor` for the subscription 
records these individual acknowledgements, which are persisted.
+    *   For a **regular message** in the backlog, it is marked as consumed for 
that subscription and will not be delivered.
+    *   For a **delayed message**, it is marked as consumed before the 
`DelayedDeliveryTracker` attempts to schedule it. The message is thus 
effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message 
acknowledgement foundation while cleanly extending an existing administrative 
API.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The core of the implementation involves adding a new method to the 
`Subscription` interface and implementing it in `PersistentSubscription` to 
leverage the existing individual acknowledgment mechanism.
+
+1.  **Subscription Interface Extension:**
+    The `Subscription` interface is extended with a new method to handle 
skipping by message IDs.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+    // ... existing methods
+    CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+    CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
+    // ... existing methods
+}
+```
+
+2.  **PersistentSubscription Implementation:**
+    The `PersistentSubscription` class provides the concrete implementation. 
It converts the message ID map into `Position` objects and uses the standard 
individual acknowledge flow.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
+    if (log.isDebugEnabled()) {
+        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
+                cursor.getNumberOfEntriesInBacklog(false));
+    }
+
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
+    }
+
+    List<Position> positions = new ArrayList<>();
+    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
+        try {
+            long ledgerId = Long.parseLong(entry.getKey());
+            long entryId = Long.parseLong(entry.getValue());
+            Position position = PositionFactory.create(ledgerId, entryId);
+            positions.add(position);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID."));
+        }
+    }
+
+    Map<String, Long> properties = Collections.emptyMap();
+    acknowledgeMessage(positions, AckType.Individual, properties);
+
+    return CompletableFuture.completedFuture(null);
+}
+```
+
+3.  **Admin API Logic:**
+    The `PersistentTopicsBase` class is updated to handle the overloaded 
`skipMessages` request. When `numMessages` is 0 and the `messageIds` map is not 
empty, it routes the request to the new `subscription.skipMessages(Map)` method.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
+                                    boolean authoritative, Map<String, String> messageIds) {
+    // ...
+    // In the implementation logic for the topic:
+    // ...
+                 if (!messageIds.isEmpty() && numMessages == 0) {
+                     return sub.skipMessages(messageIds).thenAccept(unused -> {
+                                 log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName);
+                                 asyncResponse.resume(Response.noContent().build());
+                             }
+                     );
+                 }
+                  return sub.skipMessages(numMessages).thenAccept(unused -> {
+                      log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
+                              topicName, subName);
+                      // ...
+                  });
+    // ...
+}
+```
+
+## Public-facing Changes
+The existing `skipMessages` API is modified to accept a POST body containing 
message IDs.
+
+### Public API
+The REST endpoint for skipping messages is updated. To skip by message ID, a 
client must send a `POST` request with `numMessages` set to `0` in the path and 
provide a map of message IDs in the JSON body.
+
+*   **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
+*   **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`

Review Comment:
   Actually, we could add a new path, e.g. `skipByMessageIds`, to avoid the
   confusing requirement that `numMessages` be set to `0` here.
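
   To make the concern concrete, here is a minimal, self-contained sketch of the two routing styles. It is not Pulsar code: `SkipRouting`, `MessagePos`, and the returned strings are hypothetical stand-ins, and `skipByMessageIds` is only the example name suggested above. The overloaded variant mirrors the branch condition shown in the PIP's `internalSkipMessages` snippet (`!messageIds.isEmpty() && numMessages == 0`).

```java
import java.util.List;

// Illustrative only: none of these types exist in Pulsar.
class SkipRouting {
    /** Hypothetical stand-in for a (ledgerId, entryId) position. */
    record MessagePos(long ledgerId, long entryId) {}

    /**
     * Overloaded route, mirroring the PIP's branch condition:
     * the IDs are honoured only when numMessages == 0.
     */
    static String overloadedSkip(int numMessages, List<MessagePos> ids) {
        if (!ids.isEmpty() && numMessages == 0) {
            return "skip-by-id:" + ids.size();
        }
        // Any non-zero count lands here and the supplied IDs are ignored.
        return "skip-count:" + numMessages;
    }

    /** Dedicated route (hypothetical name): the path alone selects the behaviour. */
    static String skipByMessageIds(List<MessagePos> ids) {
        if (ids.isEmpty()) {
            throw new IllegalArgumentException("at least one message ID is required");
        }
        return "skip-by-id:" + ids.size();
    }

    public static void main(String[] args) {
        List<MessagePos> ids = List.of(new MessagePos(12L, 3L), new MessagePos(12L, 7L));
        System.out.println(overloadedSkip(0, ids));   // ID branch: sentinel 0 plus a body
        System.out.println(overloadedSkip(5, ids));   // count branch: IDs are dropped
        System.out.println(skipByMessageIds(ids));    // ID branch: no sentinel involved
    }
}
```

   With the overloaded route, a request that carries both a non-zero count and a body of IDs takes the count branch, so the caller has to know that `0` is a sentinel. A separate path removes that coupling and lets an empty ID list be rejected explicitly.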



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
