This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 56125f9dcbc Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
56125f9dcbc is described below
commit 56125f9dcbcc773f2911e72e575b8b785351d71e
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Feb 7 03:56:43 2022 +0800
Prevent StackOverFlowException in KEY_SHARED subscription (#14121)
(cherry picked from commit f97a76e80a6966c601f73e24fe4cfb2af0114df4)
---
...PersistentStickyKeyDispatcherMultipleConsumers.java | 2 +-
...istentStickyKeyDispatcherMultipleConsumersTest.java | 18 ++++++++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 85b2f5163ce..844b72607df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -328,7 +328,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
- readMoreEntries();
+ topic.getBrokerService().executor().execute(() -> readMoreEntries());
} else if (currentThreadKeyNumber == 0) {
topic.getBrokerService().executor().schedule(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 3cb5bfbdc44..6d32a83ffc5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -21,6 +21,17 @@ package org.apache.pulsar.broker.service.persistent;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import io.netty.channel.EventLoopGroup;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -118,6 +129,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();
+ EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
+ doReturn(eventLoopGroup).when(brokerMock).executor();
+ doAnswer(invocation -> {
+ ((Runnable)invocation.getArguments()[0]).run();
+ return null;
+ }).when(eventLoopGroup).execute(any(Runnable.class));
+
topicMock = mock(PersistentTopic.class);
doReturn(brokerMock).when(topicMock).getBrokerService();
doReturn(topicName).when(topicMock).getName();