This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5c6602cbb36 [improve][pip] PIP-359: Support custom message listener executor for specific subscription (#22902)

5c6602cbb36 is described below

commit 5c6602cbb3660a696bf960f2847aac1a2ae037d2
Author: Aurora Twinkle <foreverlove...@gmail.com>
AuthorDate: Tue Jul 16 10:47:55 2024 +0800

[improve][pip] PIP-359: Support custom message listener executor for specific subscription (#22902)

Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com>

---
pip/pip-359.md | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 216 insertions(+)

diff --git a/pip/pip-359.md b/pip/pip-359.md
new file mode 100644
index 00000000000..52a76193d6c
--- /dev/null
+++ b/pip/pip-359.md
@@ -0,0 +1,216 @@

# PIP-359: Support custom message listener executor for specific subscription
Implementation PR: [#22861](https://github.com/apache/pulsar/pull/22861)

# Background knowledge
In the current Pulsar client versions, from the user's perspective, there are two main ways
to consume messages with a Pulsar `Consumer`:
1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`):
```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();
        while (true) {
            // Blocks until a message is available, then acknowledges it
            Message<Long> message = consumer.receive();
            consumer.acknowledge(message);
        }
    }
}
```
2. Push mode, by registering a `MessageListener` when building the `Consumer`.
In this mode, `consumer.receive()` (or `consumer.receiveAsync()`) cannot also be used.

In push mode, the consumer invokes the `MessageListener` itself, using a thread taken from the
client's internal `ExecutorService` (i.e., thread pool).
The problem arises when multiple consumers are built from the same `PulsarClient`: they all share
this thread pool to invoke their message listeners, so a slow listener on one consumer can delay
message processing for the others.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class ConsumerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription2")
                // The listener runs on a thread from the client's shared listener pool
                .messageListener((c, message) -> {
                    // process message
                    c.acknowledgeAsync(message);
                })
                .subscribe();
    }
}
```

# Motivation

As described in [Background knowledge](#background-knowledge), when consuming asynchronously by
registering a `MessageListener`, consumers built from the same `PulsarClient` can affect each other,
causing unnecessary consumption delays.
To address this, this PIP proposes allowing each consumer to use its own message listener executor.

# Goals
1. Improve message listener isolation between consumers, eliminating the consumption delays caused
by consumers from the same `PulsarClient` instance interfering with each other.

## In Scope
If this PIP is accepted, it will solve the problem of consumers from the same `PulsarClient`
affecting each other in asynchronous consumption mode (`MessageListener`).

## Out of Scope
This PIP will not build the plugin library mentioned in this [PR comment](https://github.com/apache/pulsar/pull/22902#issuecomment-2169962642);
a separate PIP will be opened for that in the future.


# Detailed Design

## Design & Implementation Details

1. Add an interface `MessageListenerExecutor`, responsible for executing message listener callback tasks.
Users can provide their own implementation to decide which thread each message listener task runs on.
For example, in the situation described in [Motivation](#motivation), users can implement the
interface on top of an independent thread pool, so that each consumer's message listener tasks run
on threads that are not shared with other consumers. The caller is responsible for the lifecycle of
the executor, and the executor is used only by the consumer it is configured on.
   ```java
   public interface MessageListenerExecutor {

       /**
        * Executes the message listener task for the given message.
        * Implementations may select the thread based on the message. For example,
        * with the Key_Shared subscription type, selecting the thread by the
        * message's ordering key preserves per-key ordering.
        *
        * @param message the message
        * @param runnable the runnable to execute
        */
       void execute(Message<?> message, Runnable runnable);
   }
   ```
2. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`, so that
users can pass in their implementation.
   ```java
   ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
   ```

### Why an interface like `MessageListenerExecutor` is needed
One might ask why we define a new interface such as `MessageListenerExecutor` instead of simply
using `java.util.concurrent.ExecutorService`.

The reason is as follows.

For ordered consumption scenarios, messages with the same key, or from the same partition, must be
processed by the same thread to preserve their order. With a plain `java.util.concurrent.ExecutorService`
(see the following figures), this cannot be guaranteed, because an `ExecutorService` does not let
the user control which thread executes a given task.
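
The per-key routing described above can be illustrated without Pulsar at all. The following is a minimal, self-contained sketch that assumes hash-based routing over a fixed set of single-thread executors. `KeyOrderedExecutor`, `laneFor`, and `KeyOrderedExecutorDemo` are hypothetical names used only for illustration and are not part of the Pulsar client API. A real `MessageListenerExecutor` implementation would derive the key bytes from the `Message`, for example from its ordering key or partition topic name, and then route the `Runnable` in the same way.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper, not part of the Pulsar API: a fixed set of single-thread
 * executors ("lanes"). Each task is routed by the hash of its key, so tasks with
 * the same key always run on the same thread, one after another, in submission order.
 */
final class KeyOrderedExecutor implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyOrderedExecutor(int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        for (int i = 0; i < numThreads; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Maps key bytes to a lane index; equal byte contents always give the same index. */
    int laneFor(byte[] key) {
        return Math.floorMod(Arrays.hashCode(key), lanes.size());
    }

    /** Same shape as MessageListenerExecutor#execute, but keyed by raw bytes instead of a Message. */
    void execute(byte[] key, Runnable task) {
        lanes.get(laneFor(key)).execute(task);
    }

    /** Stops accepting tasks and waits (up to 10 s per lane) for queued tasks to finish. */
    @Override
    public void close() {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class KeyOrderedExecutorDemo {
    /**
     * Submits 100 tasks spread over 5 keys and records, per key, the threads that
     * ran its tasks and the sequence numbers in the order they actually ran.
     */
    static void run(Map<String, Set<String>> threadsPerKey, Map<String, List<Integer>> orderPerKey) {
        try (KeyOrderedExecutor executor = new KeyOrderedExecutor(4)) {
            for (int i = 0; i < 100; i++) {
                final int seq = i;
                final String key = "key-" + (i % 5);
                executor.execute(key.getBytes(StandardCharsets.UTF_8), () -> {
                    threadsPerKey.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                            .add(Thread.currentThread().getName());
                    orderPerKey.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<>()))
                            .add(seq);
                });
            }
        } // close() waits for all queued tasks before returning
    }

    public static void main(String[] args) {
        Map<String, Set<String>> threads = new ConcurrentHashMap<>();
        Map<String, List<Integer>> order = new ConcurrentHashMap<>();
        run(threads, order);
        threads.forEach((key, names) ->
                System.out.println(key + " ran on " + names + ", order " + order.get(key)));
    }
}
```

Because each key always maps to the same single-thread lane, tasks for that key run one at a time in submission order. A shared `ExecutorService` makes no such promise. The trade-off is that a slow task blocks every other key hashed to the same lane, so the number of lanes bounds both parallelism and isolation.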

![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/232854d6-01f2-4821-b2df-34d01dda1992)
![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/204f5622-1e5a-4e73-b86b-15220bfb06d6)
### Interface implementation suggestions
When implementing the `MessageListenerExecutor` interface, consider the following points.
1. If you need to guarantee the order of message processing, select the thread by the message's
ordering key, or by `msg.getTopicName()` (the partition topic name), so that messages with the same
ordering key (or from the same partition) are processed on the same thread.

### Usage Example
```java
// The three consumers below show alternative executor setups; in practice, use one per subscription.
private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
    // Example 1: keep per-key order (e.g. for Key_Shared)
    MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
    Consumer<Long> keySharedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // the message listener will be executed by the configured executor
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(keySharedExecutor)
                    .subscribe();

    // Example 2: keep per-partition order
    MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderedMessageListenerExecutor(subscriptionName);
    Consumer<Long> partitionOrderedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // the message listener will be executed by the configured executor
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(partitionOrderedExecutor)
                    .subscribe();

    // Example 3: no ordering guarantee
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Consumer<Long> outOfOrderConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // tasks go to a plain thread pool, so messages may be processed out of order
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
                    .subscribe();
}

private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "-listener-executor-");

    return (message, runnable) -> {
        byte[] key = "".getBytes(StandardCharsets.UTF_8);
        // Prefer the ordering key, falling back to the message key
        if (message.hasOrderingKey()) {
            key = message.getOrderingKey();
        } else if (message.hasKey()) {
            key = message.getKeyBytes();
        }
        // Select a thread by key, so that message listener tasks
        // with the same key are executed by the same thread
        ExecutorService executorService = executorProvider.getExecutor(key);
        // executorService is a single-thread executor
        executorService.execute(runnable);
    };
}

private static MessageListenerExecutor getPartitionOrderedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "-listener-executor-");

    return (message, runnable) -> {
        // Select a thread by partition topic name, so that message listener tasks
        // from the same partition are executed by the same thread
        ExecutorService executorService =
                executorProvider.getExecutor(message.getTopicName().getBytes(StandardCharsets.UTF_8));
        // executorService is a single-thread executor
        executorService.execute(runnable);
    };
}
```
## Public-facing Changes

### Public API

1. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`:
```java
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
```

# Backward & Forward Compatibility
Upgrading and reverting work as usual; no special steps are required.

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/8nhqfdhkglsg5bgx6z7c1nho7z7l596l
* Mailing List voting thread: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk