This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push: new 0bca7135367 Fixed deadlock in key-shared dispatcher (#16660) 0bca7135367 is described below commit 0bca713536701d422e689ba3abb3fed6480f921e Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Jul 18 17:43:17 2022 -0700 Fixed deadlock in key-shared dispatcher (#16660) --- ...rsistentStickyKeyDispatcherMultipleConsumers.java | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 21c98b9ab33..881d3db1a81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -391,13 +391,19 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } @Override - public synchronized void markDeletePositionMoveForward() { - if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() - && removeConsumersFromRecentJoinedConsumers()) { - // After we process acks, we need to check whether the mark-delete position was advanced and we can finally - // read more messages. It's safe to call readMoreEntries() multiple times. - readMoreEntries(); - } + public void markDeletePositionMoveForward() { + // Execute the notification in different thread to avoid a mutex chain here + // from the delete operation that was completed + topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() + && removeConsumersFromRecentJoinedConsumers()) { + // After we process acks, we need to check whether the mark-delete position was advanced and we + // can finally read more messages. It's safe to call readMoreEntries() multiple times. + readMoreEntries(); + } + } + }); } private boolean removeConsumersFromRecentJoinedConsumers() {