This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c5846bbbecd [improve][client]PIP-359:Support custom message listener 
executor for specific subscription (#22861)
c5846bbbecd is described below

commit c5846bbbecdbd4dd197679f78874eb398da38c23
Author: Aurora Twinkle <foreverlove...@gmail.com>
AuthorDate: Mon Aug 5 19:25:17 2024 +0800

    [improve][client]PIP-359:Support custom message listener executor for 
specific subscription (#22861)
    
    Co-authored-by: duanlinlin <duanlinl...@xiaohongshu.com>
    [PIP-359](https://github.com/apache/pulsar/pull/22902)
    Support custom message listener thread pool for specific subscription, 
avoid individual subscription listener consuming too much time leading to 
higher consumption delay in other subscriptions.
    
    <!--
    ### Contribution Checklist
    
      - PR title format should be *[type][component] summary*. For details, see 
*[Guideline - Pulsar PR Naming 
Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.
    
      - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
    
      - Each pull request should address only one issue, not mix up code from 
multiple issues.
    
      - Each commit in the pull request has a meaningful commit message
    
      - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.
    -->
    
    <!-- Either this PR fixes an issue, -->
    
    <!-- or this PR is one task of an issue -->
    
    <!-- If the PR belongs to a PIP, please add the PIP link here -->
    
    <!-- Details of when a PIP is required and how the PIP process work, please 
see: https://github.com/apache/pulsar/blob/master/pip/README.md -->
    
    ### Motivation
    In our scenario, there is a centralized message proxy service, this service 
will use the same PulsarClient instance to create a lot of subscription groups 
to consume many topics and cache messages locally.Then the business will pull 
messages from the cache of the proxy service. It seems that there is no 
problem, but during use, we found that when the
    message processing time of several consumer groups (listener mode) is very 
high, it almost affects all consumer groups responsible for the proxy service, 
causing a large number of message delays.
    
    By analyzing the source code, we found that by default, all consumer 
instances created from the same PulsarClient will share a thread pool to 
process message listeners, and sometimes there are multiple consumer message 
listeners bound to the same thread. Obviously, when a consumer processes 
messages and causes long-term blocking, it will cause the messages of other 
consumers bound to the thread to fail to be processed in time, resulting in 
message delays. Therefore, for this scenario, [...]
    
    <!-- Explain here the context, and why you're making that change. What is 
the problem you're trying to solve. -->
    
    ### Modifications
    Support custom message listener thread pool for specific subscription.
    <!-- Describe the modifications you've done. -->
    
    (cherry picked from commit 10f4e0248f0f985b1dc7ad38970c906b7fe629be)
---
 .../client/api/MessageListenerExecutorTest.java    | 193 +++++++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  15 ++
 .../pulsar/client/api/MessageListenerExecutor.java |  43 +++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  28 +--
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   8 +
 .../impl/conf/ConsumerConfigurationData.java       |   3 +
 6 files changed, 280 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
new file mode 100644
index 00000000000..9e148beb304
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class MessageListenerExecutorTest extends ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(MessageListenerExecutorTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+        // Set listenerThreads to 1 to reproduce the pr more easily in #22861
+        clientBuilder.listenerThreads(1);
+    }
+
+    @Test
+    public void testConsumerMessageListenerExecutorIsolation() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<CompletableFuture<Long>> 
maxConsumeDelayWithDisableIsolationFutures = new ArrayList<>();
+        int loops = 5;
+        long consumeSleepTimeMs = 10000;
+        for (int i = 0; i < loops; i++) {
+            // The first consumer will consume messages with sleep block 1s,
+            // and the others will consume messages without sleep block.
+            // The maxConsumeDelayWithDisableIsolation of all consumers
+            // should be greater than sleepTimeMs cause by disable 
MessageListenerExecutor.
+            CompletableFuture<Long> maxConsumeDelayFuture = 
startConsumeAndComputeMaxConsumeDelay(
+                    
"persistent://my-property/my-ns/testConsumerMessageListenerDisableIsolation-" + 
i,
+                    "my-sub-testConsumerMessageListenerDisableIsolation-" + i,
+                    i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : 
Duration.ofMillis(0),
+                    false,
+                    executor);
+            
maxConsumeDelayWithDisableIsolationFutures.add(maxConsumeDelayFuture);
+        }
+
+        // ensure all consumers consume messages delay more than 
consumeSleepTimeMs
+        boolean allDelayMoreThanConsumeSleepTimeMs = 
maxConsumeDelayWithDisableIsolationFutures.stream()
+                .map(CompletableFuture::join)
+                .allMatch(delay -> delay > consumeSleepTimeMs);
+        assertTrue(allDelayMoreThanConsumeSleepTimeMs);
+
+        List<CompletableFuture<Long>> 
maxConsumeDelayWhitEnableIsolationFutures = new ArrayList<>();
+        for (int i = 0; i < loops; i++) {
+            // The first consumer will consume messages with sleep block 1s,
+            // and the others will consume messages without sleep block.
+            // The maxConsumeDelayWhitEnableIsolation of the first consumer
+            // should be greater than sleepTimeMs, and the others should be
+            // less than sleepTimeMs, cause by enable MessageListenerExecutor.
+            CompletableFuture<Long> maxConsumeDelayFuture = 
startConsumeAndComputeMaxConsumeDelay(
+                    
"persistent://my-property/my-ns/testConsumerMessageListenerEnableIsolation-" + 
i,
+                    "my-sub-testConsumerMessageListenerEnableIsolation-" + i,
+                    i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : 
Duration.ofMillis(0),
+                    true,
+                    executor);
+            
maxConsumeDelayWhitEnableIsolationFutures.add(maxConsumeDelayFuture);
+        }
+
+        assertTrue(maxConsumeDelayWhitEnableIsolationFutures.get(0).join() > 
consumeSleepTimeMs);
+        boolean remainingAlmostNoDelay = 
maxConsumeDelayWhitEnableIsolationFutures.stream()
+                .skip(1)
+                .map(CompletableFuture::join)
+                .allMatch(delay -> delay < 1000);
+        assertTrue(remainingAlmostNoDelay);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    private CompletableFuture<Long> 
startConsumeAndComputeMaxConsumeDelay(String topic, String subscriptionName,
+                                                                          
Duration consumeSleepTime,
+                                                                          
boolean enableMessageListenerExecutorIsolation,
+                                                                          
ExecutorService executorService)
+            throws Exception {
+        int numMessages = 2;
+        final CountDownLatch latch = new CountDownLatch(numMessages);
+        int numPartitions = 50;
+        TopicName nonIsolationTopicName = TopicName.get(topic);
+        
admin.topics().createPartitionedTopic(nonIsolationTopicName.toString(), 
numPartitions);
+
+        AtomicLong maxConsumeDelay = new AtomicLong(-1);
+        ConsumerBuilder<Long> consumerBuilder =
+                pulsarClient.newConsumer(Schema.INT64)
+                        .topic(nonIsolationTopicName.toString())
+                        .subscriptionName(subscriptionName)
+                        .messageListener((c1, msg) -> {
+                            Assert.assertNotNull(msg, "Message cannot be 
null");
+                            log.debug("Received message [{}] in the listener", 
msg.getValue());
+                            c1.acknowledgeAsync(msg);
+                            maxConsumeDelay.set(Math.max(maxConsumeDelay.get(),
+                                    System.currentTimeMillis() - 
msg.getValue()));
+                            if (consumeSleepTime.toMillis() > 0) {
+                                
Uninterruptibles.sleepUninterruptibly(consumeSleepTime);
+                            }
+                            latch.countDown();
+                        });
+
+        ExecutorService executor = Executors.newSingleThreadExecutor(
+                new ExecutorProvider.ExtendedThreadFactory(subscriptionName + 
"listener-executor-", true));
+        if (enableMessageListenerExecutorIsolation) {
+            consumerBuilder.messageListenerExecutor((message, runnable) -> 
executor.execute(runnable));
+        }
+
+        Consumer<Long> consumer = consumerBuilder.subscribe();
+        ProducerBuilder<Long> producerBuilder = 
pulsarClient.newProducer(Schema.INT64)
+                .topic(nonIsolationTopicName.toString());
+
+        Producer<Long> producer = producerBuilder.create();
+        List<Future<MessageId>> futures = new ArrayList<>();
+
+        // Asynchronously produce messages
+        for (int i = 0; i < numMessages; i++) {
+            Future<MessageId> future = 
producer.sendAsync(System.currentTimeMillis());
+            futures.add(future);
+        }
+
+        log.info("Waiting for async publish to complete");
+        for (Future<MessageId> future : futures) {
+            future.get();
+        }
+
+        CompletableFuture<Long> maxDelayFuture = new CompletableFuture<>();
+
+        CompletableFuture.runAsync(() -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }, executorService).whenCompleteAsync((v, ex) -> {
+            maxDelayFuture.complete(maxConsumeDelay.get());
+            try {
+                producer.close();
+                consumer.close();
+                executor.shutdownNow();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        return maxDelayFuture;
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 01c9fc4a864..ff1a3cd73bb 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -284,6 +284,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
 
+    /**
+     * Set the {@link MessageListenerExecutor} to be used for message 
listeners of <b>current consumer</b>.
+     * <i>(default: use executor from PulsarClient,
+     * {@link 
org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider})</i>.
+     *
+     * <p>The listener thread pool is exclusively owned by current consumer
+     * that are using a "listener" model to get messages. For a given internal 
consumer,
+     * the listener will always be invoked from the same thread, to ensure 
ordering.
+     *
+     * <p> The caller need to shut down the thread pool after closing the 
consumer to avoid leaks.
+     * @param messageListenerExecutor the executor of the consumer message 
listener
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor 
messageListenerExecutor);
+
     /**
      * Sets a {@link CryptoKeyReader}.
      *
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
new file mode 100644
index 00000000000..53bb828c05a
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Interface for providing service to execute message listeners.
+ */
+public interface MessageListenerExecutor {
+
+    /**
+     * select a thread by message to execute the runnable!
+     * <p>
+     * Suggestions:
+     * <p>
+     * 1. The message listener task will be submitted to this executor for 
execution,
+     * so the implementations of this interface should carefully consider 
execution
+     * order if sequential consumption is required.
+     * </p>
+     * <p>
+     * 2. The users should release resources(e.g. threads) of the executor 
after closing
+     * the consumer to avoid leaks.
+     * </p>
+     * @param message  the message
+     * @param runnable the runnable to execute, that is, the message listener 
task
+     */
+    void execute(Message<?> message, Runnable runnable);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3747dc23d9d..2b4a3f334cd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -82,6 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
+    protected final MessageListenerExecutor messageListenerExecutor;
     protected final ExecutorService externalPinnedExecutor;
     protected final ExecutorService internalPinnedExecutor;
     protected UnAckedMessageTracker unAckedMessageTracker;
@@ -139,6 +141,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         this.unAckedChunkedMessageIdSequenceMap =
                 ConcurrentOpenHashMap.<MessageIdAdv, 
MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
+        this.messageListenerExecutor = conf.getMessageListenerExecutor() == 
null
+                ? (conf.getSubscriptionType() == SubscriptionType.Key_Shared
+                   ? this::executeKeySharedMessageListener
+                   : this::executeMessageListener)
+                : conf.getMessageListenerExecutor();
         this.externalPinnedExecutor = executorProvider.getExecutor();
         this.internalPinnedExecutor = client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
@@ -1117,14 +1124,7 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
                         // internal pinned executor thread while the message 
processing happens
                         final Message<T> finalMsg = msg;
                         
MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.incrementAndGet(this);
-                        if (SubscriptionType.Key_Shared == 
conf.getSubscriptionType()) {
-                            
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                    callMessageListener(finalMsg));
-                        } else {
-                            getExternalExecutor(msg).execute(() -> {
-                                callMessageListener(finalMsg);
-                            });
-                        }
+                        messageListenerExecutor.execute(msg, () -> 
callMessageListener(finalMsg));
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] [{}] Message has been cleared from 
the queue", topic, subscription);
@@ -1137,6 +1137,14 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         });
     }
 
+    private void executeMessageListener(Message<?> message, Runnable runnable) 
{
+        getExternalExecutor(message).execute(runnable);
+    }
+
+    private void executeKeySharedMessageListener(Message<?> message, Runnable 
runnable) {
+        
executorProvider.getExecutor(peekMessageKey(message)).execute(runnable);
+    }
+
     protected void callMessageListener(Message<T> msg) {
         try {
             if (log.isDebugEnabled()) {
@@ -1166,7 +1174,7 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
     }
 
     static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
-    protected byte[] peekMessageKey(Message<T> msg) {
+    protected byte[] peekMessageKey(Message<?> msg) {
         byte[] key = NONE_KEY;
         if (msg.hasKey()) {
             key = msg.getKeyBytes();
@@ -1233,7 +1241,7 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
-    private ExecutorService getExternalExecutor(Message<T> msg) {
+    private ExecutorService getExternalExecutor(Message<?> msg) {
         ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? 
((TopicMessageImpl) msg).receivedByconsumer
                 : null;
         ExecutorService executor = receivedConsumer != null && 
receivedConsumer.externalPinnedExecutor != null
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 2d31dc427d0..351025d426a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
 import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
@@ -299,6 +300,13 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor 
messageListenerExecutor) {
+        checkArgument(messageListenerExecutor != null, 
"messageListenerExecutor needs to be not null");
+        conf.setMessageListenerExecutor(messageListenerExecutor);
+        return this;
+    }
+
     @Override
     public ConsumerBuilder<T> consumerEventListener(@NonNull 
ConsumerEventListener consumerEventListener) {
         conf.setConsumerEventListener(consumerEventListener);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 3ae0e977d13..2bb7ef79c64 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
 import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -90,6 +91,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
 
+    @JsonIgnore
+    private transient MessageListenerExecutor messageListenerExecutor;
     @JsonIgnore
     private MessageListener<T> messageListener;
 

Reply via email to