Denovo1998 commented on code in PR #24370:
URL: https://github.com/apache/pulsar/pull/24370#discussion_r2249214904
##########
pip/pip-423.md:
##########
@@ -0,0 +1,255 @@
+# PIP-423: Add a new admin API to acknowledge a single message
+
+# Background knowledge
+
+* **Message Identification (MessageId):** In Pulsar, every message is
uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an
`entryId`, and optionally a partition index. The combination of `ledgerId` and
`entryId` (often represented as a `Position`) uniquely identifies a message's
physical storage location within a topic. A producer receives a `MessageId` for
each message it successfully sends.
+* **Subscriptions and Cursors:** Each subscription on a topic maintains its
own "cursor", which tracks consumption progress. For subscription types like
`Shared` or `Key_Shared`, the cursor can track individually acknowledged
messages, even if they are out of order relative to the main consumption
progress marker (`mark-delete position`).
+* **Individual Acknowledgement:** Pulsar subscriptions support different
acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on
**individual acknowledgement**. This allows a consumer to acknowledge a single
message. When a message is acknowledged individually ahead of the `mark-delete
position`, the broker's `ManagedCursor` persistently tracks this to ensure that
acknowledged messages are not redelivered after a broker or consumer restart.
+* **Delayed Messages:** Pulsar supports scheduling messages for future
delivery. A primary use case for this proposal is to allow a scheduled message
to be effectively "cancelled" by acknowledging it before its delivery time.
+* **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a
specified *number* of messages for a subscription. This is useful for
bulk-skipping but lacks the precision to target a single, specific message
within a large backlog, especially if its exact sequence is unknown. This
proposal extends this existing API to add the ability to skip messages by their
specific `MessageId`.
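
The cursor terminology above can be made concrete with a minimal Java sketch of how a subscription decides whether a position is already acknowledged. Everything at or before the mark-delete position counts as consumed, and later positions count only if they were acknowledged individually. `EntryPosition` and `CursorSketch` are hypothetical names used for illustration. This is not Pulsar's `ManagedCursor`: it does not model how the mark-delete position advances or how acknowledgements are persisted.

```java
import java.util.TreeSet;

// Hypothetical stand-in for a message position: ordered by ledgerId, then entryId.
record EntryPosition(long ledgerId, long entryId) implements Comparable<EntryPosition> {
    @Override
    public int compareTo(EntryPosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

// Hypothetical cursor model: a mark-delete position plus a set of individually acked positions.
class CursorSketch {
    private final EntryPosition markDelete;
    private final TreeSet<EntryPosition> individuallyAcked = new TreeSet<>();

    CursorSketch(EntryPosition markDelete) {
        this.markDelete = markDelete;
    }

    // Record an out-of-order ack; positions at or before mark-delete are already covered.
    void acknowledgeIndividually(EntryPosition position) {
        if (position.compareTo(markDelete) > 0) {
            individuallyAcked.add(position);
        }
    }

    // A position is consumed if it is at or before mark-delete, or was acked individually.
    boolean isAcknowledged(EntryPosition position) {
        return position.compareTo(markDelete) <= 0 || individuallyAcked.contains(position);
    }
}
```

An individual ack at `(10, 7)` with mark-delete at `(10, 4)` leaves `(10, 5)` and `(10, 6)` as unacknowledged "holes". This is exactly the state the skip-by-ID feature creates on demand.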
+
+# Motivation
+
+Operators and SREs occasionally need to intervene in a topic's backlog to
handle problematic messages or adapt to changing business requirements. For
instance:
+
+* **Cancelling Scheduled Actions:** A delayed message representing a future
task (e.g., a scheduled report or a notification) may become obsolete. The most
efficient way to handle this is to prevent its delivery entirely.
+* **Removing Poison Messages:** A specific message in a backlog might have a
malformed payload that causes consumer applications to crash repeatedly.
Removing this single message without affecting valid messages around it is a
critical operational capability.
+* **Manual Business Logic Correction:** An event may have been sent that is
later determined to be invalid due to external factors. An administrator needs
a precise tool to remove this specific event from a subscription's queue.
+
+The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited
for these precise, targeted operations. This proposal enhances the
`skipMessages` API to accept specific message IDs, providing a robust and
reliable way to remove any individual message—delayed or not—from a
subscription's backlog.
+
+# Goals
+
+## In Scope
+
+* Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support
skipping specific messages for a subscription.
+* Introduce a new CLI command `pulsar-admin topics skip-messages` for this
purpose.
+* The target message(s) will be identified by their `ledgerId` and `entryId`.
+* The implementation will leverage Pulsar's existing, robust
`AckType.Individual` mechanism for persistence and reliability.
+* This feature will only be supported for subscription types that allow
individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+* Ensure that once a message is successfully skipped via this API, it will
not be delivered to any consumer on the targeted subscription.
+
+## Out of Scope
+* Adding a new, separate Admin API endpoint. This feature enhances the
existing `skip` endpoint.
+* Automatic skipping of messages across geo-replicated clusters. The command
is a per-cluster administrative operation.
+
+# High Level Design
+
+The proposed solution extends the existing administrative `skipMessages` API
to trigger Pulsar's individual acknowledgement capability on demand.
+
+1. **Initiate Skip-by-ID:** An administrator initiates the action via the new
`pulsar-admin topics skip-messages` command or its corresponding REST API call.
The request must specify the topic, the target subscription, and a map of
`ledgerId` to `entryId` for the messages to be skipped.
+
+2. **Broker Receives Request:** The Pulsar broker that owns the topic
partition receives the admin request. It validates the parameters and the
administrator's permissions for the topic, re-using the existing
`TopicOperation.SKIP` permission. The API call is an overload of the existing
skip endpoint, where the number of messages to skip is specified as `0` in the
URL path, and the message IDs are passed in the POST body.
+
+3. **Delegate to Subscription:** The broker invokes a new
`skipMessages(Map<String, String> messageIds)` method on the target
`PersistentSubscription` object.
+
+4. **Perform Individual Acknowledgement:** Inside the
`PersistentSubscription`, the following occurs:
+ * It verifies that the subscription's type is compatible with individual
acknowledgement (i.e., not cumulative).
+ * It constructs `Position` objects from the provided ledger and entry
IDs.
+ * It calls its internal `acknowledgeMessage()` method with
`AckType.Individual` for the specified positions. This is functionally
identical to a consumer individually acknowledging the messages.
+
+5. **Persistence and Effect:** The `ManagedCursor` for the subscription
records these individual acknowledgements, which are persisted.
+ * For a **regular message** in the backlog, it is marked as consumed for
that subscription and will not be delivered.
+ * For a **delayed message**, it is marked as consumed before the
`DelayedDeliveryTracker` attempts to schedule it. The message is thus
effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message
acknowledgement foundation while cleanly extending an existing administrative
API.
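
The cancellation effect described in step 5 can be pictured with a toy model, sketched below under loose assumptions. Delayed messages sit in a queue ordered by delivery time, the admin skip is reduced to recording an individual ack, and due messages whose positions were acknowledged are dropped instead of dispatched. `DelayedDispatchSketch`, `EntryPosition`, and `DelayedEntry` are hypothetical names. The broker's `DelayedDeliveryTracker` and dispatcher are structured differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical stand-in for a message position (ledgerId, entryId).
record EntryPosition(long ledgerId, long entryId) {}

// A delayed message: where it is stored and when (epoch ms) it becomes deliverable.
record DelayedEntry(EntryPosition position, long deliverAtMillis) {}

// Toy model of "cancel by acknowledging": due messages are dropped if already acked.
class DelayedDispatchSketch {
    private final PriorityQueue<DelayedEntry> scheduled =
            new PriorityQueue<>(Comparator.comparingLong(DelayedEntry::deliverAtMillis));
    private final Set<EntryPosition> acknowledged = new HashSet<>();

    void schedule(EntryPosition position, long deliverAtMillis) {
        scheduled.add(new DelayedEntry(position, deliverAtMillis));
    }

    // Stand-in for the admin skip-by-ID call: just an individual acknowledgement.
    void skip(EntryPosition position) {
        acknowledged.add(position);
    }

    // Pops every message due at nowMillis and returns only those that were not acknowledged.
    List<EntryPosition> pollDeliverable(long nowMillis) {
        List<EntryPosition> toDispatch = new ArrayList<>();
        while (!scheduled.isEmpty() && scheduled.peek().deliverAtMillis() <= nowMillis) {
            EntryPosition position = scheduled.poll().position();
            if (!acknowledged.contains(position)) {
                toDispatch.add(position);
            }
        }
        return toDispatch;
    }
}
```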
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The core of the implementation involves adding a new method to the
`Subscription` interface and implementing it in `PersistentSubscription` to
leverage the existing individual acknowledgement mechanism.
+
+1. **Subscription Interface Extension:**
+ The `Subscription` interface is extended with a new method to handle
skipping by message IDs.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+ // ... existing methods
+ CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+ CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
+ // ... existing methods
+}
+```
+
+2. **PersistentSubscription Implementation:**
+ The `PersistentSubscription` class provides the concrete implementation.
It converts the message ID map into `Position` objects and uses the standard
individual acknowledge flow.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
+    if (log.isDebugEnabled()) {
+        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
+                cursor.getNumberOfEntriesInBacklog(false));
+    }
+
+    // Skip-by-ID relies on individual acks, so reject cumulative-ack subscription types.
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
+    }
+
+    // Convert each (ledgerId -> entryId) pair into a Position; fail the whole request on the first bad pair.
+    List<Position> positions = new ArrayList<>();
+    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
+        try {
+            long ledgerId = Long.parseLong(entry.getKey());
+            long entryId = Long.parseLong(entry.getValue());
+            positions.add(PositionFactory.create(ledgerId, entryId));
+        } catch (NumberFormatException e) {
+            return CompletableFuture.failedFuture(
+                    new NotAllowedException("Invalid message ID: " + entry.getKey() + ":" + entry.getValue()));
+        }
+    }
+
+    // Same path as a consumer's individual ack; the ManagedCursor persists these positions.
+    Map<String, Long> properties = Collections.emptyMap();
+    acknowledgeMessage(positions, AckType.Individual, properties);
+
+    return CompletableFuture.completedFuture(null);
+}
+```
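
The parsing step in `skipMessages(Map)` can also be pulled out into a pure helper, which makes the validation easy to unit-test in isolation. The sketch below assumes a hypothetical `MessageIdParser` and a local `EntryPosition` record in place of Pulsar's `Position`/`PositionFactory`. It throws `IllegalArgumentException` synchronously, so a caller would wrap the failure in `CompletableFuture.failedFuture(...)`. Rejecting negative IDs is an extra assumption of this sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pulsar's Position (ledgerId, entryId).
record EntryPosition(long ledgerId, long entryId) {}

final class MessageIdParser {
    private MessageIdParser() {}

    // Converts a (ledgerId -> entryId) map into positions, failing fast on the first bad pair.
    static List<EntryPosition> parse(Map<String, String> messageIds) {
        List<EntryPosition> positions = new ArrayList<>(messageIds.size());
        for (Map.Entry<String, String> e : messageIds.entrySet()) {
            long ledgerId;
            long entryId;
            try {
                ledgerId = Long.parseLong(e.getKey());
                entryId = Long.parseLong(e.getValue());
            } catch (NumberFormatException ex) {
                throw new IllegalArgumentException(
                        "Invalid message ID " + e.getKey() + ":" + e.getValue(), ex);
            }
            // Assumption of this sketch: negative IDs are never valid targets for a skip.
            if (ledgerId < 0 || entryId < 0) {
                throw new IllegalArgumentException("Negative message ID " + ledgerId + ":" + entryId);
            }
            positions.add(new EntryPosition(ledgerId, entryId));
        }
        return positions;
    }
}
```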
+
+3. **Admin API Logic:**
+ The `PersistentTopicsBase` class is updated to handle the overloaded
`skipMessages` request. When `numMessages` is 0 and the `messageIds` map is not
empty, it routes the request to the new `subscription.skipMessages(Map)` method.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
+                                    boolean authoritative, Map<String, String> messageIds) {
+    // ...
+    // In the implementation logic for the topic:
+    // ...
+    if (messageIds != null && !messageIds.isEmpty() && numMessages == 0) {
+        // Skip by message ID: delegate to the new Subscription#skipMessages(Map) overload.
+        return sub.skipMessages(messageIds).thenAccept(unused -> {
+            log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName);
+            asyncResponse.resume(Response.noContent().build());
+        });
+    }
+    // Otherwise, keep the existing count-based skip.
+    return sub.skipMessages(numMessages).thenAccept(unused -> {
+        log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
+        // ...
+    });
+    // ...
+}
+```
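
The routing rule in `internalSkipMessages` is small enough to isolate as a hypothetical helper. The sketch below does that so each combination of inputs can be checked directly. `SkipMode` and `SkipRouting` are illustrative names, and treating a missing body (`null`) as an empty map is an assumption of this sketch.

```java
import java.util.Map;

// Which skip path a request takes.
enum SkipMode { BY_MESSAGE_ID, BY_COUNT }

final class SkipRouting {
    private SkipRouting() {}

    // Message IDs are used only when numMessages is 0 and the map is non-empty;
    // every other request keeps the existing count-based behavior.
    static SkipMode route(int numMessages, Map<String, String> messageIds) {
        boolean hasIds = messageIds != null && !messageIds.isEmpty();
        return (numMessages == 0 && hasIds) ? SkipMode.BY_MESSAGE_ID : SkipMode.BY_COUNT;
    }
}
```

One consequence of the rule as written: a request with `numMessages > 0` and a non-empty ID map takes the count-based path, and the IDs are silently ignored.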
+
+## Public-facing Changes
+The existing `skipMessages` API is modified to accept a POST body containing
message IDs.
+
+### Public API
+The REST endpoint for skipping messages is updated. To skip by message ID, a client sends a `POST` request with `numMessages` set to `0` in the path and a JSON object in the body that maps each `ledgerId` to an `entryId`.
+
+* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
+* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
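
A client-side sketch of building the v2 request follows. It assumes the body is a flat JSON object of strings, `{"<ledgerId>":"<entryId>"}`. That shape is inferred from the broker's `Map<String, String>` parameter rather than taken from a published schema. `SkipByIdRequest` is a hypothetical helper, path segments are not URL-encoded, and no HTTP call is made.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical client-side helper for the skip-by-ID call; it only builds strings.
final class SkipByIdRequest {
    private SkipByIdRequest() {}

    // v2 path with numMessages fixed to 0, which selects skip-by-ID. Segments are not URL-encoded.
    static String v2Path(String tenant, String namespace, String topic, String subName) {
        return "/admin/v2/persistent/" + tenant + "/" + namespace + "/" + topic
                + "/subscription/" + subName + "/skip/0";
    }

    // Serializes {ledgerId -> entryId} as a flat JSON object of strings, e.g. {"123":"4"}.
    static String jsonBody(Map<Long, Long> ledgerToEntry) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<Long, Long> e : ledgerToEntry.entrySet()) {
            json.add("\"" + e.getKey() + "\":\"" + e.getValue() + "\"");
        }
        return json.toString();
    }
}
```

Because the map is keyed by `ledgerId`, a single request as specified can name at most one entry per ledger. Skipping two messages from the same ledger takes two requests.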
Review Comment:
@codelipenghui I have already made this change. Are there any other areas that
need to be adjusted?