This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c6602cbb36 [improve][pip] PIP-359: Support custom message listener 
executor for specific subscription (#22902)
5c6602cbb36 is described below

commit 5c6602cbb3660a696bf960f2847aac1a2ae037d2
Author: Aurora Twinkle <foreverlove...@gmail.com>
AuthorDate: Tue Jul 16 10:47:55 2024 +0800

    [improve][pip] PIP-359: Support custom message listener executor for 
specific subscription (#22902)
    
    Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com>
---
 pip/pip-359.md | 216 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 216 insertions(+)

diff --git a/pip/pip-359.md b/pip/pip-359.md
new file mode 100644
index 00000000000..52a76193d6c
--- /dev/null
+++ b/pip/pip-359.md
@@ -0,0 +1,216 @@
+# PIP-359: Support custom message listener executor for specific subscription
+Implementation PR: [#22861](https://github.com/apache/pulsar/pull/22861)
+
+# Background knowledge
+In current Pulsar client versions, from the user's perspective, a Pulsar Consumer
+offers two main ways to consume messages:
+1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`)
+```java
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription")
+                .subscribe();
+        do {
+            Message<Long> message = consumer.receive();
+            consumer.acknowledge(message);
+        } while (true);
+
+    }
+}
+
+```
+2. Push mode, by registering a `MessageListener` when building the Consumer.
+When this mode is used, we can't also call `consumer.receive()` (or `consumer.receiveAsync()`).
+In push mode, the consumer invokes the `MessageListener` instance on a thread
+taken from an internal `ExecutorService` (i.e. thread pool).
+The problem arises when we build and use multiple Consumers from the same PulsarClient:
+those consumers share the same thread pool to call their message listeners,
+so a slow listener on one consumer can delay the listeners of the others.
+
+```java
+public class ConsumerExample {
+    public static void main(String[] args) throws PulsarClientException {
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
+                .topic("persistent://public/default/my-topic")
+                .subscriptionName("my-subscription2")
+                .messageListener((c, message) -> {
+                    // process message; the parameter is named `c` because a lambda parameter cannot reuse the name of the local `consumer`
+                    c.acknowledgeAsync(message);
+                })
+                .subscribe();
+    }
+}
+```
+
+# Motivation
+
+As mentioned in [Background knowledge](#background-knowledge), when consuming asynchronously
+by registering a `MessageListener`, different consumer groups can
+affect each other, leading to unnecessary consumption delays.
+Therefore, this PIP proposes supporting a custom message listener executor
+at the consumer level to solve this problem.
+# Goals
+1. Improve message listener isolation between consumers, solving the consumption delay
+caused by consumers from the same PulsarClient instance interfering with each other.
+
+## In Scope
+If this PIP is accepted, it will help Pulsar solve the problem of different consumers
+from the same `PulsarClient` affecting each other in the asynchronous consumption mode (`MessageListener`).
+
+## Out of Scope
+This PIP will not build the plugin library mentioned in [PR](https://github.com/apache/pulsar/pull/22902#issuecomment-2169962642);
+a new PIP will be opened in the future to do this.
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. Add an interface `MessageListenerExecutor`, responsible for executing message listener callback tasks.
+Users can customize the implementation to determine in which thread the message listener task is executed.
+For example, in the situation described in the [Motivation](#motivation) section, users can implement the
+interface with an independent underlying thread pool so that the message listener tasks of each
+consumer run on threads that are not shared with other consumers. The caller is responsible for the life cycle of the
+executor, and it is used only for that specific consumer.
+    ```java
+    public interface MessageListenerExecutor {
+
+        /**
+         * Selects a thread based on the message, if necessary, and executes the runnable on it.
+         * For example, with the Key_Shared subscription type, the thread may need to be
+         * selected by the message's ordering key to preserve order.
+         *
+         * @param message  the message
+         * @param runnable the runnable to execute
+         */
+        void execute(Message<?> message, Runnable runnable);
+    }
+    ```
+2. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`, then 
+users can pass their implementations.
+    ```java
+    ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
+    ```
+
+### Why an interface like `MessageListenerExecutor` is needed
+Some people may wonder why we define an interface like `MessageListenerExecutor`
+instead of just using `java.util.concurrent.ExecutorService`.
+
+The reason is as follows.
+
+For sequential consumption scenarios, messages with the same
+key or from the same partition must be processed by the same thread to preserve order. With
+`java.util.concurrent.ExecutorService`, as the following figures show, we cannot make such a guarantee,
+because the user does not control which thread of an `ExecutorService` executes a task.
+![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/232854d6-01f2-4821-b2df-34d01dda1992)
+![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/204f5622-1e5a-4e73-b86b-15220bfb06d6)
+### Interface implementation suggestions
+When implementing the `MessageListenerExecutor` interface, consider the following point.
+1. If you need to preserve the order of message processing,
+select the thread by the message's ordering key or by `msg.getTopicName()` (the partition topic name),
+so that messages with the same ordering key (or from the same partition) are processed in the same thread.
+
+### Usage Example
+```java
+private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
+    // for example: key_shared
+    MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
+    Consumer<Long> keySharedConsumer =
+            pulsarClient.newConsumer(Schema.INT64)
+                    .topic(topic)
+                    .subscriptionName(subscriptionName)
+                    // the message listener is executed in the executor set via messageListenerExecutor below
+                    .messageListener((c1, msg) -> {
+                        log.info("Received message [{}] in the listener", msg.getValue());
+                        c1.acknowledgeAsync(msg);
+                    })
+                    .messageListenerExecutor(keySharedExecutor)
+                    .subscribe();
+
+
+    // for example: partition_ordered
+    MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderdMessageListenerExecutor(subscriptionName);
+    Consumer<Long> partitionOrderedConsumer =
+            pulsarClient.newConsumer(Schema.INT64)
+                    .topic(topic)
+                    .subscriptionName(subscriptionName)
+                    // the message listener is executed in the executor set via messageListenerExecutor below
+                    .messageListener((c1, msg) -> {
+                        log.info("Received message [{}] in the listener", msg.getValue());
+                        c1.acknowledgeAsync(msg);
+                    })
+                    .messageListenerExecutor(partitionOrderedExecutor)
+                    .subscribe();
+
+    // for example: out-of-order
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+    Consumer<Long> outOfOrderConsumer =
+            pulsarClient.newConsumer(Schema.INT64)
+                    .topic(topic)
+                    .subscriptionName(subscriptionName)
+                    // the message listener runs on a plain thread pool, so processing order is not guaranteed
+                    .messageListener((c1, msg) -> {
+                        log.info("Received message [{}] in the listener", msg.getValue());
+                        c1.acknowledgeAsync(msg);
+                    })
+                    .messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
+                    .subscribe();
+}
+
+private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
+    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");
+
+    return (message, runnable) -> {
+        byte[] key = "".getBytes(StandardCharsets.UTF_8);
+        if (message.hasKey()) {
+            key = message.getKeyBytes();
+        } else if (message.hasOrderingKey()) {
+            key = message.getOrderingKey();
+        }
+        // select a thread by message key to execute the runnable,
+        // that is, listener tasks with the same ordering key
+        // are executed by the same thread
+        ExecutorService executorService = executorProvider.getExecutor(key);
+        // executorService is a SingleThreadExecutor
+        executorService.execute(runnable);
+    };
+}
+
+private static MessageListenerExecutor getPartitionOrderdMessageListenerExecutor(String subscriptionName) {
+    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");
+
+    return (message, runnable) -> {
+        // select a thread by partition topic name to execute the runnable,
+        // that is, listener tasks from the same partition topic
+        // are executed by the same thread
+        ExecutorService executorService = executorProvider.getExecutor(message.getTopicName().getBytes());
+        // executorService is a SingleThreadExecutor
+        executorService.execute(runnable);
+    };
+}
+
+```
+## Public-facing Changes
+
+### Public API
+
+1. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`
+```java
+ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
+```
+
+# Backward & Forward Compatibility
+Upgrading and reverting work normally; no special steps are required.
+
+# Links
+
+* Mailing List discussion thread: https://lists.apache.org/thread/8nhqfdhkglsg5bgx6z7c1nho7z7l596l
+* Mailing List voting thread: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk
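
The isolation problem described in the PIP's Background and Motivation sections can be illustrated without a broker. The following is a minimal sketch using only `java.util.concurrent`; it does not use the Pulsar client. The class `ListenerIsolationSketch` and the method `fastListenerRunsWhileSlowIsBlocked` are hypothetical names, and the single-thread pool standing in for the client's shared listener pool is an assumption made for the illustration, not a statement about how the client sizes its pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ListenerIsolationSketch {

    /**
     * Returns true if the fast listener task completes while the slow one is still blocked.
     *
     * @param dedicated when true, each simulated consumer gets its own executor;
     *                  when false, both share one single-thread pool (an assumed stand-in
     *                  for a listener pool shared by all consumers of one client)
     */
    static boolean fastListenerRunsWhileSlowIsBlocked(boolean dedicated) {
        ExecutorService slowPool = Executors.newSingleThreadExecutor();
        ExecutorService fastPool = dedicated ? Executors.newSingleThreadExecutor() : slowPool;
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);
        try {
            // Simulated slow listener: occupies its thread until released.
            slowPool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Simulated fast listener that belongs to another consumer.
            fastPool.execute(fastDone::countDown);
            // Wait up to one second for the fast listener while the slow one is still blocked.
            return fastDone.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unblock the slow listener and let both pools drain and terminate.
            release.countDown();
            slowPool.shutdown();
            fastPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:     " + fastListenerRunsWhileSlowIsBlocked(false));
        System.out.println("dedicated pools: " + fastListenerRunsWhileSlowIsBlocked(true));
    }
}
```

With the shared pool the fast task is queued behind the blocked one, so the method is expected to return `false`; with dedicated pools it is expected to return `true`. This is the effect a per-consumer `MessageListenerExecutor` is meant to provide.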

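The ordering requirement discussed in the Detailed Design section (messages with the same key must run on the same thread) can also be sketched with the JDK alone. The interface `KeyedListenerExecutor` below is a hypothetical stand-in for `MessageListenerExecutor` that takes the key bytes directly instead of a `Message<?>`, and `HashingExecutor` is an illustrative implementation, not the `ExecutorProvider` used in the PIP's usage example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class KeyOrderedExecutorSketch {

    /** Hypothetical stand-in for MessageListenerExecutor: takes the key bytes instead of a Message. */
    interface KeyedListenerExecutor {
        void execute(byte[] orderingKey, Runnable runnable);
    }

    /** Sends every task with the same key to the same single-threaded worker. */
    static final class HashingExecutor implements KeyedListenerExecutor {
        private final ExecutorService[] workers;

        HashingExecutor(int threads, String namePrefix) {
            workers = new ExecutorService[threads];
            for (int i = 0; i < threads; i++) {
                String name = namePrefix + i;
                workers[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
            }
        }

        @Override
        public void execute(byte[] orderingKey, Runnable runnable) {
            // floorMod keeps the slot non-negative even for negative hash codes.
            int slot = Math.floorMod(Arrays.hashCode(orderingKey), workers.length);
            workers[slot].execute(runnable);
        }

        void shutdown() {
            for (ExecutorService worker : workers) {
                worker.shutdown();
            }
        }
    }

    /** Dispatches one task per key and records which thread names handled each key. */
    static Map<String, Set<String>> dispatch(List<String> keys, int threads) {
        Map<String, Set<String>> threadsByKey = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(keys.size());
        HashingExecutor executor = new HashingExecutor(threads, "listener-");
        try {
            for (String key : keys) {
                executor.execute(key.getBytes(StandardCharsets.UTF_8), () -> {
                    threadsByKey.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                            .add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return threadsByKey;
    }

    public static void main(String[] args) {
        dispatch(List.of("a", "b", "a", "c", "b", "a"), 4)
                .forEach((key, names) -> System.out.println(key + " -> " + names));
    }
}
```

Because each worker is single-threaded and the slot depends only on the key bytes, every key should map to exactly one thread name, which is the property a plain multi-threaded `ExecutorService` cannot offer. Two different keys may still hash to the same worker; that affects parallelism, not ordering.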