lhotari commented on code in PR #24370:
URL: https://github.com/apache/pulsar/pull/24370#discussion_r2425455389
##########
pip/pip-423.md:
##########
@@ -0,0 +1,254 @@
+# PIP-423: Add a new admin API to acknowledge a single message
+
+# Background knowledge
+
+* **Message Identification (MessageId):** In Pulsar, every message is
uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an
`entryId`, optionally a partition index, and, for messages sent in a batch, a
`batchIndex`. The combination of `ledgerId` and `entryId` (often represented as
a `Position`) identifies the storage entry that holds a message within a topic;
a single entry can contain several batched messages. A producer receives a
`MessageId` for each message it successfully sends.
+* **Subscriptions and Cursors:** Each subscription on a topic maintains its
own "cursor" which tracks consumption progress. For subscription types like
`Shared` or `Key_Shared`, the cursor can track individually acknowledged
messages, even if they are out of order relative to the main consumption
progress marker (`mark-delete position`). The cursor is responsible for
ensuring that acknowledged messages are not redelivered.
+* **Individual Acknowledgement:** Pulsar subscriptions support different
acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on
**individual acknowledgement**. This allows a consumer to acknowledge a single
message. When a message is acknowledged individually ahead of the `mark-delete
position`, the broker's `ManagedCursor` persistently tracks this to ensure that
acknowledged messages are not redelivered after a broker or consumer restart.
This proposal leverages this existing, robust mechanism.
+* **Delayed Messages:** Pulsar supports scheduling messages for future
delivery. A primary use case for this proposal is to allow a scheduled message
to be effectively "cancelled" by acknowledging it before its delivery time.
Since the message is marked as consumed by the cursor, the
`DelayedDeliveryTracker` will not dispatch it.
+* **Existing `skip` API:** Pulsar has an admin API to `skip` a specified
*number* of messages for a subscription. This is useful for bulk-skipping but
cannot target a single, specific message: it skips messages from the start of
the subscription's backlog, so removing one message deep in a large backlog
would also skip every message before it. This proposal provides a more precise
way to skip messages by their specific `MessageId`.
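The following toy, in-memory model makes the relationship between the `mark-delete position` and individually acknowledged messages concrete. It is not Pulsar's `ManagedCursor`. The `ToyCursor` class, its methods, and the simplification of every position to a single entry number within one ledger are all hypothetical. They were chosen only to show why an out-of-order acknowledgement prevents redelivery without moving the mark-delete position.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical toy cursor: positions are plain entry numbers within a single ledger.
 * Everything at or below the mark-delete position is consumed; entries above it can
 * still be individually acknowledged and are tracked in a separate set.
 */
class ToyCursor {
    private long markDelete = -1; // highest entry such that it and all earlier entries are consumed
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    /** Individual ack: remember the entry, then advance markDelete over any now-contiguous run. */
    void ackIndividual(long entry) {
        if (entry <= markDelete) {
            return; // already covered by the mark-delete position
        }
        individuallyAcked.add(entry);
        while (individuallyAcked.remove(markDelete + 1)) {
            markDelete++;
        }
    }

    boolean isAcked(long entry) {
        return entry <= markDelete || individuallyAcked.contains(entry);
    }

    long markDeletePosition() {
        return markDelete;
    }

    /** Entries up to lastEntry that would still be dispatched to consumers. */
    List<Long> pending(long lastEntry) {
        List<Long> out = new ArrayList<>();
        for (long e = markDelete + 1; e <= lastEntry; e++) {
            if (!individuallyAcked.contains(e)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Acknowledging entry 2 first leaves the mark-delete position at -1, but entry 2 drops out of the pending set. Once entries 0 and 1 are acknowledged, the mark-delete position jumps to 2.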
+
+# Motivation
+
+Operators and SREs occasionally need to intervene in a topic's backlog to
handle problematic messages or adapt to changing business requirements. For
instance:
+
+* **Cancelling Scheduled Actions:** A delayed message representing a future
task (e.g., a scheduled report or a notification) may become obsolete. The most
efficient way to handle this is to prevent its delivery entirely by
acknowledging it pre-emptively.
+* **Removing Poison Pill Messages:** A specific message in a backlog might have a
malformed payload that causes consumer applications to crash repeatedly.
Removing this single "poison pill" message without affecting valid messages
around it is a critical operational capability.
+* **Manual Business Logic Correction:** An event may have been sent that is
later determined to be invalid due to external factors. An administrator needs
a precise tool to remove this specific event from a subscription's delivery
queue.
+
+The existing `skip(numMessages)` API is a blunt instrument, ill-suited for
these precise, targeted operations. This proposal introduces an administrative
API to skip messages by their specific `MessageId`, providing a robust and
reliable way to remove any individual message, delayed or not, from a
subscription's backlog.
+
+# Goals
+
+## In Scope
+
+* Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI
command to support skipping specific messages for a subscription.
+* The target message(s) will be identified by their `ledgerId` and `entryId`.
+* The implementation will leverage Pulsar's existing, robust
`AckType.Individual` mechanism for persistence and reliability.
+* This feature will only be supported for subscription types that allow
individual acknowledgements (e.g., `Shared`, `Key_Shared`).
+* Ensure that once a message is successfully skipped via this API, it will
not be delivered to any consumer on the targeted subscription.
+* Support for both partitioned and non-partitioned topics.
+
+## Out of Scope
+
+* Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated
endpoint will be created for clarity.
+* Automatic skipping of messages across geo-replicated clusters. The command
is a per-cluster administrative operation that must be run on each cluster
where the skip is needed.
+
+# High Level Design
+
+The proposed solution introduces a new admin API that triggers Pulsar's
individual acknowledgement capability on demand for specific messages.
+
+1. **Initiate Skip-by-ID:** An administrator initiates the action via the new
`pulsar-admin topics skip-messages` command or its corresponding REST API call.
The request must specify the topic, the target subscription, and a list of
message IDs (`ledgerId:entryId`) to be skipped.
+
+2. **Broker Receives Request:** The Pulsar broker receives the admin request
for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It
validates the administrator's permissions for the topic, re-using the existing
`TopicOperation.SKIP` authorization rule.
+
+3. **Delegate to Subscription:** The broker, after validating topic ownership
and permissions, invokes a new method `skipMessages(Map<String, String> messageIds)`
on the target `PersistentSubscription` object. For partitioned topics, the
request is scattered to all partitions, and the broker that owns each partition
performs this action.
+
+4. **Perform Individual Acknowledgement:** Inside the
`PersistentSubscription`, the following occurs:
+ * It verifies that the subscription's type supports individual
acknowledgements.
+ * It constructs `Position` objects from the provided ledger and entry
IDs.
+ * It calls its internal `acknowledgeMessage()` method with
`AckType.Individual` for the specified positions. This is functionally
identical to what would happen if a consumer individually acknowledged the
message.
+
+5. **Persistence and Effect:** The `ManagedCursor` for the subscription
records these individual acknowledgements and persists them as part of the
cursor's durable state.
+ * For a **regular message** in the backlog, it is marked as consumed for
that subscription and will not be delivered.
+ * For a **delayed message**, it is marked as consumed before its scheduled
delivery time, so the `DelayedDeliveryTracker` will not dispatch it when that
time arrives. The message is thus effectively **cancelled**.
+
+This design is simple and robust as it builds upon the broker's proven message
acknowledgement foundation while providing a clean, dedicated administrative
API for this precise operational task.
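The cancellation effect in step 5 depends on one behaviour this proposal relies on: when a delayed message becomes due, it is dispatched only if the subscription has not already acknowledged it. The toy model below sketches that check. `ToyDelayedTracker` and `dispatchDue` are hypothetical and deliberately much simpler than Pulsar's `DelayedDeliveryTracker`, whose internals may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical toy tracker: holds delayed entries until their delivery time. */
class ToyDelayedTracker {
    // delivery time in ms -> entry ids due at that time
    private final TreeMap<Long, List<Long>> byDeliveryTime = new TreeMap<>();

    void schedule(long entryId, long deliverAtMillis) {
        byDeliveryTime.computeIfAbsent(deliverAtMillis, k -> new ArrayList<>()).add(entryId);
    }

    /**
     * Removes every entry due at or before nowMillis and returns the ones to dispatch,
     * skipping any entry the subscription has already acknowledged (for example via an admin skip).
     */
    List<Long> dispatchDue(long nowMillis, Set<Long> ackedEntries) {
        List<Long> toDispatch = new ArrayList<>();
        NavigableMap<Long, List<Long>> due = byDeliveryTime.headMap(nowMillis, true);
        for (List<Long> entries : due.values()) {
            for (long entryId : entries) {
                if (!ackedEntries.contains(entryId)) {
                    toDispatch.add(entryId);
                }
            }
        }
        due.clear(); // clearing the view also removes these entries from the backing map
        return toDispatch;
    }
}
```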
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The core of the implementation involves adding a new method to the
`Subscription` interface and implementing it in `PersistentSubscription` to
leverage the existing individual acknowledgement mechanism.
+
+1. **Subscription Interface Extension:**
+ The `Subscription` interface is extended with a new method to handle
skipping by message IDs.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+public interface Subscription extends MessageExpirer {
+ // ... existing methods
+ CompletableFuture<Void> skipMessages(int numMessagesToSkip);
+
+ CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
+ // ... existing methods
+}
+```
+
+2. **PersistentSubscription Implementation:**
+
+The `PersistentSubscription` class provides the concrete implementation. It
converts the message ID map into `Position` objects and uses the standard
individual acknowledge flow.
+
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+@Override
+public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
+    if (log.isDebugEnabled()) {
+        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
+                cursor.getNumberOfEntriesInBacklog(false));
+    }
+
+    // Reject subscription types in cumulative-ack mode (see Subscription.isCumulativeAckMode)
+    if (Subscription.isCumulativeAckMode(getType())) {
+        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
+    }
+
+    // Convert each ledgerId -> entryId pair into a Position; any malformed ID fails the whole request
+    List<Position> positions = new ArrayList<>();
+    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
+        try {
+            long ledgerId = Long.parseLong(entry.getKey());
+            long entryId = Long.parseLong(entry.getValue());
+            positions.add(PositionFactory.create(ledgerId, entryId));
+        } catch (NumberFormatException e) {
+            return CompletableFuture.failedFuture(
+                    new NotAllowedException("Invalid message ID: " + entry.getKey() + ":" + entry.getValue()));
+        }
+    }
+
+    // Same individual-acknowledgement path a consumer ack would take
+    Map<String, Long> properties = Collections.emptyMap();
+    acknowledgeMessage(positions, AckType.Individual, properties);
+
+    return CompletableFuture.completedFuture(null);
+}
+```
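Below is a stdlib-only sketch of the same conversion from the `ledgerId -> entryId` map to positions. It makes one property of the `Map<String, String>` input visible: because the keys are ledger IDs, one request can name at most one entry per ledger, and a second entry for the same ledger overwrites the first. `ToyPosition` and `MessageIdMapParser` are hypothetical stand-ins for `Position`, `PositionFactory.create` and the loop above. The sorting step is an illustrative addition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a broker Position: (ledgerId, entryId). */
record ToyPosition(long ledgerId, long entryId) {}

class MessageIdMapParser {
    /** Converts {"ledgerId": "entryId", ...} into positions, rejecting the whole request on bad input. */
    static List<ToyPosition> parse(Map<String, String> messageIds) {
        List<ToyPosition> positions = new ArrayList<>();
        for (Map.Entry<String, String> e : messageIds.entrySet()) {
            try {
                long ledgerId = Long.parseLong(e.getKey());
                long entryId = Long.parseLong(e.getValue());
                positions.add(new ToyPosition(ledgerId, entryId));
            } catch (NumberFormatException ex) {
                throw new IllegalArgumentException("Invalid message ID: " + e.getKey() + ":" + e.getValue(), ex);
            }
        }
        // Sort for a deterministic order; HashMap iteration order is unspecified
        positions.sort(Comparator.comparingLong(ToyPosition::ledgerId).thenComparingLong(ToyPosition::entryId));
        return positions;
    }
}
```

Putting `"12345" -> "100"` and then `"12345" -> "101"` into the same map leaves only `12345:101`. A list-based format would be needed to skip two entries from the same ledger in one call.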
+
+3. **Admin API Logic:**
+
+The `PersistentTopicsBase` class is updated with a new
`internalSkipByMessageIds` method which handles the request and calls the
`subscription.skipMessages(Map)` method. It also includes logic to handle
partitioned topics by fanning out the request to each partition owner.
+```java
+// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative,
+                                        Map<String, String> messageIds) {
+    // ... validate operation and ownership ...
+
+    // Logic for a single partition / non-partitioned topic.
+    // It runs inside a future-returning callback, which is why it returns
+    // CompletableFuture values while the enclosing method itself returns void.
+    // ...
+    Subscription sub = topic.getSubscription(subName);
+    if (sub == null) {
+        return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                getSubNotFoundErrorMessage(topic.getName(), subName)));
+    }
+    return sub.skipMessages(messageIds);
+    // ...
+}
+```
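For partitioned topics, the fan-out could combine one future per partition with `CompletableFuture.allOf`. The admin call then completes only when every partition has finished, and it fails if any partition fails. The sketch is hypothetical: `PartitionFanOut` and the `skipOnPartition` callback stand in for the broker's per-partition admin request, and the sketch relies on Pulsar's `<topic>-partition-<i>` naming for partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

class PartitionFanOut {
    /**
     * Sends the same skip request to every partition and completes when all partitions finish.
     * If any partition fails, the returned future completes exceptionally.
     * skipOnPartition is a hypothetical callback standing in for the per-partition admin call.
     */
    static CompletableFuture<Void> skipOnAllPartitions(
            String topic, int numPartitions, Map<String, String> messageIds,
            BiFunction<String, Map<String, String>, CompletableFuture<Void>> skipOnPartition) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            // Partition i of a partitioned topic is named "<topic>-partition-<i>"
            futures.add(skipOnPartition.apply(topic + "-partition-" + i, messageIds));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }
}
```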
+
+## Public-facing Changes
+A new `skipByMessageIds` admin endpoint accepts a POST body containing
message IDs; the existing `skip/{numMessages}` endpoint is unchanged.
+
+### Public API
+
+A new REST endpoint is added for skipping specific messages.
+
+* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds`
+* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds`
+* **Path Parameters:**
+ * All path parameters identify the target subscription.
+* **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a
string and values are `entryId` as a string.
+ * **Example Body:** `{"12345": "100", "12346": "200"}`
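The following client-side sketch calls the proposed v2 endpoint with the JDK's `java.net.http` API, using the map body format shown above. The web service URL, tenant, namespace, topic, and subscription names in the usage are hypothetical. The sketch only builds the request; sending it and adding authentication headers are left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SkipByMessageIdsRequest {
    /** Builds {"ledgerId": "entryId", ...}; IDs are numeric, so no JSON escaping is needed. */
    static String body(Map<Long, Long> ledgerToEntry) {
        return new TreeMap<>(ledgerToEntry).entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    /** Builds (but does not send) a POST to the proposed skipByMessageIds endpoint. */
    static HttpRequest build(String webServiceUrl, String tenant, String namespace, String topic,
                             String subName, Map<Long, Long> ledgerToEntry) {
        URI uri = URI.create(webServiceUrl + "/admin/v2/persistent/" + tenant + "/" + namespace + "/"
                + topic + "/subscription/" + subName + "/skipByMessageIds");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(ledgerToEntry)))
                .build();
    }
}
```

For example, `build("http://localhost:8080", "public", "default", "orders", "my-sub", Map.of(12345L, 100L, 12346L, 200L))` produces a POST to `/admin/v2/persistent/public/default/orders/subscription/my-sub/skipByMessageIds` with the body `{"12345": "100", "12346": "200"}`.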
Review Comment:
The serialization format should be changed. The problem with this format is
that it cannot be extended to cover batch messages.
Besides ledgerId and entryId, there's also batchIndex, which is part of the
message id.
One of the design principles has been to hide the implementation details of
the message id in Pulsar and not leak it. That's why there are
`org.apache.pulsar.client.api.MessageId#toByteArray` and
`org.apache.pulsar.client.api.MessageId#fromByteArray`.
It would be useful to support multiple formats for the message id. For
consistency with the MessageId.toByteArray() API, there should be support for
accepting a list of base64 encoded byte array representations of the MessageId.
One way to solve this is to have the body map to an object that is mapped
with Jackson using polymorphism configured with `@JsonTypeInfo` and
`@JsonSubTypes` annotations. That would allow flexibility for adding various
formats for the message id list.
The default format could be the byte arrays encoded as base64:
```json
{"type": "byteArray", "messageIds": ["CLlgEAQwAA==", "CLlgEAYwAA==",
"CLlgEJ4BMAA=", "CLlgEKQCMAA=", "CLlgEPQCMAA=", "CLlgECcwAA==", "CLlgEIQDMAA=",
"CLlgEKEDMAA=", "CLlgEAQwAA==", "CLlgEI4EMAA="]}
```
Since this could be made the default, the `"type": "byteArray"` field could be
omitted:
```json
{"messageIds": ["CLlgEAQwAA==", "CLlgEAYwAA==", "CLlgEJ4BMAA=",
"CLlgEKQCMAA=", "CLlgEPQCMAA=", "CLlgECcwAA==", "CLlgEIQDMAA=", "CLlgEKEDMAA=",
"CLlgEAQwAA==", "CLlgEI4EMAA="]}
```
Another format could be the ledgerId:entryId pairs (without support for
batchIndex), if this is absolutely necessary:
```json
{"type": "map_of_ledgerId_entryId", "messageIds": {"12345": 100, "12346": 200}}
```
Instead of that, it could be useful to have a format that maps directly to the message ID fields:
```json
{"type": "messageId", "messageIds": [{"ledgerId": 12345, "entryId": 100},
{"ledgerId": 12346, "entryId": 200}]}
```
This format could also support the batch index:
```json
{"type": "messageId", "messageIds": [{"ledgerId": 12345, "entryId": 100,
"batchIndex": 5}, {"ledgerId": 12346, "entryId": 200, "batchIndex": 10}]}
```
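Below is a stdlib-only decoder for strings like the examples above, showing what the proposed base64 `byteArray` format carries. It assumes each string is a protobuf-encoded `MessageIdData`, as produced by `MessageId.toByteArray()`. It also assumes the field numbers in Pulsar's `PulsarApi.proto`: 1 (`ledgerId`), 2 (`entryId`), 3 (`partition`) and 4 (`batch_index`). Other fields are skipped. `MessageIdBytes` and `DecodedMessageId` are hypothetical; a broker implementation would use `MessageId.fromByteArray` instead of hand-decoding.

```java
import java.util.Base64;

/** Hypothetical decoded view of a MessageId; -1 means the field is absent. */
record DecodedMessageId(long ledgerId, long entryId, int partition, int batchIndex) {}

class MessageIdBytes {
    static DecodedMessageId decode(String base64) {
        byte[] buf = Base64.getDecoder().decode(base64);
        long ledgerId = -1, entryId = -1;
        int partition = -1, batchIndex = -1;
        int[] pos = {0};
        while (pos[0] < buf.length) {
            long key = readVarint(buf, pos);
            int field = (int) (key >>> 3);
            int wireType = (int) (key & 7);
            if (wireType == 0) { // varint-encoded scalar
                long value = readVarint(buf, pos);
                switch (field) {
                    case 1 -> ledgerId = value;
                    case 2 -> entryId = value;
                    case 3 -> partition = (int) value;  // int32: negatives are sign-extended to 64 bits
                    case 4 -> batchIndex = (int) value;
                    default -> { }                      // other fields (such as batch_size) are not needed here
                }
            } else if (wireType == 2) { // length-delimited, e.g. a nested message: skip it
                long len = readVarint(buf, pos);
                pos[0] += (int) len;
            } else {
                throw new IllegalArgumentException("Unsupported wire type " + wireType);
            }
        }
        return new DecodedMessageId(ledgerId, entryId, partition, batchIndex);
    }

    /** Reads one base-128 varint starting at pos[0] and advances pos[0] past it. */
    private static long readVarint(byte[] buf, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }
}
```

Decoding `CLlgEAQwAA==` this way gives ledgerId 12345 and entryId 4, and `CLlgEJ4BMAA=` gives 12345 and 158. The byte-array format therefore carries the same ledger and entry information as the map format, while leaving room for `batch_index`.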
--
This is an automated message from the Apache Git Service.