This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bafecb2a9c0 [improve] Removed usages of SafeRun (#20060)
bafecb2a9c0 is described below

commit bafecb2a9c0e73942de6a38df72dd5888d5afd66
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 10 18:30:22 2023 -0700

    [improve] Removed usages of SafeRun (#20060)
    
    ### Motivation
    
    With BK 4.16, we no longer need to pass `SafeRunnable` instances to the 
`OrderedExecutor`. The executor now embeds the logic for checking for and 
logging exceptions.
    
    Removing the SafeRun wrappers avoids extra allocations in the critical 
path and reduces clutter in the code.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    *(or)*
    
    This change is already covered by existing tests, such as *(please describe 
tests)*.
    
    *(or)*
    
    This change added tests and can be verified as follows:
    
    *(example:)*
      - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
      - *Extended integration test for recovery after broker failure*
    
    ### Does this pull request potentially affect one of the following parts:
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    *If the box was checked, please highlight the changes*
    
    - [ ] Dependencies (add or upgrade a dependency)
    - [ ] The public API
    - [ ] The schema
    - [ ] The default values of configurations
    - [ ] The threading model
    - [ ] The binary protocol
    - [ ] The REST endpoints
    - [ ] The admin CLI options
    - [ ] The metrics
    - [ ] Anything that affects deployment
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
    - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
    
    ### Matching PR in forked repository
    
    PR in forked repository: (https://github.com/merlimat/pulsar/pull/6)
    
    <!--
    After opening this PR, the build in apache/pulsar will fail and 
instructions will
    be provided for opening a PR in the PR author's forked repository.
    
    apache/pulsar pull requests should be first tested in your own fork since 
the
    apache/pulsar CI based on GitHub Actions has constrained resources and 
quota.
    GitHub Actions provides separate quota for pull requests that are executed 
in
    a forked repository.
    
    The tests will be run in the forked repository until all PR review comments 
have
    been handled, the tests pass and the PR is approved by a reviewer.
    -->
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 21 +++---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 44 ++++++-----
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     | 27 ++++---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 10 +--
 .../bookkeeper/mledger/impl/OpReadEntry.java       | 13 ++--
 .../mledger/impl/ShadowManagedLedgerImpl.java      | 13 ++--
 .../impl/cache/RangeEntryCacheManagerImpl.java     |  5 +-
 .../apache/bookkeeper/mledger/util/SafeRun.java    | 57 --------------
 .../mledger/impl/ManagedCursorConcurrencyTest.java |  5 +-
 .../stats/prometheus/PrometheusMetricsServlet.java |  5 +-
 .../pulsar/broker/service/BrokerService.java       | 87 +++++++++++++---------
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 +-
 .../PersistentDispatcherMultipleConsumers.java     | 11 ++-
 .../PersistentDispatcherSingleActiveConsumer.java  | 25 ++-----
 ...istentStickyKeyDispatcherMultipleConsumers.java |  5 +-
 ...istentStreamingDispatcherMultipleConsumers.java | 12 ++-
 ...entStreamingDispatcherSingleActiveConsumer.java | 10 +--
 .../streamingdispatch/StreamingEntryReader.java    | 19 ++---
 .../coordination/impl/LeaderElectionImpl.java      |  3 +-
 19 files changed, 150 insertions(+), 227 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 05cad09b018..ef607fa7ed7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -25,7 +25,6 @@ import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGE
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Collections2;
@@ -1358,7 +1357,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         final PositionImpl newPosition = (PositionImpl) newPos;
 
         // order trim and reset operations on a ledger
-        ledger.getExecutor().execute(safeRun(() -> {
+        ledger.getExecutor().execute(() -> {
             PositionImpl actualPosition = newPosition;
 
             if (!ledger.isValidPosition(actualPosition)
@@ -1375,7 +1374,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
 
             internalResetCursor(actualPosition, callback);
-        }));
+        });
     }
 
     @Override
@@ -2055,7 +2054,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         + "is later.", mdEntry.newPosition, 
persistentMarkDeletePosition);
             }
             // run with executor to prevent deadlock
-            ledger.getExecutor().execute(safeRun(() -> 
mdEntry.triggerComplete()));
+            ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
             return;
         }
 
@@ -2074,7 +2073,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                         + "in progress {} is later.", mdEntry.newPosition, 
inProgressLatest);
             }
             // run with executor to prevent deadlock
-            ledger.getExecutor().execute(safeRun(() -> 
mdEntry.triggerComplete()));
+            ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
             return;
         }
 
@@ -2611,8 +2610,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl 
position, Map<String, Long> properties,
             MetaStoreCallback<Void> callback, boolean 
