Denovo1998 opened a new issue, #24600:
URL: https://github.com/apache/pulsar/issues/24600

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   ## Future
   
   **I have been researching Pulsar's delayed message module in depth and would like to discuss a few ideas with the community.**
   
   ### Flexible control of delayed messages
   
   Before this, I researched requirements both in the community and from offline sources. I found that the delayed message feature is very important to businesses, and business requirements call for flexible control over delayed messages. We know that in a messaging system a message is immutable once it is sent, but for delayed messages the delay itself is actually variable.
   
   Currently, Pulsar's delayed message module does not offer this flexibility, and to my knowledge other messaging systems do not have similar functionality either. However, it is actually achievable.
   
   ---
   
   1. Initially, I explored a feature for canceling delayed messages.
   
   
https://github.com/apache/pulsar/blob/f66f1b5439f0a9809e84a1720855468ccfedc24c/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
   
   Currently, we can indirectly implement cancellation of delayed messages by acknowledging the messages.
   See PR: https://github.com/apache/pulsar/pull/24370
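
   As a client-side illustration of what that looks like (a sketch assuming the behavior proposed in that PR; topic and subscription names are examples, and delayed delivery requires a Shared subscription):
   
   ```java
   import java.util.concurrent.TimeUnit;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   // Sketch of cancellation-by-acknowledgement from the client side,
   // assuming the behavior proposed in https://github.com/apache/pulsar/pull/24370.
   public class CancelDelayedMessageExample {
       public static void main(String[] args) throws Exception {
           try (PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
                Producer<String> producer = client.newProducer(Schema.STRING)
                        .topic("persistent://public/default/delayed-demo")
                        .create();
                Consumer<String> consumer = client.newConsumer(Schema.STRING)
                        .topic("persistent://public/default/delayed-demo")
                        .subscriptionName("sub")
                        .subscriptionType(SubscriptionType.Shared)
                        .subscribe()) {
   
               // Publish a message to be delivered in one hour.
               MessageId id = producer.newMessage()
                       .value("order-timeout-check")
                       .deliverAfter(1, TimeUnit.HOURS)
                       .send();
   
               // Later, the business decides the delayed message is no longer
               // needed: acknowledging its MessageId cancels the pending delivery.
               consumer.acknowledge(id);
           }
       }
   }
   ```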
   
   2. Then @thetumbled proposed that businesses need to implement delayed messages from the consumer side, although that brings issues such as write amplification.
   
   I also tried to implement this; see the PR for details: https://github.com/apache/pulsar/pull/24372
   
   These two PRs are implemented in essentially the same way: we only need the ledgerId, entryId, and delivery timestamp to operate on a delayed message. However, they share a common problem.
   
   > If a delayed message in the LastMutableBucket has not been flushed to the bookie and a failure occurs, the data in the LastMutableBucket is lost. However, this has no impact: after a restart, messages are re-read from the mark-delete position onward and the bucket is rebuilt. This is why data in a bucket can be deleted as soon as it is read (without requiring a client ack).
   > 
   > If we send a command from the consumer side to add or cancel a delayed message, and it fails to be persisted (sealBucketAndAsyncPersistent) in the LastMutableBucket before the broker crashes, the command is lost. We cannot wait until the seal-bucket condition is triggered before reporting that the cancellation command succeeded, because we do not know how long that would take.
   
   ---
   
   **So we need to solve the problem of lost commands.**
   
   - We need an operation command type for delayed messages (a minimal sketch follows this list):
   
     - `CANCEL` (cancel a delayed message)
     - `DELAY` (delay a message from the consumer side)
     - `LOOP` (loop / timed delay)
     - `LOOP_EXPONENT` (loop with exponential delay)
   
   - We need to store these commands together with the ledgerId, entryId, and timestamp of the messages they operate on.
   
   - Most importantly, we do not need to keep all of these commands in memory. As with delayed messages themselves, commands can be loaded into memory lazily, for example one to two tick-time windows earlier than the original message they operate on. Then, when delivery of a message is triggered and it turns out the message needs to be canceled or cyclically delayed, the corresponding processing can be done.
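
   As a rough illustration of the first two points, the command could be as small as the following (a hypothetical sketch only; these types and names do not exist in Pulsar today):
   
   ```java
   // Hypothetical sketch of a delayed-message operation command; names are
   // illustrative, not existing Pulsar APIs. A command only needs to identify
   // the target entry and carry the (new) delivery timestamp.
   public record DelayedMessageOpCommand(
           Type type,
           long ledgerId,
           long entryId,
           long deliverAtMillis) {
   
       public enum Type {
           CANCEL,        // cancel a delayed message
           DELAY,         // delay a message from the consumer side
           LOOP,          // loop / timed delay
           LOOP_EXPONENT  // loop with exponential delay
       }
   }
   ```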
   
   **Consider using a dedicated topic to store these commands. The commands themselves would be sent with a delay but would not trigger the existing delayed delivery feature. This mechanism is similar to a configurable dead-letter queue.**
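
   If we went that route, publishing a command could reuse the ordinary producer API; everything else (the topic name, the payload encoding, and how/when the broker loads the command back into memory) is an open design question. A minimal sketch under those assumptions:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   
   // Sketch only: publish an operation command to a hypothetical dedicated topic.
   // "__delayed_op_commands" and the string payload format are illustrative.
   public class DelayedOpCommandPublisher {
       public static void main(String[] args) throws Exception {
           try (PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
                Producer<byte[]> producer = client.newProducer()
                        .topic("persistent://public/default/__delayed_op_commands")
                        .create()) {
   
               // e.g. a CANCEL command for the entry at (ledgerId=1, entryId=2).
               producer.send("CANCEL,1,2".getBytes(StandardCharsets.UTF_8));
           }
       }
   }
   ```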
   
   **_This should be the last enhancement considered._**
   
   ---
   
   ### One topic corresponds to one `BucketDelayedDeliveryTracker` instance
   
   We know that each `BucketDelayedDeliveryTracker` currently corresponds to one subscription. If a topic has a large number of subscriptions, every tracker loads the same information (including ledger ID, entry ID, and timestamp), which consumes a lot of memory.
   
   If we also implement the functionality described above, the number of topics storing delayed-message operation commands would become very large.
   
   So we need to turn `BucketDelayedDeliveryTracker` into a topic-level component. Inside the tracker, we would track and manage the "delay position" of each subscription.
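
   A rough sketch of the direction (hypothetical class and field names, not an existing API): the delayed index is kept once per topic, and each subscription only keeps a lightweight "delay position" into it.
   
   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.Set;
   import java.util.TreeMap;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical sketch of a topic-level delayed delivery tracker:
   // one shared index per topic instead of one full copy per subscription.
   public class TopicLevelDelayedDeliveryTracker {
   
       record EntryKey(long ledgerId, long entryId) {}
   
       // Shared across all subscriptions of the topic:
       // deliverAt timestamp -> entries (ledgerId, entryId) due at that time.
       private final NavigableMap<Long, Set<EntryKey>> sharedDelayedIndex = new TreeMap<>();
   
       // Per-subscription "delay position": the delivery timestamp up to which
       // each subscription has already dispatched delayed messages.
       private final Map<String, Long> delayPositionPerSubscription = new ConcurrentHashMap<>();
   
       // A subscription asks for the messages that became due since its last position.
       public synchronized NavigableMap<Long, Set<EntryKey>> dueMessages(String subscription, long now) {
           long from = delayPositionPerSubscription.getOrDefault(subscription, 0L);
           NavigableMap<Long, Set<EntryKey>> due =
                   new TreeMap<>(sharedDelayedIndex.subMap(from, false, now, true));
           delayPositionPerSubscription.put(subscription, now);
           return due;
       }
   }
   ```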
   
   **_This should be the second improvement to prioritize._**
   
   ---
   
   ## Bug
   
   Currently, `BucketDelayedDeliveryTracker` has thread safety issues that need 
to be resolved.
   
   https://github.com/apache/pulsar/issues/23190
   https://github.com/apache/pulsar/pull/24542
   
   More importantly, finer-grained concurrency control may be needed in `BucketDelayedDeliveryTracker`, for example something like `StampedLock` or `ReentrantReadWriteLock`.
   
   For example, `getScheduledMessages` looks like it should be a read operation, but it is actually a composite operation (read + write + consume), which leads to an inconsistent concurrency-control strategy and performance issues. Addressing this requires rethinking the design.
   
   The coarse-grained `synchronized` currently used can lead to:
   
   - Read operations being blocked: `containsMessage()` and `nextDeliveryTime()` cannot run concurrently
   - Write operations being blocked: `addMessage()` is blocked by consumption operations
   
   ```java
   public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages) {
       // It looks like a read operation, but it actually does the following:
   
       // 1. Modify the queue state
       sharedBucketPriorityQueue.pop();  // Remove elements from the queue
   
       // 2. Modify the index bitmap
       removeIndexBit(ledgerId, entryId);  // Clear the bitmap
   
       // 3. Modify the counter
       numberDelayedMessages.decrementAndGet();  // Reduce the message count
   
       // 4. Modify bucket state
       snapshotSegmentLastIndexMap.remove(snapshotKey);  // Remove the snapshot mapping
   
       // 5. Trigger asynchronous loading and a state change
       bucket.asyncLoadNextBucketSnapshotEntry()...
   }
   ```
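
   As a rough sketch of the finer-grained direction (not the existing tracker code, and the real change would also need to restructure `getScheduledMessages` itself), a `StampedLock` lets pure reads run concurrently while the composite consume path takes the exclusive lock:
   
   ```java
   import java.util.concurrent.locks.StampedLock;
   
   // Hypothetical sketch of finer-grained locking for the tracker.
   public class DelayedTrackerLockingSketch {
   
       private final StampedLock lock = new StampedLock();
   
       public boolean containsMessage(long ledgerId, long entryId) {
           long stamp = lock.tryOptimisticRead();       // lock-free fast path for reads
           boolean result = lookupIndex(ledgerId, entryId);
           if (!lock.validate(stamp)) {                 // a write raced us: retry under a read lock
               stamp = lock.readLock();
               try {
                   result = lookupIndex(ledgerId, entryId);
               } finally {
                   lock.unlockRead(stamp);
               }
           }
           return result;
       }
   
       public void getScheduledMessages(int maxMessages) {
           long stamp = lock.writeLock();               // composite read + write + consume
           try {
               // pop from the shared priority queue, clear index bits,
               // update counters, trigger the next snapshot load ...
           } finally {
               lock.unlockWrite(stamp);
           }
       }
   
       private boolean lookupIndex(long ledgerId, long entryId) {
           return false;  // placeholder for the real delayed-index lookup
       }
   }
   ```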
   
   **_This should be the highest priority._**
   
   ### Solution
   
   _No response_
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

