This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 56125f9dcbc Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
56125f9dcbc is described below

commit 56125f9dcbcc773f2911e72e575b8b785351d71e
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Feb 7 03:56:43 2022 +0800

    Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
    
    (cherry picked from commit f97a76e80a6966c601f73e24fe4cfb2af0114df4)
---
 ...PersistentStickyKeyDispatcherMultipleConsumers.java |  2 +-
 ...istentStickyKeyDispatcherMultipleConsumersTest.java | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 85b2f5163ce..844b72607df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -328,7 +328,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
             // readMoreEntries should run regardless whether or not stuck is caused by
             // stuckConsumers for avoid stopping dispatch.
-            readMoreEntries();
+            topic.getBrokerService().executor().execute(() -> readMoreEntries());
         }  else if (currentThreadKeyNumber == 0) {
             topic.getBrokerService().executor().schedule(() -> {
                 synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 3cb5bfbdc44..6d32a83ffc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -21,6 +21,17 @@ package org.apache.pulsar.broker.service.persistent;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import io.netty.channel.EventLoopGroup;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -118,6 +129,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         brokerMock = mock(BrokerService.class);
         doReturn(pulsarMock).when(brokerMock).pulsar();
 
+        EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+        doReturn(eventLoopGroup).when(brokerMock).executor();
+        doAnswer(invocation -> {
+            ((Runnable)invocation.getArguments()[0]).run();
+            return null;
+        }).when(eventLoopGroup).execute(any(Runnable.class));
+
         topicMock = mock(PersistentTopic.class);
         doReturn(brokerMock).when(topicMock).getBrokerService();
         doReturn(topicName).when(topicMock).getName();

Reply via email to