persistIndividualDeletedMessageRanges) {
         if (state == State.Closed) {
-            ledger.getExecutor().execute(safeRun(() -> 
callback.operationFailed(new MetaStoreException(
-                    new CursorAlreadyClosedException(name + " cursor already 
closed")))));
+            ledger.getExecutor().execute(() -> callback.operationFailed(new 
MetaStoreException(
+                    new CursorAlreadyClosedException(name + " cursor already 
closed"))));
             return;
         }
 
@@ -2845,7 +2844,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 return;
             }
 
-            ledger.getExecutor().execute(safeRun(() -> {
+            ledger.getExecutor().execute(() -> {
                 ledger.mbean.endCursorLedgerCreateOp();
                 if (rc != BKException.Code.OK) {
                     log.warn("[{}] Error creating ledger for cursor {}: {}", 
ledger.getName(), name,
@@ -2858,7 +2857,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     log.debug("[{}] Created ledger {} for cursor {}", 
ledger.getName(), lh.getId(), name);
                 }
                 future.complete(lh);
-            }));
+            });
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
 
         return future;
@@ -3192,7 +3191,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.warn("[{}] Failed to delete ledger {}: {}", 
ledger.getName(), lh.getId(),
                         BKException.getMessage(rc));
                 if (!isNoSuchLedgerExistsException(rc)) {
-                    ledger.getScheduledExecutor().schedule(safeRun(() -> 
asyncDeleteLedger(lh, retry - 1)),
+                    ledger.getScheduledExecutor().schedule(() -> 
asyncDeleteLedger(lh, retry - 1),
                         DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, 
TimeUnit.SECONDS);
                 }
                 return;
@@ -3227,7 +3226,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.warn("[{}][{}] Failed to delete ledger {}: {}", 
ledger.getName(), name, cursorLedger.getId(),
                         BKException.getMessage(rc));
                 if (!isNoSuchLedgerExistsException(rc)) {
-                    ledger.getScheduledExecutor().schedule(safeRun(() -> 
asyncDeleteCursorLedger(retry - 1)),
+                    ledger.getScheduledExecutor().schedule(() -> 
asyncDeleteCursorLedger(retry - 1),
                             DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, 
TimeUnit.SECONDS);
                 }
             }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8376ee1bb84..c74db884a95 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,7 +22,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Math.min;
 import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
@@ -409,7 +408,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 if (!ledgers.isEmpty()) {
                     final long id = ledgers.lastKey();
                     OpenCallback opencb = (rc, lh, ctx1) -> {
-                        executor.execute(safeRun(() -> {
+                        executor.execute(() -> {
                             mbean.endDataLedgerOpenOp();
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] Opened ledger {}: {}", name, 
id, BKException.getMessage(rc));
@@ -439,7 +438,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                                 
callback.initializeFailed(createManagedLedgerException(rc));
                                 return;
                             }
-                        }));
+                        });
                     };
 
                     if (log.isDebugEnabled()) {
@@ -522,7 +521,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 return;
             }
 
-            executor.execute(safeRun(() -> {
+            executor.execute(() -> {
                 mbean.endDataLedgerCreateOp();
                 if (rc != BKException.Code.OK) {
                     
callback.initializeFailed(createManagedLedgerException(rc));
@@ -551,7 +550,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
                 // Save it back to ensure all nodes exist
                 store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, storeLedgersCb);
-            }));
+            });
         }, ledgerMetadata);
     }
 
@@ -774,10 +773,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
-        executor.execute(safeRun(() -> {
+        executor.execute(() -> {
             OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, callback, ctx);
             internalAsyncAddEntry(addOperation);
-        }));
+        });
     }
 
     @Override
@@ -790,10 +789,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
-        executor.execute(safeRun(() -> {
+        executor.execute(() -> {
             OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, 
buffer, numberOfMessages, callback, ctx);
             internalAsyncAddEntry(addOperation);
-        }));
+        });
     }
 
     protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
@@ -2374,7 +2373,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 break;
             }
 
-            executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
+            executor.execute(waitingCursor::notifyEntriesAvailable);
         }
     }
 
@@ -2385,7 +2384,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 break;
             }
 
-            executor.execute(safeRun(cb::entriesAvailable));
+            executor.execute(cb::entriesAvailable);
         }
     }
 
@@ -2432,16 +2431,16 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
 
     @Override
     public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
