merlimat closed pull request #1489: Refactored to reflect changes in BK for 
OrderedExecutor
URL: https://github.com/apache/incubator-pulsar/pull/1489
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index f267a6c06..60ba634e1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -188,7 +188,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl 
position, final ReadEnt
                     manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
                     ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
 
-                    ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> 
{
+                    ml.getExecutor().executeOrdered(ml.getName(), safeRun(() 
-> {
                         callback.readEntryComplete(returnEntry, obj);
                     }));
                 } else {
@@ -254,7 +254,7 @@ public void asyncReadEntry(LedgerHandle lh, long 
firstEntry, long lastEntry, boo
 
                 checkNotNull(ml.getName());
                 checkNotNull(ml.getExecutor());
-                ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
+                ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
                     // We got the entries, we need to transform them to a 
List<> type
                     long totalSize = 0;
                     final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 7faa18c8e..262cbeb4c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -109,7 +109,7 @@ boolean hasSpaceInCache() {
 
         // Trigger a single eviction in background. While the eviction is 
running we stop inserting entries in the cache
         if (currentSize > evictionTriggerThreshold && 
evictionInProgress.compareAndSet(false, true)) {
-            mlFactory.executor.execute(safeRun(() -> {
+            mlFactory.scheduledExecutor.execute(safeRun(() -> {
                 // Trigger a new cache eviction cycle to bring the used memory 
below the cacheEvictionWatermark
                 // percentage limit
                 long sizeToEvict = currentSize - (long) (maxSize * 
cacheEvictionWatermak);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 29fe0a724..a2be71d15 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -37,6 +37,7 @@
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
@@ -51,6 +52,7 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -847,7 +849,7 @@ public void asyncResetCursor(Position newPos, 
AsyncCallbacks.ResetCursorCallback
         final PositionImpl newPosition = (PositionImpl) newPos;
 
         // order trim and reset operations on a ledger
-        ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> {
+        ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             if (ledger.isValidPosition(newPosition) || 
newPosition.equals(PositionImpl.earliest)
                     || newPosition.equals(PositionImpl.latest)) {
                 internalResetCursor(newPosition, callback);
@@ -1923,7 +1925,7 @@ void createNewMetadataLedger(final VoidCallback callback) 
{
 
         bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), 
config.getMetadataWriteQuorumSize(),
                 config.getMetadataAckQuorumSize(), config.getDigestType(), 
config.getPassword(), (rc, lh, ctx) -> {
-                    ledger.getExecutor().submit(safeRun(() -> {
+                    ledger.getExecutor().execute(safeRun(() -> {
                         ledger.mbean.endCursorLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
                             log.warn("[{}] Error creating ledger for cursor 
{}: {}", ledger.getName(), name,
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 4c0d3d703..d148d1dda 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -23,7 +23,7 @@
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
-import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -31,12 +31,12 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -63,7 +63,6 @@
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.util.Futures;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -77,9 +76,9 @@
     private final boolean isBookkeeperManaged;
     private final ZooKeeper zookeeper;
     private final ManagedLedgerFactoryConfig config;
-    protected final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(16,
-            new DefaultThreadFactory("bookkeeper-ml"));
-    private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+    protected final OrderedScheduler scheduledExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+            .name("bookkeeper-ml-scheduler").build();
+    private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(16)
             .name("bookkeeper-ml-workers").build();
 
     protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -122,7 +121,7 @@ public ManagedLedgerFactoryImpl(ClientConfiguration 
bkClientConfiguration, Manag
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
-        this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, 
StatsPeriodSeconds, TimeUnit.SECONDS);
+        this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper 
zooKeeper) throws Exception {
@@ -138,7 +137,7 @@ public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, 
ZooKeeper zooKeeper, Mana
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
-        this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, 
StatsPeriodSeconds, TimeUnit.SECONDS);
+        this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
     }
 
     private synchronized void refreshStats() {
@@ -232,7 +231,7 @@ public void asyncOpen(final String name, final 
ManagedLedgerConfig config, final
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
-            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, 
bookKeeper, store, config, executor,
+            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, 
bookKeeper, store, config, scheduledExecutor,
                     orderedExecutor, name);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
                 @Override
@@ -305,7 +304,7 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
             }
         }
 
-        executor.shutdown();
+        scheduledExecutor.shutdown();
         orderedExecutor.shutdown();
 
         entryCacheManager.clear();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 34283ed0d..4aceb6ca3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,18 +22,26 @@
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -45,6 +53,7 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -77,23 +86,13 @@
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
 
@@ -190,8 +189,8 @@
             AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, 
State.class, "state");
     private volatile State state = null;
 
-    private final ScheduledExecutorService scheduledExecutor;
-    private final OrderedScheduler executor;
+    private final OrderedScheduler scheduledExecutor;
+    private final OrderedExecutor executor;
     final ManagedLedgerFactoryImpl factory;
     protected final ManagedLedgerMBeanImpl mbean;
 
@@ -204,7 +203,7 @@
     // //////////////////////////////////////////////////////////////////////
 
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
-            ManagedLedgerConfig config, ScheduledExecutorService 
scheduledExecutor, OrderedScheduler orderedExecutor,
+            ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, 
OrderedExecutor orderedExecutor,
             final String name) {
         this.factory = factory;
         this.bookKeeper = bookKeeper;
@@ -250,7 +249,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, 
Stat stat) {
                 if (ledgers.size() > 0) {
                     final long id = ledgers.lastKey();
                     OpenCallback opencb = (rc, lh, ctx1) -> {
-                        executor.submitOrdered(name, safeRun(() -> {
+                        executor.executeOrdered(name, safeRun(() -> {
                             mbean.endDataLedgerOpenOp();
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] Opened ledger {}: ", name, id, 
BKException.getMessage(rc));
@@ -338,7 +337,7 @@ public void operationFailed(MetaStoreException e) {
         mbean.startDataLedgerCreateOp();
         bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
                 config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> 
{
-                    executor.submitOrdered(name, safeRun(() -> {
+                    executor.executeOrdered(name, safeRun(() -> {
                         mbean.endDataLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
                             
callback.initializeFailed(createManagedLedgerException(rc));
@@ -1319,7 +1318,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
             mbean.startDataLedgerOpenOp();
             bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), 
config.getPassword(),
                     (int rc, LedgerHandle lh, Object ctx) -> {
-                        executor.submit(safeRun(() -> {
+                        executor.executeOrdered(name, safeRun(() -> {
                             mbean.endDataLedgerOpenOp();
                             if (rc != BKException.Code.OK) {
                                 // Remove the ledger future from cache to give 
chance to reopen it later
@@ -1484,12 +1483,12 @@ void notifyCursors() {
                 break;
             }
 
-            executor.submit(safeRun(() -> 
waitingCursor.notifyEntriesAvailable()));
+            executor.execute(safeRun(() -> 
waitingCursor.notifyEntriesAvailable()));
         }
     }
 
     private void trimConsumedLedgersInBackground() {
-        executor.submitOrdered(name, safeRun(() -> {
+        executor.executeOrdered(name, safeRun(() -> {
             internalTrimConsumedLedgers();
         }));
     }
@@ -2113,11 +2112,11 @@ private boolean currentLedgerIsFull() {
         return ledgers;
     }
 
-    ScheduledExecutorService getScheduledExecutor() {
+    OrderedScheduler getScheduledExecutor() {
         return scheduledExecutor;
     }
 
-    OrderedScheduler getExecutor() {
+    OrderedExecutor getExecutor() {
         return executor;
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 7a7697531..31705c1b6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -20,11 +20,16 @@
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
+import com.google.common.base.Charsets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -40,11 +45,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
 @SuppressWarnings("checkstyle:javadoctype")
 public class MetaStoreImplZookeeper implements MetaStore {
 
@@ -55,7 +55,7 @@
     private static final String prefix = prefixName + "/";
 
     private final ZooKeeper zk;
-    private final OrderedScheduler executor;
+    private final OrderedExecutor executor;
 
     private static class ZKStat implements Stat {
         private final int version;
@@ -90,7 +90,7 @@ public long getModificationTimestamp() {
         }
     }
 
-    public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor)
+    public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor)
             throws Exception {
         this.zk = zk;
         this.executor = executor;
@@ -130,7 +130,8 @@ private ManagedLedgerInfo 
updateMLInfoTimestamp(ManagedLedgerInfo info) {
     @Override
     public void getManagedLedgerInfo(final String ledgerName, final 
MetaStoreCallback<ManagedLedgerInfo> callback) {
         // Try to get the content or create an empty node
-        zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) 
-> executor.submit(safeRun(() -> {
+        zk.getData(prefix + ledgerName, false,
+                (rc, path, ctx, readData, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (rc == Code.OK.intValue()) {
                 try {
                     ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
@@ -171,7 +172,7 @@ public void asyncUpdateLedgerIds(String ledgerName, 
ManagedLedgerInfo mlInfo, St
         byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
 
         zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
-                (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
+                (rc, path, zkCtx, stat1) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] UpdateLedgersIdsCallback.processResult 
rc={} newVersion={}", ledgerName,
                                 Code.get(rc), stat != null ? stat.getVersion() 
: "null");
@@ -195,7 +196,8 @@ public void getCursors(final String ledgerName, final 
MetaStoreCallback<List<Str
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get cursors list", ledgerName);
         }
-        zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children, 
stat) -> executor.submit(safeRun(() -> {
+        zk.getChildren(prefix + ledgerName, false,
+                (rc, path, ctx, children, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] getConsumers complete rc={} children={}", 
ledgerName, Code.get(rc), children);
             }
@@ -219,7 +221,7 @@ public void asyncGetCursorInfo(String ledgerName, String 
consumerName,
             log.debug("Reading from {}", path);
         }
 
-        zk.getData(path, false, (rc, path1, ctx, data, stat) -> 
executor.submit(safeRun(() -> {
+        zk.getData(path, false, (rc, path1, ctx, data, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (rc != Code.OK.intValue()) {
                 callback.operationFailed(new 
MetaStoreException(KeeperException.create(Code.get(rc))));
             } else {
@@ -251,7 +253,7 @@ public void asyncUpdateCursorInfo(final String ledgerName, 
final String cursorNa
                 log.debug("[{}] Creating consumer {} on meta-data store with 
{}", ledgerName, cursorName, info);
             }
             zk.create(path, content, Acl, CreateMode.PERSISTENT,
-                    (rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
+                    (rc, path1, ctx, name) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                         if (rc != Code.OK.intValue()) {
                             log.warn("[{}] Error creating cosumer {} node on 
meta-data store with {}: ", ledgerName,
                                     cursorName, info, Code.get(rc));
@@ -269,7 +271,8 @@ public void asyncUpdateCursorInfo(final String ledgerName, 
final String cursorNa
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Updating consumer {} on meta-data store with 
{}", ledgerName, cursorName, info);
             }
-            zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, 
stat1) -> executor.submit(safeRun(() -> {
+            zk.setData(path, content, zkStat.getVersion(),
+                    (rc, path1, ctx, stat1) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                 if (rc == Code.BADVERSION.intValue()) {
                     callback.operationFailed(new 
BadVersionException(KeeperException.create(Code.get(rc))));
                 } else if (rc != Code.OK.intValue()) {
@@ -285,7 +288,8 @@ public void asyncUpdateCursorInfo(final String ledgerName, 
final String cursorNa
     public void asyncRemoveCursor(final String ledgerName, final String 
consumerName,
             final MetaStoreCallback<Void> callback) {
         log.info("[{}] Remove consumer={}", ledgerName, consumerName);
-        zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, 
ctx) -> executor.submit(safeRun(() -> {
+        zk.delete(prefix + ledgerName + "/" + consumerName, -1,
+                (rc, path, ctx) -> executor.executeOrdered(ledgerName, 
safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, 
consumerName, Code.get(rc));
             }
@@ -300,7 +304,7 @@ public void asyncRemoveCursor(final String ledgerName, 
final String consumerName
     @Override
     public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> 
callback) {
         log.info("[{}] Remove ManagedLedger", ledgerName);
-        zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> 
executor.submit(safeRun(() -> {
+        zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] zk delete done. rc={}", ledgerName, 
Code.get(rc));
             }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 85c4da624..f4daec52e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -123,14 +123,14 @@ public void addComplete(int rc, final LedgerHandle lh, 
long entryId, Object ctx)
             // be marked as failed.
             ml.mbean.recordAddEntryError();
 
-            ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() -> 
{
+            ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() 
-> {
                 // Force the creation of a new ledger. Doing it in a 
background thread to avoid acquiring ML lock
                 // from a BK callback.
                 ml.ledgerClosed(lh);
             }));
         } else {
             // Trigger addComplete callback in a thread hashed on the managed 
ledger name
-            ml.getExecutor().submitOrdered(ml.getName(), this);
+            ml.getExecutor().executeOrdered(ml.getName(), this);
         }
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 31d1c79b3..6b3a03adb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -83,7 +83,7 @@ public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can 
return them
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));
@@ -131,7 +131,7 @@ void checkReadCompletion() {
             }
 
             // Schedule next read in a different thread
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 readPosition = 
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
@@ -139,7 +139,7 @@ void checkReadCompletion() {
             // The reading was already completed, release resources and 
trigger callback
             cursor.readOperationCompleted();
 
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 107abcf75..47c515d13 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -24,9 +24,9 @@
 import static org.testng.Assert.assertTrue;
 
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -41,7 +41,7 @@
 
     @BeforeClass
     void setup() throws Exception {
-        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        OrderedScheduler executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
 
         ml1 = mock(ManagedLedgerImpl.class);
         when(ml1.getScheduledExecutor()).thenReturn(executor);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
index c3e75c2e7..ebaa1fdab 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
@@ -65,7 +65,7 @@ public void simple() throws Exception {
         mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS);
 
         // Simulate stats getting update from different thread
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -90,7 +90,7 @@ public void simple() throws Exception {
         ledger.addEntry(new byte[600]);
         cursor.markDelete(p1);
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -109,7 +109,7 @@ public void simple() throws Exception {
         mbean.recordAddEntryError();
         mbean.recordReadEntriesError();
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -119,7 +119,7 @@ public void simple() throws Exception {
         List<Entry> entries = cursor.readEntries(100);
         assertEquals(entries.size(), 1);
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 58010e8d7..47753e6fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -106,8 +107,8 @@
             new DefaultThreadFactory("pulsar"));
     private final ScheduledExecutorService cacheExecutor = 
Executors.newScheduledThreadPool(10,
             new DefaultThreadFactory("zk-cache-callback"));
-    private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
-            .name("pulsar-ordered").build();
+    private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered")
+            .build();
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -445,7 +446,7 @@ private void startZkCacheService() throws 
PulsarServerException {
 
         LOG.info("starting configuration cache service");
 
-        this.localZkCache = new LocalZooKeeperCache(getZkClient(), 
getOrderedExecutor(), this.cacheExecutor);
+        this.localZkCache = new LocalZooKeeperCache(getZkClient(), 
getOrderedExecutor());
         this.globalZkCache = new 
GlobalZooKeeperCache(getZooKeeperClientFactory(),
                 (int) config.getZooKeeperSessionTimeoutMillis(), 
config.getGlobalZookeeperServers(),
                 getOrderedExecutor(), this.cacheExecutor);
@@ -611,7 +612,7 @@ public ScheduledExecutorService getLoadManagerExecutor() {
         return loadManagerExecutor;
     }
 
-    public OrderedScheduler getOrderedExecutor() {
+    public OrderedExecutor getOrderedExecutor() {
         return orderedExecutor;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fe307136f..2668f9540 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -636,8 +636,8 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
             if (t != null) {
                 // retry several times on BadVersion
                 if ((t instanceof ServerMetadataException) && 
(counter.decrementAndGet() >= 0)) {
-                    pulsar.getOrderedExecutor().submit(
-                        () -> splitAndOwnBundleOnceAndRetry(bundle, unload, 
counter, unloadFuture));
+                    pulsar.getOrderedExecutor()
+                            .execute(() -> 
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
                 } else {
                     // Retry enough, or meet other exception
                     String msg2 = format(" %s not success update nsBundles, 
counter %d, reason %s",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e1a0d3f0f..6c2e011ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -476,7 +476,7 @@ public void unloadNamespaceBundlesGracefully() {
         replicationFuture.exceptionally((ex) -> {
             log.warn("Replication check failed. Removing topic from topics 
list {}, {}", topic, ex);
             nonPersistentTopic.stopReplProducers().whenComplete((v, exception) 
-> {
-                pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+                pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                 topicFuture.completeExceptionally(ex);
             });
 
@@ -579,7 +579,7 @@ private void createPersistentTopic(final String topic, 
CompletableFuture<Topic>
             // namespace is being unloaded
             String msg = String.format("Namespace is being unloaded, cannot 
add topic %s", topic);
             log.warn(msg);
-            pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
             return;
         }
@@ -618,7 +618,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
                                 });
                             } catch (NamingException e) {
                                 log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
-                                pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 topicFuture.completeExceptionally(e);
                             }
                         }
@@ -626,7 +626,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
                         @Override
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             log.warn("Failed to create topic {}", topic, 
exception);
-                            pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                            pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                             topicFuture.completeExceptionally(new 
PersistenceException(exception));
                         }
                     }, null);
@@ -635,7 +635,7 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
             log.warn("[{}] Failed to get topic configuration: {}", topic, 
exception.getMessage(), exception);
             // remove topic from topics-map in different thread to avoid 
possible deadlock if
             // createPersistentTopic-thread only tries to handle this 
future-result
-            pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             topicFuture.completeExceptionally(exception);
             return null;
         });
@@ -644,7 +644,7 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
     public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
         CompletableFuture<ManagedLedgerConfig> future = new 
CompletableFuture<>();
         // Execute in background thread, since getting the policies might 
block if the z-node wasn't already cached
-        pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> {
+        pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> {
             NamespaceName namespace = topicName.getNamespaceObject();
             ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
@@ -1104,7 +1104,7 @@ private void updateConfigurationAndRegisterListeners() {
     }
 
     private void updateTopicMessageDispatchRate() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             topics.forEach((name, topicFuture) -> {
                 if (topicFuture.isDone()) {
@@ -1150,7 +1150,7 @@ private void updateSubscriptionMessageDispatchRate() {
     }
 
     private void updateManagedLedgerConfig() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update managed-ledger config of each topic
             topics.forEach((name, topicFuture) -> {
                 if (topicFuture.isDone()) {
@@ -1403,7 +1403,7 @@ public void checkUnAckMessageDispatching() {
             // block dispatcher with higher unack-msg when it reaches 
broker-unack msg limit
             log.info("[{}] Starting blocking dispatchers with unacked msgs {} 
due to reached max broker limit {}",
                     maxUnackedMessages, maxUnackedMsgsPerDispatcher);
-            executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
+            executor().execute(() -> blockDispatchersWithLargeUnAckMessages());
         } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages 
< maxUnackedMessages / 2) {
             // unblock broker-dispatching if received enough acked messages 
back
             if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) 
{
@@ -1457,7 +1457,7 @@ public void 
unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
         try {
             dispatcherList.forEach(dispatcher -> {
                 dispatcher.unBlockDispatcherOnUnackedMsgs();
-                executor().submit(() -> dispatcher.readMoreEntries());
+                executor().execute(() -> dispatcher.readMoreEntries());
                 log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
                 blockedDispatchers.remove(dispatcher);
             });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bada69ca2..c052e4c58 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -955,7 +955,7 @@ protected void handleSend(CommandSend send, ByteBuf 
headersAndPayload) {
             if (nonPersistentPendingMessages > 
MaxNonPersistentPendingMessages) {
                 final long producerId = send.getProducerId();
                 final long sequenceId = send.getSequenceId();
-                
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(), 
SafeRun.safeRun(() -> {
+                
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), 
SafeRun.safeRun(() -> {
                     ctx.writeAndFlush(Commands.newSendReceipt(producerId, 
sequenceId, -1, -1), ctx.voidPromise());
                 }));
                 producer.recordMessageDrop(send.getNumMessages());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fc6a920ef..9eb7ce7d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -27,8 +27,10 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +41,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -190,7 +193,7 @@ public void publishMessage(ByteBuf data, PublishContext 
callback) {
 
         // retain data for sub/replication because io-thread will release 
actual payload
         data.retain(2);
-        this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
             subscriptions.forEach((name, subscription) -> {
                 ByteBuf duplicateBuffer = data.retainedDuplicate();
                 Entry entry = create(0L, 0L, duplicateBuffer);
@@ -210,7 +213,7 @@ public void publishMessage(ByteBuf data, PublishContext 
callback) {
             }
         }));
 
-        this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
             replicators.forEach((name, replicator) -> {
                 ByteBuf duplicateBuffer = data.retainedDuplicate();
                 Entry entry = create(0L, 0L, duplicateBuffer);
@@ -487,7 +490,7 @@ void removeSubscription(String subscriptionName) {
 
         FutureUtil.waitForAll(futures).thenRun(() -> {
             log.info("[{}] Topic closed", topic);
-            brokerService.pulsar().getExecutor().submit(() -> 
brokerService.removeTopicFromCache(topic));
+            brokerService.pulsar().getExecutor().execute(() -> 
brokerService.removeTopicFromCache(topic));
             closeFuture.complete(null);
         }).exceptionally(exception -> {
             log.error("[{}] Error closing topic", topic, exception);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e7e50b8cb..443d29bde 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -152,7 +152,7 @@ public void readEntriesComplete(List<Entry> entries, Object 
ctx) {
 
                 if (managedCursor.hasMoreEntries()) {
                     // Read next batch of entries
-                    pulsar.getExecutor().submit(() -> replayCursor(future));
+                    pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
                     future.complete(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2eb83ae29..17c5db7a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public void addUnAckedMessages(int numberOfMessages) {
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 
TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().submit(() -> 
readMoreEntries());
+                topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
             }
         }
         // increment broker-level count
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 98cafc547..290985725 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -18,30 +18,30 @@
  */
 package org.apache.pulsar.broker.auth;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class SameThreadOrderedSafeExecutor extends OrderedScheduler {
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
 
     public SameThreadOrderedSafeExecutor() {
         super("same-thread-executor", 1, new DefaultThreadFactory("test"), 
NullStatsLogger.INSTANCE, false, 100000, 10);
     }
 
     @Override
-    public void submit(SafeRunnable r) {
+    public void execute(Runnable r) {
         r.run();
     }
 
     @Override
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
         r.run();
     }
 
     @Override
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
         r.run();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 9fd2fb7ac..2fe63e8ef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -25,8 +25,7 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.hash.Hashing;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarService;
@@ -43,8 +42,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.hash.Hashing;
-
 public class ResourceQuotaCacheTest {
 
     private PulsarService pulsar;
@@ -52,14 +49,12 @@
     private LocalZooKeeperCacheService localCache;
     private NamespaceBundleFactory bundleFactory;
     private OrderedScheduler executor;
-    private ScheduledExecutorService scheduledExecutor;
 
     @BeforeMethod
     public void setup() throws Exception {
         pulsar = mock(PulsarService.class);
         executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor, scheduledExecutor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor);
         localCache = new LocalZooKeeperCacheService(zkCache, null);
 
         // set mock pulsar localzkcache
@@ -77,7 +72,6 @@ public void setup() throws Exception {
     @AfterMethod
     public void teardown() {
         executor.shutdown();
-        scheduledExecutor.shutdown();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index e8d4bb909..dfca1b2c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -33,10 +33,10 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.hash.Hashing;
+
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -58,8 +58,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.hash.Hashing;
-
 public class OwnershipCacheTest {
 
     private PulsarService pulsar;
@@ -71,7 +69,6 @@
     private NamespaceService nsService;
     private BrokerService brokerService;
     private OrderedScheduler executor;
-    private ScheduledExecutorService scheduledExecutor;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -80,8 +77,7 @@ public void setup() throws Exception {
         pulsar = mock(PulsarService.class);
         config = mock(ServiceConfiguration.class);
         executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        scheduledExecutor = Executors.newScheduledThreadPool(2);
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor, scheduledExecutor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor);
         localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
         ZooKeeperDataCache<LocalPolicies> poilciesCache = 
mock(ZooKeeperDataCache.class);
         when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
@@ -106,7 +102,6 @@ public void setup() throws Exception {
     @AfterMethod
     public void teardown() throws Exception {
         executor.shutdown();
-        scheduledExecutor.shutdown();
     }
 
     @Test
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c468298ec..2cbd64224 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -655,7 +655,7 @@ void activeConsumerChanged(boolean isActive) {
             return;
         }
 
-        listenerExecutor.submit(() -> {
+        listenerExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index a6e3cbce2..9a4b9aaf2 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,7 +31,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.common.naming.TopicName;
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 30f75be6b..1371d0580 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Connects with ZooKeeper and sets watch to listen changes for active broker 
list.
  *
@@ -55,8 +51,6 @@
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
             .name("pulsar-discovery-ordered-cache").build();
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(8,
-            new DefaultThreadFactory("pulsar-discovery-cache"));
 
     public static final String LOADBALANCE_BROKERS_ROOT = 
"/loadbalance/brokers";
 
@@ -74,8 +68,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory 
zkClientFactory, String zooke
             log.error("Shutting down ZK sessions: {}", exitCode);
         });
 
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor,
-                executor);
+        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
         localZkConnectionSvc.start(exitCode -> {
             try {
                 localZkCache.getZooKeeper().close();
@@ -115,7 +108,6 @@ public ZooKeeperCache getLocalZkCache() {
     @Override
     public void close() {
         orderedExecutor.shutdown();
-        executor.shutdown();
     }
 
     private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 0ba1ea63c..27d5a9b74 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Connects with ZooKeeper and sets watch to listen changes for active broker 
list.
  *
@@ -56,8 +52,6 @@
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
             .name("pulsar-discovery-ordered-cache").build();
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(8,
-            new DefaultThreadFactory("pulsar-discovery-cache"));
 
     public static final String LOADBALANCE_BROKERS_ROOT = 
"/loadbalance/brokers";
 
@@ -75,8 +69,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory 
zkClientFactory, String zooke
             log.error("Shutting down ZK sessions: {}", exitCode);
         });
 
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor,
-                executor);
+        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
         localZkConnectionSvc.start(exitCode -> {
             try {
                 localZkCache.getZooKeeper().close();
@@ -121,7 +114,6 @@ public ZooKeeperCache getLocalZkCache() {
     @Override
     public void close() {
         orderedExecutor.shutdown();
-        executor.shutdown();
     }
 
     private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index bd75e820e..b59cde569 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -28,7 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
@@ -51,8 +51,8 @@
     private final ScheduledExecutorService scheduledExecutor;
 
     public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int 
zkSessionTimeoutMillis,
-            String globalZkConnect, OrderedScheduler orderedExecutor, 
ScheduledExecutorService scheduledExecutor) {
-        super(null, orderedExecutor, scheduledExecutor);
+            String globalZkConnect, OrderedExecutor orderedExecutor, 
ScheduledExecutorService scheduledExecutor) {
+        super(null, orderedExecutor);
         this.zlClientFactory = zkClientFactory;
         this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
         this.globalZkConnect = globalZkConnect;
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 23547dd24..945b58e53 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.zookeeper;
 
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -37,9 +35,8 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(LocalZooKeeperCache.class);
 
-    public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler 
executor,
-            ScheduledExecutorService scheduledExecutor) {
-        super(zk, executor, scheduledExecutor);
+    public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor 
executor) {
+        super(zk, executor);
     }
 
     @Override
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index b2715b3f2..e2e46aa8f 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -20,6 +20,12 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
 import java.nio.file.Paths;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
@@ -29,14 +35,12 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -49,14 +53,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Per ZK client ZooKeeper cache supporting ZNode data and children list 
caches. A cache entry is identified, accessed
  * and invalidated by the ZNode path. For the data cache, ZNode data parsing 
is done at request time with the given
@@ -84,19 +80,18 @@
     protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
     protected final Cache<String, Set<String>> childrenCache;
     protected final Cache<String, Boolean> existsCache;
-    private final OrderedScheduler executor;
-    private final ScheduledExecutorService scheduledExecutor;
-    private boolean shouldShutdownExecutor = false;
+    private final OrderedExecutor executor;
+    private final OrderedExecutor backgroundExecutor = 
OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
+    private boolean shouldShutdownExecutor;
     public static final int cacheTimeOutInSec = 30;
 
     protected AtomicReference<ZooKeeper> zkSession = new 
AtomicReference<ZooKeeper>(null);
 
-    public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor, 
ScheduledExecutorService scheduledExecutor) {
+    public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
         checkNotNull(executor);
-        checkNotNull(scheduledExecutor);
         this.executor = executor;
-        this.scheduledExecutor = scheduledExecutor;
         this.zkSession.set(zkSession);
+        this.shouldShutdownExecutor = false;
 
         this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, 
TimeUnit.HOURS)
                 .buildAsync((key, executor1) -> null);
@@ -106,8 +101,7 @@ public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler 
executor, ScheduledE
     }
 
     public ZooKeeperCache(ZooKeeper zkSession) {
-        this(zkSession, 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(),
-                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("zk-cache-callback-executor")));
+        this(zkSession, 
OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
         this.shouldShutdownExecutor = true;
     }
 
@@ -132,7 +126,7 @@ public ZooKeeper getZooKeeper() {
                     LOG.debug("Submitting reload cache task to the executor 
for path: {}, updater: {}", path, updater);
                 }
                 try {
-                    executor.submitOrdered(path, new SafeRunnable() {
+                    executor.executeOrdered(path, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             updater.reloadCache(path);
@@ -181,7 +175,7 @@ private void invalidateExists(String path) {
     }
 
     public void asyncInvalidate(String path) {
-        scheduledExecutor.submit(() -> invalidate(path));
+        backgroundExecutor.execute(() -> invalidate(path));
     }
 
     public void invalidate(final String path) {
@@ -322,20 +316,20 @@ public Boolean call() throws Exception {
             // Broker doesn't restart on global-zk session lost: so handling 
unexpected exception
             try {
                 this.zkSession.get().getData(path, watcher, (rc, path1, ctx, 
content, stat) -> {
-                    Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor;
                     if (rc == Code.OK.intValue()) {
                         try {
                             T obj = deserializer.deserialize(path, content);
                             // avoid using the zk-client thread to process the 
result
-                            exec.execute(() -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
+                            executor.execute(
+                                    () -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
                         } catch (Exception e) {
-                            exec.execute(() -> 
zkFuture.completeExceptionally(e));
+                            executor.execute(() -> 
zkFuture.completeExceptionally(e));
                         }
                     } else if (rc == Code.NONODE.intValue()) {
                         // Return null values for missing z-nodes, as this is 
not "exceptional" condition
-                        exec.execute(() -> zkFuture.complete(null));
+                        executor.execute(() -> zkFuture.complete(null));
                     } else {
-                        exec.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
+                        executor.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
                     }
                 }, null);
             } catch (Exception e) {
@@ -430,7 +424,8 @@ public void invalidateRoot(String root) {
     public void stop() {
         if (shouldShutdownExecutor) {
             this.executor.shutdown();
-            this.scheduledExecutor.shutdown();
         }
+
+        this.backgroundExecutor.shutdown();
     }
 }
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index c0f529999..d18aea93e 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,21 +23,21 @@
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
 
-    private final OrderedScheduler executor;
+    private final OrderedExecutor executor;
 
-    public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) {
+    public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
         this.executor = executor;
     }
 
@@ -45,7 +45,7 @@ public ZookeeperBkClientFactoryImpl(OrderedScheduler 
executor) {
     public CompletableFuture<ZooKeeper> create(String serverList, SessionType 
sessionType, int zkSessionTimeoutMillis) {
         CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
 
-        executor.submit(safeRun(() -> {
+        executor.execute(safeRun(() -> {
             try {
                 ZooKeeper zk = 
ZooKeeperClient.newBuilder().connectString(serverList)
                         .sessionTimeoutMs(zkSessionTimeoutMillis)
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e9bad89f5..af00abc42 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -87,7 +87,7 @@ void classTeardown() throws Exception {
     @Test(timeOut = 10000)
     void testSimpleCache() throws Exception {
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws 
Exception {
@@ -127,7 +127,7 @@ public String deserialize(String key, byte[] content) 
throws Exception {
     void testChildrenCache() throws Exception {
         zkClient.create("/test", new byte[0], null, null);
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperChildrenCache cache = new 
ZooKeeperChildrenCache(zkCacheService, "/test");
 
         // Create callback counter
@@ -180,7 +180,7 @@ void testExistsCache() throws Exception {
         // Check existence after creation of the node
         zkClient.create("/test", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -197,7 +197,7 @@ void testInvalidateCache() throws Exception {
         zkClient.create("/test/c1", new byte[0], null, null);
         zkClient.create("/test/c2", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -339,7 +339,7 @@ void testZkCallbackThreadStuck() throws Exception {
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned 
future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws 
Exception {
@@ -387,7 +387,7 @@ public void testInvalidateCacheOnFailure() throws Exception 
{
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned 
future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
 
         final AtomicInteger count = new AtomicInteger(0);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to