This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8c148684fbc Use `safeRun` to log thread exception. (#17484)
8c148684fbc is described below

commit 8c148684fbc1e1a5d0fc1cf73873f79c0bd6323b
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Sep 8 21:32:21 2022 +0800

    Use `safeRun` to log thread exception. (#17484)
---
 .../persistent/PersistentDispatcherMultipleConsumers.java     |  6 +++---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java       | 11 ++++++-----
 .../PersistentStreamingDispatcherMultipleConsumers.java       |  2 +-
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 08257e909a6..0e68186490f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -221,9 +221,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     @Override
     public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
-        topic.getBrokerService().executor().execute(() -> {
+        topic.getBrokerService().executor().execute(safeRun(() -> {
             internalConsumerFlow(consumer, additionalNumberOfMessages);
-        });
+        }));
     }
 
     private synchronized void internalConsumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
@@ -249,7 +249,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
      *
      */
     public void readMoreEntriesAsync() {
-        topic.getBrokerService().executor().execute(this::readMoreEntries);
+        topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
     }
 
     public synchronized void readMoreEntries() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index e42995e9247..5573596a96e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -330,14 +331,14 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // readMoreEntries should run regardless whether or not stuck is 
caused by
             // stuckConsumers for avoid stopping dispatch.
             sendInProgress = false;
-            topic.getBrokerService().executor().execute(() -> readMoreEntries());
+            topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
         }  else if (currentThreadKeyNumber == 0) {
             sendInProgress = false;
-            topic.getBrokerService().executor().schedule(() -> {
+            topic.getBrokerService().executor().schedule(safeRun(() -> {
                 synchronized 
(PersistentStickyKeyDispatcherMultipleConsumers.this) {
                     readMoreEntries();
                 }
-            }, 100, TimeUnit.MILLISECONDS);
+            }), 100, TimeUnit.MILLISECONDS);
         }
         return false;
     }
@@ -411,7 +412,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     public void markDeletePositionMoveForward() {
         // Execute the notification in different thread to avoid a mutex chain 
here
         // from the delete operation that was completed
-        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
+        topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() 
-> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
                 if (recentlyJoinedConsumers != null && 
!recentlyJoinedConsumers.isEmpty()
                         && removeConsumersFromRecentJoinedConsumers()) {
@@ -420,7 +421,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                     readMoreEntries();
                 }
             }
-        });
+        }));
     }
 
     private boolean removeConsumersFromRecentJoinedConsumers() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 28e3ecd11b4..9eb0a169964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -192,7 +192,7 @@ public class PersistentStreamingDispatcherMultipleConsumers 
extends PersistentDi
                     havePendingReplayRead = false;
                     // We should not call readMoreEntries() recursively in the 
same thread
                     // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
+                    topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 
TRUE) {
                 log.warn("[{}] Dispatcher read is blocked due to unackMessages 
{} reached to max {}", name,

Reply via email to