-        executor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
+        executor.execute(() -> internalTrimConsumedLedgers(promise));
     }
 
     public void trimConsumedLedgersInBackground(boolean isTruncate, 
CompletableFuture<?> promise) {
-        executor.execute(safeRun(() -> internalTrimLedgers(isTruncate, 
promise)));
+        executor.execute(() -> internalTrimLedgers(isTruncate, promise));
     }
 
     private void scheduleDeferredTrimming(boolean isTruncate, 
CompletableFuture<?> promise) {
-        scheduledExecutor.schedule(safeRun(() -> 
trimConsumedLedgersInBackground(isTruncate, promise)), 100,
-                TimeUnit.MILLISECONDS);
+        scheduledExecutor.schedule(() -> 
trimConsumedLedgersInBackground(isTruncate, promise),
+                100, TimeUnit.MILLISECONDS);
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
@@ -2456,7 +2455,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         final long offloadThresholdInSeconds =
                 
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
         if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
-            executor.execute(safeRun(() -> 
maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
+            executor.execute(() -> maybeOffload(offloadThresholdInBytes, 
offloadThresholdInSeconds, promise));
         }
     }
 
@@ -2477,7 +2476,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         if (!offloadMutex.tryLock()) {
-            scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
+            scheduledExecutor.schedule(() -> 
maybeOffloadInBackground(finalPromise),
                     100, TimeUnit.MILLISECONDS);
             return;
         }
@@ -2956,7 +2955,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
             } else if (rc != BKException.Code.OK) {
                 log.error("[{}] Error deleting ledger {} : {}", name, 
ledgerId, BKException.getMessage(rc));
-                scheduledExecutor.schedule(safeRun(() -> 
asyncDeleteLedger(ledgerId, retry - 1)),
+                scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, 
retry - 1),
                         DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, 
TimeUnit.SECONDS);
             } else {
                 if (log.isDebugEnabled()) {
@@ -3260,7 +3259,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (!metadataMutex.tryLock()) {
                 // retry in 100 milliseconds
                 scheduledExecutor.schedule(
-                        safeRun(() -> tryTransformLedgerInfo(ledgerId, 
transformation, finalPromise)), 100,
+                        () -> tryTransformLedgerInfo(ledgerId, transformation, 
finalPromise), 100,
                         TimeUnit.MILLISECONDS);
             } else { // lock acquired
                 CompletableFuture<Void> unlockingPromise = new 
CompletableFuture<>();
@@ -4011,9 +4010,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             timeoutSec = timeoutSec <= 0
                     ? Math.max(config.getAddEntryTimeoutSeconds(), 
config.getReadEntryTimeoutSeconds())
                     : timeoutSec;
-            this.timeoutTask = 
this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
-                checkTimeouts();
-            }), timeoutSec, timeoutSec, TimeUnit.SECONDS);
+            this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(
+                    this::checkTimeouts, timeoutSec, timeoutSec, 
TimeUnit.SECONDS);
         }
     }
 
@@ -4336,7 +4334,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 checkLedgerRollTask.cancel(true);
             }
             this.checkLedgerRollTask = this.scheduledExecutor.schedule(
-                    safeRun(this::rollCurrentLedgerIfFull), 
this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
+                    this::rollCurrentLedgerIfFull, this.maximumRolloverTimeMs, 
TimeUnit.MILLISECONDS);
         }
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index fb93e929f50..fcce1f7766c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
-import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -156,7 +155,7 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                 .exceptionally(ex -> {
                     try {
                         executor.executeOrdered(ledgerName,
-                                SafeRunnable.safeRun(() -> 
callback.operationFailed(getException(ex))));
+                                () -> 
callback.operationFailed(getException(ex)));
                     } catch (RejectedExecutionException e) {
                         //executor maybe shutdown, use common pool to run 
callback.
                         CompletableFuture.runAsync(() -> 
callback.operationFailed(getException(ex)));
@@ -204,8 +203,8 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                 .thenAcceptAsync(newVersion -> 
callback.operationComplete(null, newVersion),
                         executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName,
+                            () -> callback.operationFailed(getException(ex)));
                     return null;
                 });
     }
@@ -221,8 +220,8 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                 .thenAcceptAsync(cursors -> 
callback.operationComplete(cursors, null), executor
                         .chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName,
+                            () -> callback.operationFailed(getException(ex)));
                     return null;
                 });
     }
@@ -249,8 +248,8 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                     }
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName,
+                            () -> callback.operationFailed(getException(ex)));
                     return null;
                 });
     }
@@ -284,8 +283,8 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                 .thenAcceptAsync(optStat -> callback.operationComplete(null, 
optStat), executor
                         .chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName,
+                            () -> callback.operationFailed(getException(ex)));
                     return null;
                 });
     }
@@ -303,7 +302,7 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> {
+                    executor.executeOrdered(ledgerName, () -> {
                         Throwable actEx = 
FutureUtil.unwrapCompletionException(ex);
                         if (actEx instanceof 
MetadataStoreException.NotFoundException){
                             log.info("[{}] [{}] cursor delete done because it 
did not exist.", ledgerName, cursorName);
@@ -311,7 +310,7 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                             return;
                         }
                         callback.operationFailed(getException(ex));
-                    }));
+                    });
                     return null;
                 });
     }
@@ -329,8 +328,8 @@ public class MetaStoreImpl implements MetaStore, 
Consumer<Notification> {
                     callback.operationComplete(null, null);
                 }, executor.chooseThread(ledgerName))
                 .exceptionally(ex -> {
-                    executor.executeOrdered(ledgerName, 
SafeRunnable.safeRun(() -> callback
-                            .operationFailed(getException(ex))));
+                    executor.executeOrdered(ledgerName,
+                            () -> callback.operationFailed(getException(ex)));
                     return null;
                 });
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c56123c24ca..ae2beafb643 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -35,8 +35,6 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
-import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.bookkeeper.util.SafeRunnable;
 
 
 /**
@@ -44,7 +42,7 @@ import org.apache.bookkeeper.util.SafeRunnable;
  *
  */
 @Slf4j
-public class OpAddEntry extends SafeRunnable implements AddCallback, 
CloseCallback {
+public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
     protected ManagedLedgerImpl ml;
     LedgerHandle ledger;
     private long entryId;
@@ -212,7 +210,7 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
 
     // Called in executor hashed on managed ledger name, once the add 
operation is complete
     @Override
-    public void safeRun() {
+    public void run() {
         if (payloadProcessorHandle != null) {
             payloadProcessorHandle.release();
         }
@@ -328,11 +326,11 @@ public class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallba
         ManagedLedgerImpl finalMl = this.ml;
         finalMl.mbean.recordAddEntryError();
 
-        finalMl.getExecutor().execute(SafeRun.safeRun(() -> {
+        finalMl.getExecutor().execute(() -> {
             // Force the creation of a new ledger. Doing it in a background 
thread to avoid acquiring ML lock
             // from a BK callback.
             finalMl.ledgerClosed(lh);
-        }));
+        });
     }
 
     void close() {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 81b14359514..19211553a5f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -18,7 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
@@ -108,10 +107,10 @@ class OpReadEntry implements ReadEntriesCallback {
 
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can 
return them
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(() -> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
-            }));
+            });
         } else if (cursor.config.isAutoSkipNonRecoverableData() && exception 
instanceof NonRecoverableLedgerException) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
cursor.ledger.getName(), cursor.getName(),
                     readPosition, exception.getMessage());
@@ -161,20 +160,20 @@ class OpReadEntry implements ReadEntriesCallback {
                 && maxPosition.compareTo(readPosition) > 0) {
 
             // We still have more entries to read from the next ledger, 
schedule a new async operation
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(() -> {
                 readPosition = 
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
-            }));
+            });
         } else {
             // The reading was already completed, release resources and 
trigger callback
             try {
                 cursor.readOperationCompleted();
 
             } finally {
-                cursor.ledger.getExecutor().execute(safeRun(() -> {
+                cursor.ledger.getExecutor().execute(() -> {
                     callback.readEntriesComplete(entries, ctx);
                     recycle();
-                }));
+                });
             }
         }
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index b1f23941347..b33dd87543f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -66,13 +65,13 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
         log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
-        executor.execute(safeRun(() -> doInitialize(callback, ctx)));
+        executor.execute(() -> doInitialize(callback, ctx));
     }
 
     private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
         // Fetch the list of existing ledgers in the source managed ledger
         store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
-                executor.execute(safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+                executor.execute(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat))
         );
         store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
             @Override
@@ -106,7 +105,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
 
                 final long lastLedgerId = ledgers.lastKey();
                 mbean.startDataLedgerOpenOp();
-                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.execute(safeRun(() -> {
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.execute(() -> {
                     mbean.endDataLedgerOpenOp();
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
@@ -145,7 +144,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
                                 BKException.getMessage(rc));
                         
callback.initializeFailed(createManagedLedgerException(rc));
                     }
-                }));
+                });
                 //open ledger in readonly mode.
                 bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
 
@@ -317,7 +316,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
             mbean.startDataLedgerOpenOp();
             //open ledger in readonly mode.
             bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(),
-                    (rc, lh, ctx1) -> executor.execute(safeRun(() -> {
+                    (rc, lh, ctx1) -> executor.execute(() -> {
                         mbean.endDataLedgerOpenOp();
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Opened new source ledger {}", 
name, lastLedgerId);
@@ -342,7 +341,7 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
                             log.error("[{}] Failed to open source ledger {}: 
{}", name, lastLedgerId,
                                     BKException.getMessage(rc));
                         }
-                    })), null);
+                    }), null);
         }
 
         //handle old ledgers deleted.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 080c70b5873..d5a3019855c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl.cache;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import java.util.concurrent.ConcurrentHashMap;
@@ -116,7 +115,7 @@ public class RangeEntryCacheManagerImpl implements 
EntryCacheManager {
 
         // Trigger a single eviction in background. While the eviction is 
running we stop inserting entries in the cache
         if (currentSize > evictionTriggerThreshold && 
evictionInProgress.compareAndSet(false, true)) {
-            mlFactory.getScheduledExecutor().execute(safeRun(() -> {
+            mlFactory.getScheduledExecutor().execute(() -> {
                 // Trigger a new cache eviction cycle to bring the used memory 
below the cacheEvictionWatermark
                 // percentage limit
                 long sizeToEvict = currentSize - (long) (maxSize * 
cacheEvictionWatermark);
@@ -136,7 +135,7 @@ public class RangeEntryCacheManagerImpl implements 
EntryCacheManager {
                     mlFactoryMBean.recordCacheEviction();
                     evictionInProgress.set(false);
                 }
-            }));
+            });
         }
 
         return currentSize < maxSize;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java
deleted file mode 100644
index 570cb7ae735..00000000000
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.mledger.util;
-
-import java.util.function.Consumer;
-import org.apache.bookkeeper.util.SafeRunnable;
-
-/**
- * Static builders for {@link SafeRunnable}s.
- */
-public class SafeRun {
-    public static SafeRunnable safeRun(Runnable runnable) {
-        return new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                runnable.run();
-            }
-        };
-    }
-
-    /**
-     *
-     * @param runnable
-     * @param exceptionHandler
-     *            handler that will be called when there are any exception
-     * @return
-     */
-    public static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable> 
exceptionHandler) {
-        return new SafeRunnable() {
-            @Override
-            public void safeRun() {
-                try {
-                    runnable.run();
-                } catch (Throwable t) {
-                    exceptionHandler.accept(t);
-                    throw t;
-                }
-            }
-        };
-    }
-}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
index 3fa0234e13a..7558f07db76 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
@@ -383,7 +382,7 @@ public class ManagedCursorConcurrencyTest extends 
MockedBookKeeperTestCase {
         final AtomicInteger iteration = new AtomicInteger(0);
 
         for (int i = 0; i < deleteEntries; i++) {
-            executor.submit(safeRun(() -> {
+            executor.submit(() -> {
                 try {
                     
cursor.asyncDelete(addedEntries.get(iteration.getAndIncrement()), new 
DeleteCallback() {
                         @Override
@@ -403,7 +402,7 @@ public class ManagedCursorConcurrencyTest extends 
MockedBookKeeperTestCase {
                 } finally {
                     counter.countDown();
                 }
-            }));
+            });
         }
 
         counter.await();
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 136fe77d77a..64d1fcdab6f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.EOFException;
 import java.io.IOException;
@@ -61,7 +60,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
     protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
         AsyncContext context = request.startAsync();
         context.setTimeout(metricsServletTimeoutMs);
-        executor.execute(safeRun(() -> {
+        executor.execute(() -> {
             long start = System.currentTimeMillis();
             HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
             try {
@@ -92,7 +91,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
                             + "this is likely due to metricsServletTimeoutMs: 
{} ms elapsed: {}", time, e + "");
                 }
             }
-        }));
+        });
     }
 
     protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a6345f4a56a..bb08734298f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections4.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -56,7 +55,6 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -76,6 +74,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -324,8 +323,10 @@ public class BrokerService implements Closeable {
         this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
                 pulsar.getConfiguration().getNumAcceptorThreads(), false, 
acceptorThreadFactory);
         this.workerGroup = eventLoopGroup;
-        this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
-                new 
ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
+        this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-stats-updater")
+                .numThreads(1)
+                .build();
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
         this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
@@ -333,22 +334,32 @@ public class BrokerService implements Closeable {
         
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
         
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
 
-        this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(
-                new 
ExecutorProvider.ExtendedThreadFactory("pulsar-inactivity-monitor"));
-        this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(
-                new 
ExecutorProvider.ExtendedThreadFactory("pulsar-msg-expiry-monitor"));
-        this.compactionMonitor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new 
ExecutorProvider.ExtendedThreadFactory("pulsar-compaction-monitor"));
-        this.consumedLedgersMonitor = 
Executors.newSingleThreadScheduledExecutor(
-                new 
ExecutorProvider.ExtendedThreadFactory("consumed-Ledgers-monitor"));
+
+        this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-inactivity-monitor")
+                .numThreads(1)
+                .build();
+        this.messageExpiryMonitor = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-msg-expiry-monitor")
+                .numThreads(1)
+                .build();
+        this.compactionMonitor = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-compaction-monitor")
+                .numThreads(1)
+                .build();
+        this.consumedLedgersMonitor = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-consumed-ledgers-monitor")
+                .numThreads(1)
+                .build();
         this.topicPublishRateLimiterMonitor =
                 new 
PublishRateLimiterMonitor("pulsar-topic-publish-rate-limiter-monitor");
         this.brokerPublishRateLimiterMonitor =
                 new 
PublishRateLimiterMonitor("pulsar-broker-publish-rate-limiter-monitor");
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
-        this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(
-                new 
ExecutorProvider.ExtendedThreadFactory("pulsar-backlog-quota-checker"));
+        this.backlogQuotaChecker = OrderedScheduler.newSchedulerBuilder()
+                .name("pulsar-backlog-quota-checker")
+                .numThreads(1)
+                .build();
         this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers =
                 
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
@@ -367,7 +378,7 @@ public class BrokerService implements Closeable {
             log.info("Enabling per-broker unack-message limit {} and 
dispatcher-limit {} on blocked-broker",
                     maxUnackedMessages, maxUnackedMsgsPerDispatcher);
             // block misbehaving dispatcher by checking periodically
-            
pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::checkUnAckMessageDispatching),
+            
pulsar.getExecutor().scheduleAtFixedRate(this::checkUnAckMessageDispatching,
                     600, 30, TimeUnit.SECONDS);
         } else {
             this.maxUnackedMessages = 0;
@@ -557,7 +568,7 @@ public class BrokerService implements Closeable {
     }
 
     protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int 
statsUpdateFrequencyInSecs) {
-        statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates),
+        statsUpdater.scheduleAtFixedRate(this::updateRates,
             statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs, 
TimeUnit.SECONDS);
 
         // Ensure the broker starts up with initial stats
@@ -567,11 +578,12 @@ public class BrokerService implements Closeable {
     protected void startDeduplicationSnapshotMonitor() {
         int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
         if (interval > 0 && 
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
-            this.deduplicationSnapshotMonitor =
-                    Executors.newSingleThreadScheduledExecutor(new 
ExecutorProvider.ExtendedThreadFactory(
-                            "deduplication-snapshot-monitor"));
-            deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() -> 
forEachTopic(
-                    Topic::checkDeduplicationSnapshot))
+            this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
+                    .name("deduplication-snapshot-monitor")
+                    .numThreads(1)
+                    .build();
+            deduplicationSnapshotMonitor.scheduleAtFixedRate(() -> 
forEachTopic(
+                    Topic::checkDeduplicationSnapshot)
                     , interval, interval, TimeUnit.SECONDS);
         }
     }
@@ -579,14 +591,14 @@ public class BrokerService implements Closeable {
     protected void startInactivityMonitor() {
         if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) 
{
             int interval = 
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
-            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), 
interval, interval,
+            inactivityMonitor.scheduleAtFixedRate(() -> checkGC(), interval, 
interval,
                     TimeUnit.SECONDS);
         }
 
         // Deduplication info checker
         long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
                 
.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes())
 / 3;
-        
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo),
+        
inactivityMonitor.scheduleAtFixedRate(this::checkMessageDeduplicationInfo,
                 duplicationCheckerIntervalInSeconds,
                 duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
 
@@ -595,7 +607,7 @@ public class BrokerService implements Closeable {
             long subscriptionExpiryCheckIntervalInSeconds =
                     TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration()
                             .getSubscriptionExpiryCheckIntervalInMinutes());
-            
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
+            
inactivityMonitor.scheduleAtFixedRate(this::checkInactiveSubscriptions,
                     subscriptionExpiryCheckIntervalInSeconds,
                     subscriptionExpiryCheckIntervalInSeconds, 
TimeUnit.SECONDS);
         }
@@ -603,21 +615,21 @@ public class BrokerService implements Closeable {
         // check cluster migration
         int interval = 
pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
         if (interval > 0) {
-            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> 
checkClusterMigration()), interval, interval,
+            inactivityMonitor.scheduleAtFixedRate(() -> 
checkClusterMigration(), interval, interval,
                     TimeUnit.SECONDS);
         }
     }
 
     protected void startMessageExpiryMonitor() {
         int interval = 
pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
-        
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry), 
interval, interval,
+        messageExpiryMonitor.scheduleAtFixedRate(this::checkMessageExpiry, 
interval, interval,
                 TimeUnit.MINUTES);
     }
 
     protected void startCheckReplicationPolicies() {
         int interval = 
pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
         if (interval > 0) {
-            
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies),
 interval, interval,
+            
messageExpiryMonitor.scheduleAtFixedRate(this::checkReplicationPolicies, 
interval, interval,
                     TimeUnit.SECONDS);
         }
     }
@@ -625,16 +637,16 @@ public class BrokerService implements Closeable {
     protected void startCompactionMonitor() {
         int interval = 
pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
         if (interval > 0) {
-            compactionMonitor.scheduleAtFixedRate(safeRun(() -> 
checkCompaction()),
-                                                  interval, interval, 
TimeUnit.SECONDS);
+            compactionMonitor.scheduleAtFixedRate(this::checkCompaction,
+                    interval, interval, TimeUnit.SECONDS);
         }
     }
 
     protected void startConsumedLedgersMonitor() {
         int interval = 
pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
         if (interval > 0) {
-            
consumedLedgersMonitor.scheduleAtFixedRate(safeRun(this::checkConsumedLedgers),
-                                                            interval, 
interval, TimeUnit.SECONDS);
+            
consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers,
+                    interval, interval, TimeUnit.SECONDS);
         }
     }
 
@@ -642,7 +654,7 @@ public class BrokerService implements Closeable {
         if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
             final int interval = 
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
             log.info("Scheduling a thread to check backlog quota after [{}] 
seconds in background", interval);
-            
backlogQuotaChecker.scheduleAtFixedRate(safeRun(this::monitorBacklogQuota), 
interval, interval,
+            backlogQuotaChecker.scheduleAtFixedRate(this::monitorBacklogQuota, 
interval, interval,
                     TimeUnit.SECONDS);
         } else {
             log.info("Backlog quota check monitoring is disabled");
@@ -702,12 +714,15 @@ public class BrokerService implements Closeable {
                 stop();
             }
             //start monitor.
-            scheduler = Executors.newSingleThreadScheduledExecutor(new 
ExecutorProvider.ExtendedThreadFactory(name));
+            scheduler = OrderedScheduler.newSchedulerBuilder()
+                    .name(name)
+                    .numThreads(1)
+                    .build();
             // schedule task that sums up publish-rate across all cnx on a 
topic ,
             // and check the rate limit exceeded or not.
-            scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs, 
tickTimeMs, TimeUnit.MILLISECONDS);
+            scheduler.scheduleAtFixedRate(checkTask, tickTimeMs, tickTimeMs, 
TimeUnit.MILLISECONDS);
             // schedule task that refreshes rate-limiting bucket
-            scheduler.scheduleAtFixedRate(safeRun(refreshTask), 1, 1, 
TimeUnit.SECONDS);
+            scheduler.scheduleAtFixedRate(refreshTask, 1, 1, TimeUnit.SECONDS);
             this.tickTimeMs = tickTimeMs;
             this.refreshTask = refreshTask;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 677ed25ef21..bfd7c001e4c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -67,7 +67,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -1674,9 +1673,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 final long producerId = send.getProducerId();
                 final long sequenceId = send.getSequenceId();
                 final long highestSequenceId = send.getHighestSequenceId();
-                
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), 
SafeRun.safeRun(() -> {
+                
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), 
() -> {
                     commandSender.sendSendReceiptResponse(producerId, 
sequenceId, highestSequenceId, -1, -1);
-                }));
+                });
                 producer.recordMessageDrop(send.getNumMessages());
                 return;
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9540f6efe00..4ac755860fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -236,9 +235,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     @Override
     public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
-        topic.getBrokerService().executor().execute(safeRun(() -> {
+        topic.getBrokerService().executor().execute(() -> {
             internalConsumerFlow(consumer, additionalNumberOfMessages);
-        }));
+        });
     }
 
     private synchronized void internalConsumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
@@ -264,7 +263,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
      *
      */
     public void readMoreEntriesAsync() {
-        
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
+        topic.getBrokerService().executor().execute(this::readMoreEntries);
     }
 
     public synchronized void readMoreEntries() {
@@ -584,14 +583,14 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
             // setting sendInProgress here, because sendMessagesToConsumers 
will be executed
             // in a separate thread, and we want to prevent more reads
             acquireSendInProgress();
-            dispatchMessagesThread.execute(safeRun(() -> {
+            dispatchMessagesThread.execute(() -> {
                 if (sendMessagesToConsumers(readType, entries, false)) {
                     updatePendingBytesToDispatch(-size);
                     readMoreEntries();
                 } else {
                     updatePendingBytesToDispatch(-size);
                 }
-            }));
+            });
         } else {
             if (sendMessagesToConsumers(readType, entries, true)) {
                 updatePendingBytesToDispatch(-size);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 03dce58af3a..7cbf7bd2c78 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallba
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
@@ -149,9 +148,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void readEntriesComplete(final List<Entry> entries, Object obj) {
-        topicExecutor.execute(SafeRun.safeRun(() -> {
-            internalReadEntriesComplete(entries, obj);
-        }));
+        topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
     }
 
     public synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
@@ -229,21 +226,19 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                             sendMessageInfo.getTotalMessages(), 
sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the 
previous batch has been written to the socket.
-                    topicExecutor.execute(SafeRun.safeRun(() -> {
+                    topicExecutor.execute(() -> {
                             synchronized 
(PersistentDispatcherSingleActiveConsumer.this) {
                                 Consumer newConsumer = getActiveConsumer();
                                 readMoreEntries(newConsumer);
                             }
-                        }));
+                        });
                 }
             });
     }
 
     @Override
     public void consumerFlow(Consumer consumer, int 
additionalNumberOfMessages) {
-        topicExecutor.execute(SafeRun.safeRun(() -> {
-            internalConsumerFlow(consumer);
-        }));
+        topicExecutor.execute(() -> internalConsumerFlow(consumer));
     }
 
     private synchronized void internalConsumerFlow(Consumer consumer) {
@@ -272,9 +267,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch) {
-        topicExecutor.execute(SafeRun.safeRun(() -> {
-            internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
-        }));
+        topicExecutor.execute(() -> 
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
@@ -466,9 +459,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
-        topicExecutor.execute(SafeRun.safeRun(() -> {
-            internalReadEntriesFailed(exception, ctx);
-        }));
+        topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
     }
 
     private synchronized void internalReadEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
@@ -516,7 +507,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         topic.getBrokerService().executor().schedule(() -> {
 
             // Jump again into dispatcher dedicated thread
-            topicExecutor.execute(SafeRun.safeRun(() -> {
+            topicExecutor.execute(() -> {
                 synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                     Consumer currentConsumer = 
ACTIVE_CONSUMER_UPDATER.get(this);
                     // we should retry the read if we have an active consumer 
and there is no pending read
@@ -533,7 +524,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                                 currentConsumer, havePendingRead);
                     }
                 }
-            }));
+            });
         }, waitTimeMillis, TimeUnit.MILLISECONDS);
 
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a0292729e73..1a8c6e180a2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
@@ -415,7 +414,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     public void markDeletePositionMoveForward() {
         // Execute the notification in different thread to avoid a mutex chain 
here
         // from the delete operation that was completed
-        topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() 
-> {
+        topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
                 if (recentlyJoinedConsumers != null && 
!recentlyJoinedConsumers.isEmpty()
                         && removeConsumersFromRecentJoinedConsumers()) {
@@ -424,7 +423,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                     readMoreEntries();
                 }
             }
-        }));
+        });
     }
 
     private boolean removeConsumersFromRecentJoinedConsumers() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 5df1fc2c6db..44e8a423344 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -30,7 +29,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
