Denovo1998 opened a new issue, #24600:
URL: https://github.com/apache/pulsar/issues/24600

### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Motivation

## Future

**I am currently doing a deep dive into Pulsar's delayed message module and would like to discuss a few things with everyone.**

### Flexible control of delayed messages

Before writing this, I surveyed both the community and offline sources. Delayed messages turn out to be very important to businesses, and business requirements call for flexible control over them.

In a messaging system, a message is immutable once it has been sent. For a delayed message, however, the delay itself can be treated as mutable. Pulsar's delayed message module does not currently support changing it, and to my knowledge no other messaging system offers similar functionality. It is achievable, though.

---

1. Initially, I tried to implement cancellation of delayed messages.
   https://github.com/apache/pulsar/blob/f66f1b5439f0a9809e84a1720855468ccfedc24c/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
   Currently, we can cancel a delayed message indirectly by acknowledging it. See PR: https://github.com/apache/pulsar/pull/24370
2. Then @thetumbled pointed out that businesses need to delay messages on the consumer side, but there will be issues such as write amplification. I tried to implement this as well; see the PR for details: https://github.com/apache/pulsar/pull/24372

Both PRs are implemented in the same way: we only need the ledgerId, entryId, and delayed timestamp to operate on a delayed message. However, they share a common problem.

> If a delayed message in the LastMutableBucket has not been flushed to the Bookie and a failure occurs, the data in the LastMutableBucket is lost.
> However, this has no impact: after a restart, messages are read again from the MarkDelete position onwards and the Bucket is rebuilt. This is why data in the Bucket can be deleted as soon as it is read (without requiring a client Ack).
>
> If we send a command from the consumer side to add or cancel a delayed message, and the Broker crashes before the command in the LastMutableBucket has been persisted (sealBucketAndAsyncPersistent), the command is lost. We cannot wait until the Seal Bucket condition is triggered before reporting that the command succeeded, because we do not know how long that will take.

---

**So we need to solve the problem of lost commands.**

- We need an operation command type for delayed messages:
  - CANCEL (delayed message cancellation)
  - DELAY (delay message on the consumer side)
  - LOOP (loop/timed delay)
  - LOOP_EXPONENT (loop with exponential delay)
- We need to store these commands together with the ledgerId, entryId, and timestamp of the messages they operate on.
- Most importantly, we don't need to keep all commands in memory. As with delayed messages, commands can be loaded into memory lazily, for example one to two tick-time windows before the message they target. Then, when a message is fetched and found to need cancelling or re-delaying, the corresponding processing can be done.

**Consider using a dedicated topic to store these commands. The commands are themselves sent with a delay, but they do not trigger the current feature. This mechanism is similar to a configurable dead-letter queue.**

**_This should be the last enhancement considered._**

---

### One topic corresponds to one BucketDelayedDeliveryTracker instance.

We know that each BucketDelayedDeliveryTracker corresponds to a subscription.
If the topic has a large number of subscriptions, each tracker loads the same information (ledger ID, entry ID, and timestamp), which consumes a lot of memory. If we implement the future functionality described above, the number of topics storing delayed message operation commands will become very large.

So we need to make BucketDelayedDeliveryTracker topic-level. Within the tracker, we track and manage the "delay position" of each subscription.

**_This should be the second improvement to prioritize._**

---

## Bug

Currently, `BucketDelayedDeliveryTracker` has thread safety issues that need to be resolved.

https://github.com/apache/pulsar/issues/23190
https://github.com/apache/pulsar/pull/24542

Finer-grained concurrency control may also be needed in `BucketDelayedDeliveryTracker`, for example using `StampedLock` or `ReentrantReadWriteLock`.

For example, getScheduledMessages looks like a read operation, but it is actually a composite operation (read + write + consume), which leads to inconsistent concurrency control strategies and performance issues. Fixing this requires redesigning the architecture.

The coarse-grained synchronized currently in use may lead to:
- Read operations being blocked: containsMessage() and nextDeliveryTime() cannot be executed concurrently
- Write operations being blocked: addMessage() is blocked by consumption operations

```java
public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages) {
    // It looks like a read operation, but it actually does the following:
    // 1. Modify queue status
    sharedBucketPriorityQueue.pop(); // Remove elements from the queue
    // 2. Modify index bitmap
    removeIndexBit(ledgerId, entryId); // Clear bitmap
    // 3. Modify counter
    numberDelayedMessages.decrementAndGet(); // Reduce message count
    // 4. Modify bucket status
    snapshotSegmentLastIndexMap.remove(snapshotKey); // Remove snapshot mapping
    // 5. Trigger asynchronous loading and state change
    bucket.asyncLoadNextBucketSnapshotEntry()...
}
```

**_This should be the highest priority._**

### Solution

_No response_

### Alternatives

_No response_

### Anything else?

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
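
To make the locking concern from the Bug section concrete, here is a minimal sketch of how reads and writes could be separated with `ReentrantReadWriteLock`. Everything in it is an assumption for illustration: the class `RwLockedDelayTracker`, its `Entry` type, and the explicit `now` parameter are hypothetical and are not Pulsar's actual API. The real tracker also deals with buckets, snapshots, and bitmaps, which are omitted here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified tracker used only to illustrate the locking split.
// It is not Pulsar's BucketDelayedDeliveryTracker and ignores buckets and snapshots.
class RwLockedDelayTracker {
    // One delayed message: its position plus the time at which it becomes deliverable.
    static final class Entry {
        final long ledgerId;
        final long entryId;
        final long deliverAt;

        Entry(long ledgerId, long entryId, long deliverAt) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.deliverAt = deliverAt;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Entries ordered by delivery time, earliest first.
    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt, b.deliverAt));
    // Membership index so containsMessage() does not scan the queue.
    private final Set<String> index = new HashSet<>();

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    // Write path: mutates the queue and the index, so it takes the write lock.
    boolean addMessage(long ledgerId, long entryId, long deliverAt) {
        lock.writeLock().lock();
        try {
            if (!index.add(key(ledgerId, entryId))) {
                return false; // already tracked
            }
            queue.add(new Entry(ledgerId, entryId, deliverAt));
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Pure read: may run concurrently with other readers.
    boolean containsMessage(long ledgerId, long entryId) {
        lock.readLock().lock();
        try {
            return index.contains(key(ledgerId, entryId));
        } finally {
            lock.readLock().unlock();
        }
    }

    // Pure read: earliest delivery time, or -1 when nothing is tracked.
    long nextDeliveryTime() {
        lock.readLock().lock();
        try {
            return queue.isEmpty() ? -1L : queue.peek().deliverAt;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Despite the "get" name, this removes entries, so it is treated as a write.
    List<Entry> getScheduledMessages(int maxMessages, long now) {
        lock.writeLock().lock();
        try {
            List<Entry> due = new ArrayList<>();
            while (due.size() < maxMessages && !queue.isEmpty()
                    && queue.peek().deliverAt <= now) {
                Entry e = queue.poll();
                index.remove(key(e.ledgerId, e.entryId));
                due.add(e);
            }
            return due;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this split, `containsMessage()` and `nextDeliveryTime()` no longer block each other, while `addMessage()` and `getScheduledMessages()` still serialize. Whether `StampedLock` with optimistic reads would be a better fit depends on the read/write ratio, which this sketch does not measure.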

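
The command record proposed under "Flexible control of delayed messages" could be modeled roughly as follows. This is a sketch under stated assumptions: the four command names and the stored fields (ledgerId, entryId, timestamp) come from the issue, while the `DelayCommand` class, the 25-byte encoding, and the `loadAt` helper are hypothetical and exist only to show what storing a command in a dedicated topic and loading it a few tick-time windows early might look like.

```java
import java.nio.ByteBuffer;

// Command types listed in the issue.
enum DelayCommandType { CANCEL, DELAY, LOOP, LOOP_EXPONENT }

// Hypothetical command record; the byte layout below is an illustration, not a Pulsar format.
final class DelayCommand {
    // 1 byte for the type, then ledgerId, entryId, and timestamp as 8-byte longs.
    static final int ENCODED_SIZE = 1 + 3 * Long.BYTES;

    final DelayCommandType type;
    final long ledgerId;
    final long entryId;
    final long timestamp; // timestamp associated with the targeted delayed message

    DelayCommand(DelayCommandType type, long ledgerId, long entryId, long timestamp) {
        this.type = type;
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.timestamp = timestamp;
    }

    // Serialize to a fixed-size payload that could be published to a command topic.
    byte[] encode() {
        return ByteBuffer.allocate(ENCODED_SIZE)
                .put((byte) type.ordinal())
                .putLong(ledgerId)
                .putLong(entryId)
                .putLong(timestamp)
                .array();
    }

    // Rebuild a command from a payload produced by encode().
    static DelayCommand decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        DelayCommandType type = DelayCommandType.values()[buf.get()];
        return new DelayCommand(type, buf.getLong(), buf.getLong(), buf.getLong());
    }

    // Time at which the command should be loaded into memory:
    // `windows` tick-time windows before the targeted message's timestamp.
    long loadAt(long tickTimeMillis, int windows) {
        return timestamp - windows * tickTimeMillis;
    }
}
```

For example, a `CANCEL` command targeting a message due at `10_000` with a tick time of `1_000` ms and two windows would be loaded at `8_000`, so it is in memory before the message it cancels is fetched.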