codelipenghui commented on code in PR #24370:

URL: https://github.com/apache/pulsar/pull/24370#discussion_r2240231061
########## pip/pip-423.md: ##########
@@ -0,0 +1,255 @@

# PIP-423: Add a new admin API to acknowledge a single message

# Background knowledge

* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends.
* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor", which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (the `mark-delete position`).
* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions rely heavily on **individual acknowledgement**, which allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this so that acknowledged messages are not redelivered after a broker or consumer restart.
* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time.
* **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal extends this existing API with the ability to skip messages by their specific `MessageId`.
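The cursor behaviour described above can be pictured with a deliberately simplified, hypothetical model. `ToyCursor` and its `Pos` record are illustrative names, not Pulsar classes, and the real `ManagedCursor` stores acknowledged ranges and handles ledger boundaries differently; the model only shows the semantics this proposal relies on. The cursor keeps a mark-delete position plus a set of individually acknowledged positions ahead of it. An entry is still dispatched only if it is neither covered by the mark-delete position nor individually acknowledged, and the mark-delete position advances once acknowledgements become contiguous with it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical toy model of a subscription cursor; NOT Pulsar's ManagedCursor.
 * Entries are listed explicitly so that "next entry" is well defined across ledgers.
 */
final class ToyCursor {
    /** Illustrative stand-in for a Position: a (ledgerId, entryId) pair. */
    record Pos(long ledgerId, long entryId) {}

    private final List<Pos> entries;                          // topic entries in storage order
    private final Set<Pos> individuallyAcked = new HashSet<>(); // acked positions ahead of mark-delete
    private int markDeleteIndex = -1;                         // -1: nothing covered by mark-delete yet

    ToyCursor(List<Pos> entries) {
        this.entries = List.copyOf(entries);
    }

    /** Individually acknowledge one position, as the proposed skip-by-ID would. */
    void ackIndividual(Pos p) {
        int idx = entries.indexOf(p);
        if (idx <= markDeleteIndex) {
            return; // unknown position (idx == -1) or already covered by mark-delete
        }
        individuallyAcked.add(p);
        // Advance mark-delete while the next entry has been individually acknowledged.
        while (markDeleteIndex + 1 < entries.size()
                && individuallyAcked.remove(entries.get(markDeleteIndex + 1))) {
            markDeleteIndex++;
        }
    }

    /** Entries that would still be dispatched to consumers. */
    List<Pos> pending() {
        List<Pos> out = new ArrayList<>();
        for (int i = markDeleteIndex + 1; i < entries.size(); i++) {
            Pos p = entries.get(i);
            if (!individuallyAcked.contains(p)) {
                out.add(p);
            }
        }
        return out;
    }

    /** Current mark-delete position, or null if nothing has been consumed contiguously. */
    Pos markDelete() {
        return markDeleteIndex < 0 ? null : entries.get(markDeleteIndex);
    }
}
```

In this model, with entries `(10,0)`, `(10,1)`, `(10,2)`, `(11,0)`, skipping `(10,2)` by ID removes it from `pending()` immediately even though the mark-delete position has not moved; acknowledging `(10,0)` and then `(10,1)` later lets the marker jump straight to `(10,2)`.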
# Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely.
* **Removing Problematic Messages:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting the valid messages around it is a critical operational capability.
* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue.

The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal enhances the `skipMessages` API to accept specific message IDs, providing a robust and reliable way to remove any individual message, delayed or not, from a subscription's backlog.

# Goals

## In Scope

* Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support skipping specific messages for a subscription.
* Introduce a new CLI command, `pulsar-admin topics skip-messages`, for this purpose.
* The target message(s) will be identified by their `ledgerId` and `entryId`.
* The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability.
* This feature will only be supported for subscription types that are not in cumulative acknowledgement mode (e.g., `Shared`, `Key_Shared`).
* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription.

## Out of Scope
* Adding a new, separate Admin API endpoint.
  This feature enhances the existing `skip` endpoint.
* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation.

# High Level Design

The proposed solution extends the existing administrative `skipMessages` API to trigger Pulsar's individual acknowledgement capability on demand.

1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a map of `ledgerId` to `entryId` for the messages to be skipped.

2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the topic, reusing the existing `TopicOperation.SKIP` permission. The API call is an overload of the existing skip endpoint: the number of messages to skip is specified as `0` in the URL path, and the message IDs are passed in the POST body.

3. **Delegate to Subscription:** The broker invokes a new `skipMessages(Map<String, String> messageIds)` method on the target `PersistentSubscription` object.

4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs:
    * It verifies that the subscription is not in cumulative acknowledgement mode (see `Subscription.isCumulativeAckMode()`).
    * It constructs `Position` objects from the provided ledger and entry IDs.
    * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to a consumer individually acknowledging the messages.

5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted.
    * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered.
    * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` makes it available for delivery. The message is thus effectively **cancelled**.

This design is simple and robust because it builds on the broker's proven message acknowledgement foundation while cleanly extending an existing administrative API.

# Detailed Design

## Design & Implementation Details

The core of the implementation involves adding a new method to the `Subscription` interface and implementing it in `PersistentSubscription` to leverage the existing individual acknowledgement mechanism.

1. **Subscription Interface Extension:**
   The `Subscription` interface is extended with a new method to handle skipping by message IDs.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public interface Subscription extends MessageExpirer {
    // ... existing methods
    CompletableFuture<Void> skipMessages(int numMessagesToSkip);

    /**
     * Skips the given messages by individually acknowledging them.
     *
     * @param messageIds map of ledgerId to entryId, both as decimal strings
     */
    CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
    // ... existing methods
}
```

2. **PersistentSubscription Implementation:**
   The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledgement flow.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@Override
public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
                cursor.getNumberOfEntriesInBacklog(false));
    }

    // Skip-by-ID is rejected for subscriptions in cumulative acknowledgement mode.
    if (Subscription.isCumulativeAckMode(getType())) {
        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
    }

    // Convert each (ledgerId -> entryId) entry into a Position; fail the whole request on any bad ID.
    List<Position> positions = new ArrayList<>(messageIds.size());
    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
        try {
            long ledgerId = Long.parseLong(entry.getKey());
            long entryId = Long.parseLong(entry.getValue());
            positions.add(PositionFactory.create(ledgerId, entryId));
        } catch (NumberFormatException e) {
            return CompletableFuture.failedFuture(new NotAllowedException(
                    "Invalid message ID: ledgerId=" + entry.getKey() + ", entryId=" + entry.getValue()));
        }
    }

    // Reuse the same individual acknowledgement path that a consumer's ack goes through.
    Map<String, Long> properties = Collections.emptyMap();
    acknowledgeMessage(positions, AckType.Individual, properties);

    return CompletableFuture.completedFuture(null);
}
```

3. **Admin API Logic:**
   The `PersistentTopicsBase` class is updated to handle the overloaded `skipMessages` request. When `numMessages` is `0` and a non-empty `messageIds` map is supplied, it routes the request to the new `subscription.skipMessages(Map)` method.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
                                    boolean authoritative, Map<String, String> messageIds) {
    // ...
    // In the implementation logic for the topic:
    // ...
    // Skip-by-ID: numMessages == 0 acts as the sentinel; the IDs come from the POST body (which may be absent).
    if (messageIds != null && !messageIds.isEmpty() && numMessages == 0) {
        return sub.skipMessages(messageIds).thenAccept(unused -> {
            log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName);
            asyncResponse.resume(Response.noContent().build());
        });
    }
    // Otherwise, fall back to the existing count-based skip.
    return sub.skipMessages(numMessages).thenAccept(unused -> {
        log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
                topicName, subName);
        // ...
    });
    // ...
}
```

## Public-facing Changes
The existing `skipMessages` API is modified to accept a POST body containing message IDs.

### Public API
The REST endpoint for skipping messages is updated. To skip by message ID, a client must send a `POST` request with `numMessages` set to `0` in the path and provide the message IDs in the JSON body as an object mapping each `ledgerId` to its `entryId`.

* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`

Review Comment:
Actually, we could add a new path, e.g. `skipByMessageIds`, to avoid requiring the confusing `0` for `numMessages` here.
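To make the two options concrete, the sketch below builds (but does not send) both request shapes with the JDK's `java.net.http.HttpRequest`: the one the PIP currently specifies (`numMessages` fixed to `0` in the v2 path, IDs in the JSON body) and a dedicated path along the lines the review suggests. The class and method names (`SkipRequests`, `body`, `proposed`, `dedicated`), the base URL, and the exact shape of the `skipByMessageIds` path are hypothetical; the review only names `skipByMessageIds` as an example, and the PIP does not define a client-side Java method for this call.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds skip-by-ID admin requests; nothing here is sent. */
final class SkipRequests {
    /** JSON body {"<ledgerId>":"<entryId>", ...}; IDs are numeric, so no escaping is needed. */
    static String body(Map<Long, Long> ledgerToEntry) {
        return ledgerToEntry.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }

    /** Shape proposed in the PIP: numMessages = 0 as a sentinel in the path, IDs in the body. */
    static HttpRequest proposed(String baseUrl, String tenant, String ns, String topic, String sub,
                                Map<Long, Long> ids) {
        String path = String.format("/admin/v2/persistent/%s/%s/%s/subscription/%s/skip/0",
                tenant, ns, topic, sub);
        return post(baseUrl + path, body(ids));
    }

    /** HYPOTHETICAL dedicated path in the spirit of the review comment; not a real Pulsar endpoint. */
    static HttpRequest dedicated(String baseUrl, String tenant, String ns, String topic, String sub,
                                 Map<Long, Long> ids) {
        String path = String.format("/admin/v2/persistent/%s/%s/%s/subscription/%s/skipByMessageIds",
                tenant, ns, topic, sub);
        return post(baseUrl + path, body(ids));
    }

    private static HttpRequest post(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

Because the body is a map keyed by `ledgerId`, a single request can carry at most one `entryId` per ledger: a second `put` for the same ledger replaces the first, so two entries from the same ledger cannot be skipped in one call with this body shape.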