@@ -105,13 +103,13 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
             // setting sendInProgress here, because sendMessagesToConsumers will be executed
             // in a separate thread, and we want to prevent more reads
             acquireSendInProgress();
-            dispatchMessagesThread.execute(safeRun(() -> {
+            dispatchMessagesThread.execute(() -> {
                 if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), false)) {
                     readMoreEntries();
                 } else {
                     updatePendingBytesToDispatch(-size);
                 }
-            }));
+            });
         } else {
             if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), true)) {
                 readMoreEntriesAsync();
@@ -129,7 +127,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
     public void canReadMoreEntries(boolean withBackoff) {
         havePendingRead = false;
         topic.getBrokerService().executor().schedule(() -> {
-            topicExecutor.execute(SafeRun.safeRun(() -> {
+            topicExecutor.execute(() -> {
                 synchronized (PersistentStreamingDispatcherMultipleConsumers.this) {
                     if (!havePendingRead) {
                         log.info("[{}] Scheduling read operation", name);
@@ -138,7 +136,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
                         log.info("[{}] Skipping read since we have pendingRead", name);
                     }
                 }
-            }));
+            });
         }, withBackoff
                 ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
     }
@@ -216,7 +214,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
                     havePendingReplayRead = false;
                     // We should not call readMoreEntries() recursively in the same thread
                     // as there is a risk of StackOverflowError
-                    topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
+                    topic.getBrokerService().executor().execute(this::readMoreEntries);
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 9000850ed69..efe9de778a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.collect.Lists;
 import java.util.concurrent.Executor;
@@ -29,7 +28,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
@@ -68,7 +66,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
     public void canReadMoreEntries(boolean withBackoff) {
         havePendingRead = false;
         topic.getBrokerService().executor().schedule(() -> {
-             topicExecutor.execute(SafeRun.safeRun(() -> {
+             topicExecutor.execute(() -> {
                 synchronized (PersistentStreamingDispatcherSingleActiveConsumer.this) {
                     Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
                     if (currentConsumer != null && !havePendingRead) {
@@ -81,7 +79,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
                                 currentConsumer, havePendingRead);
                     }
                 }
-            }));
+            });
         }, withBackoff
                 ? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
     }
@@ -115,9 +113,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
      */
     @Override
     public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
-        dispatcherExecutor.execute(safeRun(() -> {
-            internalReadEntryComplete(entry, ctx);
-        }));
+        dispatcherExecutor.execute(() -> internalReadEntryComplete(entry, ctx));
     }
 
     public synchronized void internalReadEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 0f75538ee88..6ffc5ba0f62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -35,7 +35,6 @@ import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
 import org.apache.pulsar.client.impl.Backoff;
@@ -157,9 +156,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     @Override
     public void readEntryComplete(Entry entry, Object ctx) {
         // Don't block caller thread, complete read entry with dispatcher dedicated thread.
-        dispatcherExecutor.execute(SafeRun.safeRun(() -> {
-            internalReadEntryComplete(entry, ctx);
-        }));
+        dispatcherExecutor.execute(() -> internalReadEntryComplete(entry, ctx));
     }
 
     private void internalReadEntryComplete(Entry entry, Object ctx) {
@@ -198,9 +195,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     @Override
     public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
         // Don't block caller thread, complete read entry fail with dispatcher dedicated thread.
-        dispatcherExecutor.execute(SafeRun.safeRun(() -> {
-            internalReadEntryFailed(exception, ctx);
-        }));
+        dispatcherExecutor.execute(() -> internalReadEntryFailed(exception, ctx));
     }
 
     private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
@@ -257,13 +252,13 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     public boolean cancelReadRequests() {
         if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
             // Don't block caller thread, complete cancel read with dispatcher dedicated thread.
-             topicExecutor.execute(SafeRun.safeRun(() -> {
+             topicExecutor.execute(() -> {
                 synchronized (StreamingEntryReader.this) {
                     if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
                         internalCancelReadRequests();
                     }
                 }
-            }));
+            });
             return true;
         }
         return false;
@@ -282,16 +277,16 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
         topic.getBrokerService().executor().schedule(() -> {
             // Jump again into dispatcher dedicated thread
-            dispatcherExecutor.execute(SafeRun.safeRun(() -> {
+            dispatcherExecutor.execute(() -> {
                 ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
                 managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
-            }));
+            });
         }, delay, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void entriesAvailable() {
-        dispatcherExecutor.execute(SafeRun.safeRun(this::internalEntriesAvailable));
+        dispatcherExecutor.execute(this::internalEntriesAvailable);
     }
 
     private synchronized void internalEntriesAvailable() {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 11ae62226e7..9e6a9b94c42 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -89,7 +88,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         this.sequencer = FutureUtil.Sequencer.create();
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
-        updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+        updateCachedValueFuture = executor.scheduleWithFixedDelay(this::getLeaderValue,
                 metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
                 metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
     }

Reply via email to