This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a5ec98d6fff Fixed deadlock in key-shared dispatcher (#16660) a5ec98d6fff is described below commit a5ec98d6fffde8fe6727c10667b711be52ec4ba9 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Jul 18 17:43:17 2022 -0700 Fixed deadlock in key-shared dispatcher (#16660) --- ...rsistentStickyKeyDispatcherMultipleConsumers.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 3e01531fc3e..0a402d8322a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -403,13 +403,19 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } @Override - public synchronized void markDeletePositionMoveForward() { - if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() - && removeConsumersFromRecentJoinedConsumers()) { - // After we process acks, we need to check whether the mark-delete position was advanced and we can finally - // read more messages. It's safe to call readMoreEntries() multiple times. - readMoreEntries(); - } + public void markDeletePositionMoveForward() { + // Execute the notification in different thread to avoid a mutex chain here + // from the delete operation that was completed + topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() + && removeConsumersFromRecentJoinedConsumers()) { + // After we process acks, we need to check whether the mark-delete position was advanced and we + // can finally read more messages. It's safe to call readMoreEntries() multiple times. + readMoreEntries(); + } + } + }); } private boolean removeConsumersFromRecentJoinedConsumers() {