This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d309b2c  Refactored to reflect changes in BK for OrderedExecutor (#1489)
d309b2c is described below

commit d309b2cb2f766151dc64861ba15e6dc71fbe8d8f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Apr 3 14:48:46 2018 -0700

    Refactored to reflect changes in BK for OrderedExecutor (#1489)
    
    * Refactored to reflect changes in BK for OrderedExecutor
    
    * Reverted back part of last change
    
    * Fixed test
---
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    |  4 +-
 .../bookkeeper/mledger/impl/EntryCacheManager.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  6 ++-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 21 +++++-----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 45 ++++++++++----------
 .../mledger/impl/MetaStoreImplZookeeper.java       | 36 +++++++++-------
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  4 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  6 +--
 .../mledger/impl/EntryCacheManagerTest.java        |  6 +--
 .../mledger/impl/ManagedLedgerMBeanTest.java       |  8 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |  9 ++--
 .../pulsar/broker/namespace/NamespaceService.java  |  4 +-
 .../pulsar/broker/service/BrokerService.java       | 20 ++++-----
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  9 ++--
 .../service/persistent/MessageDeduplication.java   |  2 +-
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 .../broker/auth/SameThreadOrderedSafeExecutor.java | 14 +++----
 .../broker/cache/ResourceQuotaCacheTest.java       | 10 +----
 .../broker/namespace/OwnershipCacheTest.java       | 11 ++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 .../discovery/service/BrokerDiscoveryProvider.java |  1 -
 .../service/web/ZookeeperCacheLoader.java          | 10 +----
 .../proxy/server/util/ZookeeperCacheLoader.java    | 10 +----
 .../pulsar/zookeeper/GlobalZooKeeperCache.java     |  6 +--
 .../pulsar/zookeeper/LocalZooKeeperCache.java      |  9 ++--
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 49 ++++++++++------------
 .../zookeeper/ZookeeperBkClientFactoryImpl.java    | 12 +++---
 .../pulsar/zookeeper/ZookeeperCacheTest.java       | 12 +++---
 29 files changed, 152 insertions(+), 180 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index f267a6c..60ba634 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -188,7 +188,7 @@ public class EntryCacheImpl implements EntryCache {
                     manager.mlFactoryMBean.recordCacheMiss(1, 
returnEntry.getLength());
                     ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
 
-                    ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> 
{
+                    ml.getExecutor().executeOrdered(ml.getName(), safeRun(() 
-> {
                         callback.readEntryComplete(returnEntry, obj);
                     }));
                 } else {
@@ -254,7 +254,7 @@ public class EntryCacheImpl implements EntryCache {
 
                 checkNotNull(ml.getName());
                 checkNotNull(ml.getExecutor());
-                ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
+                ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
                     // We got the entries, we need to transform them to a 
List<> type
                     long totalSize = 0;
                     final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 7faa18c..262cbeb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -109,7 +109,7 @@ public class EntryCacheManager {
 
         // Trigger a single eviction in background. While the eviction is 
running we stop inserting entries in the cache
         if (currentSize > evictionTriggerThreshold && 
evictionInProgress.compareAndSet(false, true)) {
-            mlFactory.executor.execute(safeRun(() -> {
+            mlFactory.scheduledExecutor.execute(safeRun(() -> {
                 // Trigger a new cache eviction cycle to bring the used memory 
below the cacheEvictionWatermark
                 // percentage limit
                 long sizeToEvict = currentSize - (long) (maxSize * 
cacheEvictionWatermak);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 29fe0a7..a2be71d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Sets;
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
@@ -51,6 +52,7 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -847,7 +849,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         final PositionImpl newPosition = (PositionImpl) newPos;
 
         // order trim and reset operations on a ledger
-        ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> {
+        ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             if (ledger.isValidPosition(newPosition) || 
newPosition.equals(PositionImpl.earliest)
                     || newPosition.equals(PositionImpl.latest)) {
                 internalResetCursor(newPosition, callback);
@@ -1923,7 +1925,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), 
config.getMetadataWriteQuorumSize(),
                 config.getMetadataAckQuorumSize(), config.getDigestType(), 
config.getPassword(), (rc, lh, ctx) -> {
-                    ledger.getExecutor().submit(safeRun(() -> {
+                    ledger.getExecutor().execute(safeRun(() -> {
                         ledger.mbean.endCursorLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
                             log.warn("[{}] Error creating ledger for cursor 
{}: {}", ledger.getName(), name,
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 4c0d3d7..d148d1d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -23,7 +23,7 @@ import static 
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
-import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -31,12 +31,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -63,7 +63,6 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.util.Futures;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -77,9 +76,9 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private final boolean isBookkeeperManaged;
     private final ZooKeeper zookeeper;
     private final ManagedLedgerFactoryConfig config;
-    protected final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(16,
-            new DefaultThreadFactory("bookkeeper-ml"));
-    private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+    protected final OrderedScheduler scheduledExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+            .name("bookkeeper-ml-scheduler").build();
+    private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(16)
             .name("bookkeeper-ml-workers").build();
 
     protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -122,7 +121,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
-        this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, 
StatsPeriodSeconds, TimeUnit.SECONDS);
+        this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper 
zooKeeper) throws Exception {
@@ -138,7 +137,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
-        this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, 
StatsPeriodSeconds, TimeUnit.SECONDS);
+        this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
     }
 
     private synchronized void refreshStats() {
@@ -232,7 +231,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         ledgers.computeIfAbsent(name, (mlName) -> {
             // Create the managed ledger
             CompletableFuture<ManagedLedgerImpl> future = new 
CompletableFuture<>();
-            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, 
bookKeeper, store, config, executor,
+            final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, 
bookKeeper, store, config, scheduledExecutor,
                     orderedExecutor, name);
             newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
                 @Override
@@ -305,7 +304,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
             }
         }
 
-        executor.shutdown();
+        scheduledExecutor.shutdown();
         orderedExecutor.shutdown();
 
         entryCacheManager.clear();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 34283ed..4aceb6c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,18 +22,26 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -45,6 +53,7 @@ import 
org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -77,23 +86,13 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
 
@@ -190,8 +189,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, 
State.class, "state");
     private volatile State state = null;
 
-    private final ScheduledExecutorService scheduledExecutor;
-    private final OrderedScheduler executor;
+    private final OrderedScheduler scheduledExecutor;
+    private final OrderedExecutor executor;
     final ManagedLedgerFactoryImpl factory;
     protected final ManagedLedgerMBeanImpl mbean;
 
@@ -204,7 +203,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     // //////////////////////////////////////////////////////////////////////
 
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
-            ManagedLedgerConfig config, ScheduledExecutorService 
scheduledExecutor, OrderedScheduler orderedExecutor,
+            ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, 
OrderedExecutor orderedExecutor,
             final String name) {
         this.factory = factory;
         this.bookKeeper = bookKeeper;
@@ -250,7 +249,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 if (ledgers.size() > 0) {
                     final long id = ledgers.lastKey();
                     OpenCallback opencb = (rc, lh, ctx1) -> {
-                        executor.submitOrdered(name, safeRun(() -> {
+                        executor.executeOrdered(name, safeRun(() -> {
                             mbean.endDataLedgerOpenOp();
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] Opened ledger {}: ", name, id, 
BKException.getMessage(rc));
@@ -338,7 +337,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         mbean.startDataLedgerCreateOp();
         bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(), config.getAckQuorumSize(),
                 config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> 
{
-                    executor.submitOrdered(name, safeRun(() -> {
+                    executor.executeOrdered(name, safeRun(() -> {
                         mbean.endDataLedgerCreateOp();
                         if (rc != BKException.Code.OK) {
                             
callback.initializeFailed(createManagedLedgerException(rc));
@@ -1319,7 +1318,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             mbean.startDataLedgerOpenOp();
             bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), 
config.getPassword(),
                     (int rc, LedgerHandle lh, Object ctx) -> {
-                        executor.submit(safeRun(() -> {
+                        executor.executeOrdered(name, safeRun(() -> {
                             mbean.endDataLedgerOpenOp();
                             if (rc != BKException.Code.OK) {
                                 // Remove the ledger future from cache to give 
chance to reopen it later
@@ -1484,12 +1483,12 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 break;
             }
 
-            executor.submit(safeRun(() -> 
waitingCursor.notifyEntriesAvailable()));
+            executor.execute(safeRun(() -> 
waitingCursor.notifyEntriesAvailable()));
         }
     }
 
     private void trimConsumedLedgersInBackground() {
-        executor.submitOrdered(name, safeRun(() -> {
+        executor.executeOrdered(name, safeRun(() -> {
             internalTrimConsumedLedgers();
         }));
     }
@@ -2113,11 +2112,11 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         return ledgers;
     }
 
-    ScheduledExecutorService getScheduledExecutor() {
+    OrderedScheduler getScheduledExecutor() {
         return scheduledExecutor;
     }
 
-    OrderedScheduler getExecutor() {
+    OrderedExecutor getExecutor() {
         return executor;
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 7a76975..31705c1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -20,11 +20,16 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
+import com.google.common.base.Charsets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -40,11 +45,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
 @SuppressWarnings("checkstyle:javadoctype")
 public class MetaStoreImplZookeeper implements MetaStore {
 
@@ -55,7 +55,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
     private static final String prefix = prefixName + "/";
 
     private final ZooKeeper zk;
-    private final OrderedScheduler executor;
+    private final OrderedExecutor executor;
 
     private static class ZKStat implements Stat {
         private final int version;
@@ -90,7 +90,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
         }
     }
 
-    public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor)
+    public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor)
             throws Exception {
         this.zk = zk;
         this.executor = executor;
@@ -130,7 +130,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
     @Override
     public void getManagedLedgerInfo(final String ledgerName, final 
MetaStoreCallback<ManagedLedgerInfo> callback) {
         // Try to get the content or create an empty node
-        zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) 
-> executor.submit(safeRun(() -> {
+        zk.getData(prefix + ledgerName, false,
+                (rc, path, ctx, readData, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (rc == Code.OK.intValue()) {
                 try {
                     ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
@@ -171,7 +172,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
         byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
 
         zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
-                (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
+                (rc, path, zkCtx, stat1) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] UpdateLedgersIdsCallback.processResult 
rc={} newVersion={}", ledgerName,
                                 Code.get(rc), stat != null ? stat.getVersion() 
: "null");
@@ -195,7 +196,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get cursors list", ledgerName);
         }
-        zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children, 
stat) -> executor.submit(safeRun(() -> {
+        zk.getChildren(prefix + ledgerName, false,
+                (rc, path, ctx, children, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] getConsumers complete rc={} children={}", 
ledgerName, Code.get(rc), children);
             }
@@ -219,7 +221,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
             log.debug("Reading from {}", path);
         }
 
-        zk.getData(path, false, (rc, path1, ctx, data, stat) -> 
executor.submit(safeRun(() -> {
+        zk.getData(path, false, (rc, path1, ctx, data, stat) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (rc != Code.OK.intValue()) {
                 callback.operationFailed(new 
MetaStoreException(KeeperException.create(Code.get(rc))));
             } else {
@@ -251,7 +253,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
                 log.debug("[{}] Creating consumer {} on meta-data store with 
{}", ledgerName, cursorName, info);
             }
             zk.create(path, content, Acl, CreateMode.PERSISTENT,
-                    (rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
+                    (rc, path1, ctx, name) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                         if (rc != Code.OK.intValue()) {
                             log.warn("[{}] Error creating cosumer {} node on 
meta-data store with {}: ", ledgerName,
                                     cursorName, info, Code.get(rc));
@@ -269,7 +271,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Updating consumer {} on meta-data store with 
{}", ledgerName, cursorName, info);
             }
-            zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, 
stat1) -> executor.submit(safeRun(() -> {
+            zk.setData(path, content, zkStat.getVersion(),
+                    (rc, path1, ctx, stat1) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
                 if (rc == Code.BADVERSION.intValue()) {
                     callback.operationFailed(new 
BadVersionException(KeeperException.create(Code.get(rc))));
                 } else if (rc != Code.OK.intValue()) {
@@ -285,7 +288,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
     public void asyncRemoveCursor(final String ledgerName, final String 
consumerName,
             final MetaStoreCallback<Void> callback) {
         log.info("[{}] Remove consumer={}", ledgerName, consumerName);
-        zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, 
ctx) -> executor.submit(safeRun(() -> {
+        zk.delete(prefix + ledgerName + "/" + consumerName, -1,
+                (rc, path, ctx) -> executor.executeOrdered(ledgerName, 
safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, 
consumerName, Code.get(rc));
             }
@@ -300,7 +304,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
     @Override
     public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> 
callback) {
         log.info("[{}] Remove ManagedLedger", ledgerName);
-        zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> 
executor.submit(safeRun(() -> {
+        zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> 
executor.executeOrdered(ledgerName, safeRun(() -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] zk delete done. rc={}", ledgerName, 
Code.get(rc));
             }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 85c4da6..f4daec5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -123,14 +123,14 @@ class OpAddEntry extends SafeRunnable implements 
AddCallback, CloseCallback {
             // be marked as failed.
             ml.mbean.recordAddEntryError();
 
-            ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() -> 
{
+            ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() 
-> {
                 // Force the creation of a new ledger. Doing it in a 
background thread to avoid acquiring ML lock
                 // from a BK callback.
                 ml.ledgerClosed(lh);
             }));
         } else {
             // Trigger addComplete callback in a thread hashed on the managed 
ledger name
-            ml.getExecutor().submitOrdered(ml.getName(), this);
+            ml.getExecutor().executeOrdered(ml.getName(), this);
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 31d1c79..6b3a03a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -83,7 +83,7 @@ class OpReadEntry implements ReadEntriesCallback {
 
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can 
return them
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));
@@ -131,7 +131,7 @@ class OpReadEntry implements ReadEntriesCallback {
             }
 
             // Schedule next read in a different thread
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 readPosition = 
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
@@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback {
             // The reading was already completed, release resources and 
trigger callback
             cursor.readOperationCompleted();
 
-            cursor.ledger.getExecutor().submit(safeRun(() -> {
+            cursor.ledger.getExecutor().execute(safeRun(() -> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 107abcf..47c515d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -24,9 +24,9 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -41,7 +41,7 @@ public class EntryCacheManagerTest extends 
MockedBookKeeperTestCase {
 
     @BeforeClass
     void setup() throws Exception {
-        ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+        OrderedScheduler executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
 
         ml1 = mock(ManagedLedgerImpl.class);
         when(ml1.getScheduledExecutor()).thenReturn(executor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
index c3e75c2..ebaa1fd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
@@ -65,7 +65,7 @@ public class ManagedLedgerMBeanTest extends 
MockedBookKeeperTestCase {
         mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS);
 
         // Simulate stats getting update from different thread
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -90,7 +90,7 @@ public class ManagedLedgerMBeanTest extends 
MockedBookKeeperTestCase {
         ledger.addEntry(new byte[600]);
         cursor.markDelete(p1);
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -109,7 +109,7 @@ public class ManagedLedgerMBeanTest extends 
MockedBookKeeperTestCase {
         mbean.recordAddEntryError();
         mbean.recordReadEntriesError();
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
@@ -119,7 +119,7 @@ public class ManagedLedgerMBeanTest extends 
MockedBookKeeperTestCase {
         List<Entry> entries = cursor.readEntries(100);
         assertEquals(entries.size(), 1);
 
-        factory.executor.submit(() -> {
+        factory.scheduledExecutor.submit(() -> {
             mbean.refreshStats(1, TimeUnit.SECONDS);
         }).get();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 58010e8..47753e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -106,8 +107,8 @@ public class PulsarService implements AutoCloseable {
             new DefaultThreadFactory("pulsar"));
     private final ScheduledExecutorService cacheExecutor = 
Executors.newScheduledThreadPool(10,
             new DefaultThreadFactory("zk-cache-callback"));
-    private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
-            .name("pulsar-ordered").build();
+    private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered")
+            .build();
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -445,7 +446,7 @@ public class PulsarService implements AutoCloseable {
 
         LOG.info("starting configuration cache service");
 
-        this.localZkCache = new LocalZooKeeperCache(getZkClient(), 
getOrderedExecutor(), this.cacheExecutor);
+        this.localZkCache = new LocalZooKeeperCache(getZkClient(), 
getOrderedExecutor());
         this.globalZkCache = new 
GlobalZooKeeperCache(getZooKeeperClientFactory(),
                 (int) config.getZooKeeperSessionTimeoutMillis(), 
config.getGlobalZookeeperServers(),
                 getOrderedExecutor(), this.cacheExecutor);
@@ -611,7 +612,7 @@ public class PulsarService implements AutoCloseable {
         return loadManagerExecutor;
     }
 
-    public OrderedScheduler getOrderedExecutor() {
+    public OrderedExecutor getOrderedExecutor() {
         return orderedExecutor;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fe30713..2668f95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -636,8 +636,8 @@ public class NamespaceService {
             if (t != null) {
                 // retry several times on BadVersion
                 if ((t instanceof ServerMetadataException) && 
(counter.decrementAndGet() >= 0)) {
-                    pulsar.getOrderedExecutor().submit(
-                        () -> splitAndOwnBundleOnceAndRetry(bundle, unload, 
counter, unloadFuture));
+                    pulsar.getOrderedExecutor()
+                            .execute(() -> 
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
                 } else {
                     // Retry enough, or meet other exception
                     String msg2 = format(" %s not success update nsBundles, 
counter %d, reason %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e1a0d3f..6c2e011 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -476,7 +476,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         replicationFuture.exceptionally((ex) -> {
             log.warn("Replication check failed. Removing topic from topics 
list {}, {}", topic, ex);
             nonPersistentTopic.stopReplProducers().whenComplete((v, exception) 
-> {
-                pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+                pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                 topicFuture.completeExceptionally(ex);
             });
 
@@ -579,7 +579,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             // namespace is being unloaded
             String msg = String.format("Namespace is being unloaded, cannot 
add topic %s", topic);
             log.warn(msg);
-            pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
             return;
         }
@@ -618,7 +618,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                                 });
                             } catch (NamingException e) {
                                 log.warn("Failed to create topic {}-{}", 
topic, e.getMessage());
-                                pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 topicFuture.completeExceptionally(e);
                             }
                         }
@@ -626,7 +626,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                         @Override
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             log.warn("Failed to create topic {}", topic, 
exception);
-                            pulsar.getExecutor().submit(() -> 
topics.remove(topic, topicFuture));
+                            pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                             topicFuture.completeExceptionally(new 
PersistenceException(exception));
                         }
                     }, null);
@@ -635,7 +635,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             log.warn("[{}] Failed to get topic configuration: {}", topic, 
exception.getMessage(), exception);
             // remove topic from topics-map in different thread to avoid 
possible deadlock if
             // createPersistentTopic-thread only tries to handle this 
future-result
-            pulsar.getExecutor().submit(() -> topics.remove(topic, 
topicFuture));
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             topicFuture.completeExceptionally(exception);
             return null;
         });
@@ -644,7 +644,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     public CompletableFuture<ManagedLedgerConfig> 
getManagedLedgerConfig(TopicName topicName) {
         CompletableFuture<ManagedLedgerConfig> future = new 
CompletableFuture<>();
         // Execute in background thread, since getting the policies might 
block if the z-node wasn't already cached
-        pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> {
+        pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> {
             NamespaceName namespace = topicName.getNamespaceObject();
             ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
@@ -1104,7 +1104,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     }
 
     private void updateTopicMessageDispatchRate() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             topics.forEach((name, topicFuture) -> {
                 if (topicFuture.isDone()) {
@@ -1150,7 +1150,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     }
 
     private void updateManagedLedgerConfig() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update managed-ledger config of each topic
             topics.forEach((name, topicFuture) -> {
                 if (topicFuture.isDone()) {
@@ -1403,7 +1403,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             // block dispatcher with higher unack-msg when it reaches 
broker-unack msg limit
             log.info("[{}] Starting blocking dispatchers with unacked msgs {} 
due to reached max broker limit {}",
                     maxUnackedMessages, maxUnackedMsgsPerDispatcher);
-            executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
+            executor().execute(() -> blockDispatchersWithLargeUnAckMessages());
         } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages 
< maxUnackedMessages / 2) {
             // unblock broker-dispatching if received enough acked messages 
back
             if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) 
{
@@ -1457,7 +1457,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         try {
             dispatcherList.forEach(dispatcher -> {
                 dispatcher.unBlockDispatcherOnUnackedMsgs();
-                executor().submit(() -> dispatcher.readMoreEntries());
+                executor().execute(() -> dispatcher.readMoreEntries());
                 log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
                 blockedDispatchers.remove(dispatcher);
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bada69c..c052e4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -955,7 +955,7 @@ public class ServerCnx extends PulsarHandler {
             if (nonPersistentPendingMessages > 
MaxNonPersistentPendingMessages) {
                 final long producerId = send.getProducerId();
                 final long sequenceId = send.getSequenceId();
-                
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(), 
SafeRun.safeRun(() -> {
+                
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), 
SafeRun.safeRun(() -> {
                     ctx.writeAndFlush(Commands.newSendReceipt(producerId, 
sequenceId, -1, -1), ctx.voidPromise());
                 }));
                 producer.recordMessageDrop(send.getNumMessages());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fc6a920..9eb7ce7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -27,8 +27,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -190,7 +193,7 @@ public class NonPersistentTopic implements Topic {
 
         // retain data for sub/replication because io-thread will release 
actual payload
         data.retain(2);
-        this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
             subscriptions.forEach((name, subscription) -> {
                 ByteBuf duplicateBuffer = data.retainedDuplicate();
                 Entry entry = create(0L, 0L, duplicateBuffer);
@@ -210,7 +213,7 @@ public class NonPersistentTopic implements Topic {
             }
         }));
 
-        this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+        this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
             replicators.forEach((name, replicator) -> {
                 ByteBuf duplicateBuffer = data.retainedDuplicate();
                 Entry entry = create(0L, 0L, duplicateBuffer);
@@ -487,7 +490,7 @@ public class NonPersistentTopic implements Topic {
 
         FutureUtil.waitForAll(futures).thenRun(() -> {
             log.info("[{}] Topic closed", topic);
-            brokerService.pulsar().getExecutor().submit(() -> 
brokerService.removeTopicFromCache(topic));
+            brokerService.pulsar().getExecutor().execute(() -> 
brokerService.removeTopicFromCache(topic));
             closeFuture.complete(null);
         }).exceptionally(exception -> {
             log.error("[{}] Error closing topic", topic, exception);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e7e50b8..443d29b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -152,7 +152,7 @@ public class MessageDeduplication {
 
                 if (managedCursor.hasMoreEntries()) {
                     // Read next batch of entries
-                    pulsar.getExecutor().submit(() -> replayCursor(future));
+                    pulsar.getExecutor().execute(() -> replayCursor(future));
                 } else {
                     // Done replaying
                     future.complete(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2eb83ae..17c5db7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
             // unblock dispatcher if it acks back enough messages
             if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, 
TRUE, FALSE)) {
                 log.info("[{}] Dispatcher is unblocked", name);
-                topic.getBrokerService().executor().submit(() -> 
readMoreEntries());
+                topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
             }
         }
         // increment broker-level count
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 98cafc5..2909857 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -18,30 +18,30 @@
  */
 package org.apache.pulsar.broker.auth;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class SameThreadOrderedSafeExecutor extends OrderedScheduler {
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
 
     public SameThreadOrderedSafeExecutor() {
         super("same-thread-executor", 1, new DefaultThreadFactory("test"), 
NullStatsLogger.INSTANCE, false, 100000, 10);
     }
 
     @Override
-    public void submit(SafeRunnable r) {
+    public void execute(Runnable r) {
         r.run();
     }
 
     @Override
-    public void submitOrdered(int orderingKey, SafeRunnable r) {
+    public void executeOrdered(int orderingKey, SafeRunnable r) {
         r.run();
     }
 
     @Override
-    public void submitOrdered(long orderingKey, SafeRunnable r) {
+    public void executeOrdered(long orderingKey, SafeRunnable r) {
         r.run();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 9fd2fb7..2fe63e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -25,8 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.hash.Hashing;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.broker.PulsarService;
@@ -43,8 +42,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.hash.Hashing;
-
 public class ResourceQuotaCacheTest {
 
     private PulsarService pulsar;
@@ -52,14 +49,12 @@ public class ResourceQuotaCacheTest {
     private LocalZooKeeperCacheService localCache;
     private NamespaceBundleFactory bundleFactory;
     private OrderedScheduler executor;
-    private ScheduledExecutorService scheduledExecutor;
 
     @BeforeMethod
     public void setup() throws Exception {
         pulsar = mock(PulsarService.class);
         executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor, scheduledExecutor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor);
         localCache = new LocalZooKeeperCacheService(zkCache, null);
 
         // set mock pulsar localzkcache
@@ -77,7 +72,6 @@ public class ResourceQuotaCacheTest {
     @AfterMethod
     public void teardown() {
         executor.shutdown();
-        scheduledExecutor.shutdown();
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index e8d4bb9..dfca1b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -33,10 +33,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.hash.Hashing;
+
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -58,8 +58,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.hash.Hashing;
-
 public class OwnershipCacheTest {
 
     private PulsarService pulsar;
@@ -71,7 +69,6 @@ public class OwnershipCacheTest {
     private NamespaceService nsService;
     private BrokerService brokerService;
     private OrderedScheduler executor;
-    private ScheduledExecutorService scheduledExecutor;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -80,8 +77,7 @@ public class OwnershipCacheTest {
         pulsar = mock(PulsarService.class);
         config = mock(ServiceConfiguration.class);
         executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        scheduledExecutor = Executors.newScheduledThreadPool(2);
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor, scheduledExecutor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 
executor);
         localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
         ZooKeeperDataCache<LocalPolicies> poilciesCache = 
mock(ZooKeeperDataCache.class);
         when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
@@ -106,7 +102,6 @@ public class OwnershipCacheTest {
     @AfterMethod
     public void teardown() throws Exception {
         executor.shutdown();
-        scheduledExecutor.shutdown();
     }
 
     @Test
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c468298..2cbd642 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -655,7 +655,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        listenerExecutor.submit(() -> {
+        listenerExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index a6e3cbc..9a4b9aa 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 30f75be..1371d05 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Connects with ZooKeeper and sets watch to listen changes for active broker 
list.
  *
@@ -55,8 +51,6 @@ public class ZookeeperCacheLoader implements Closeable {
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
             .name("pulsar-discovery-ordered-cache").build();
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(8,
-            new DefaultThreadFactory("pulsar-discovery-cache"));
 
     public static final String LOADBALANCE_BROKERS_ROOT = 
"/loadbalance/brokers";
 
@@ -74,8 +68,7 @@ public class ZookeeperCacheLoader implements Closeable {
             log.error("Shutting down ZK sessions: {}", exitCode);
         });
 
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor,
-                executor);
+        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
         localZkConnectionSvc.start(exitCode -> {
             try {
                 localZkCache.getZooKeeper().close();
@@ -115,7 +108,6 @@ public class ZookeeperCacheLoader implements Closeable {
     @Override
     public void close() {
         orderedExecutor.shutdown();
-        executor.shutdown();
     }
 
     private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 0ba1ea6..27d5a9b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Connects with ZooKeeper and sets watch to listen changes for active broker 
list.
  *
@@ -56,8 +52,6 @@ public class ZookeeperCacheLoader implements Closeable {
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
             .name("pulsar-discovery-ordered-cache").build();
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(8,
-            new DefaultThreadFactory("pulsar-discovery-cache"));
 
     public static final String LOADBALANCE_BROKERS_ROOT = 
"/loadbalance/brokers";
 
@@ -75,8 +69,7 @@ public class ZookeeperCacheLoader implements Closeable {
             log.error("Shutting down ZK sessions: {}", exitCode);
         });
 
-        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor,
-                executor);
+        this.localZkCache = new 
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), 
this.orderedExecutor);
         localZkConnectionSvc.start(exitCode -> {
             try {
                 localZkCache.getZooKeeper().close();
@@ -121,7 +114,6 @@ public class ZookeeperCacheLoader implements Closeable {
     @Override
     public void close() {
         orderedExecutor.shutdown();
-        executor.shutdown();
     }
 
     private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index bd75e82..b59cde5 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
@@ -51,8 +51,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache 
implements Closeable {
     private final ScheduledExecutorService scheduledExecutor;
 
     public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int 
zkSessionTimeoutMillis,
-            String globalZkConnect, OrderedScheduler orderedExecutor, 
ScheduledExecutorService scheduledExecutor) {
-        super(null, orderedExecutor, scheduledExecutor);
+            String globalZkConnect, OrderedExecutor orderedExecutor, 
ScheduledExecutorService scheduledExecutor) {
+        super(null, orderedExecutor);
         this.zlClientFactory = zkClientFactory;
         this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
         this.globalZkConnect = globalZkConnect;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 23547dd..945b58e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.zookeeper;
 
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -37,9 +35,8 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);
 
-    public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler executor,
-            ScheduledExecutorService scheduledExecutor) {
-        super(zk, executor, scheduledExecutor);
+    public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor executor) {
+        super(zk, executor);
     }
 
     @Override
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index b2715b3..e2e46aa 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.zookeeper;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
 import java.nio.file.Paths;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
@@ -29,14 +35,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -49,14 +53,6 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Per ZK client ZooKeeper cache supporting ZNode data and children list 
caches. A cache entry is identified, accessed
  * and invalidated by the ZNode path. For the data cache, ZNode data parsing 
is done at request time with the given
@@ -84,19 +80,18 @@ public abstract class ZooKeeperCache implements Watcher {
     protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
     protected final Cache<String, Set<String>> childrenCache;
     protected final Cache<String, Boolean> existsCache;
-    private final OrderedScheduler executor;
-    private final ScheduledExecutorService scheduledExecutor;
-    private boolean shouldShutdownExecutor = false;
+    private final OrderedExecutor executor;
+    private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
+    private boolean shouldShutdownExecutor;
     public static final int cacheTimeOutInSec = 30;
 
     protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
 
-    public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor, ScheduledExecutorService scheduledExecutor) {
+    public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
         checkNotNull(executor);
-        checkNotNull(scheduledExecutor);
         this.executor = executor;
-        this.scheduledExecutor = scheduledExecutor;
         this.zkSession.set(zkSession);
+        this.shouldShutdownExecutor = false;
 
         this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, 
TimeUnit.HOURS)
                 .buildAsync((key, executor1) -> null);
@@ -106,8 +101,7 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public ZooKeeperCache(ZooKeeper zkSession) {
-        this(zkSession, OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(),
-                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("zk-cache-callback-executor")));
+        this(zkSession, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
         this.shouldShutdownExecutor = true;
     }
 
@@ -132,7 +126,7 @@ public abstract class ZooKeeperCache implements Watcher {
                     LOG.debug("Submitting reload cache task to the executor 
for path: {}, updater: {}", path, updater);
                 }
                 try {
-                    executor.submitOrdered(path, new SafeRunnable() {
+                    executor.executeOrdered(path, new SafeRunnable() {
                         @Override
                         public void safeRun() {
                             updater.reloadCache(path);
@@ -181,7 +175,7 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public void asyncInvalidate(String path) {
-        scheduledExecutor.submit(() -> invalidate(path));
+        backgroundExecutor.execute(() -> invalidate(path));
     }
 
     public void invalidate(final String path) {
@@ -322,20 +316,20 @@ public abstract class ZooKeeperCache implements Watcher {
             // Broker doesn't restart on global-zk session lost: so handling 
unexpected exception
             try {
                 this.zkSession.get().getData(path, watcher, (rc, path1, ctx, 
content, stat) -> {
-                    Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor;
                     if (rc == Code.OK.intValue()) {
                         try {
                             T obj = deserializer.deserialize(path, content);
                             // avoid using the zk-client thread to process the 
result
-                            exec.execute(() -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
+                            executor.execute(
+                                    () -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
                         } catch (Exception e) {
-                            exec.execute(() -> 
zkFuture.completeExceptionally(e));
+                            executor.execute(() -> 
zkFuture.completeExceptionally(e));
                         }
                     } else if (rc == Code.NONODE.intValue()) {
                         // Return null values for missing z-nodes, as this is 
not "exceptional" condition
-                        exec.execute(() -> zkFuture.complete(null));
+                        executor.execute(() -> zkFuture.complete(null));
                     } else {
-                        exec.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
+                        executor.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
                     }
                 }, null);
             } catch (Exception e) {
@@ -430,7 +424,8 @@ public abstract class ZooKeeperCache implements Watcher {
     public void stop() {
         if (shouldShutdownExecutor) {
             this.executor.shutdown();
-            this.scheduledExecutor.shutdown();
         }
+
+        this.backgroundExecutor.shutdown();
     }
 }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index c0f5299..d18aea9 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,21 +23,21 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
 
-    private final OrderedScheduler executor;
+    private final OrderedExecutor executor;
 
-    public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) {
+    public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
         this.executor = executor;
     }
 
@@ -45,7 +45,7 @@ public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
     public CompletableFuture<ZooKeeper> create(String serverList, SessionType 
sessionType, int zkSessionTimeoutMillis) {
         CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
 
-        executor.submit(safeRun(() -> {
+        executor.execute(safeRun(() -> {
             try {
                 ZooKeeper zk = 
ZooKeeperClient.newBuilder().connectString(serverList)
                         .sessionTimeoutMs(zkSessionTimeoutMillis)
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e9bad89..af00abc 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -87,7 +87,7 @@ public class ZookeeperCacheTest {
     @Test(timeOut = 10000)
     void testSimpleCache() throws Exception {
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws 
Exception {
@@ -127,7 +127,7 @@ public class ZookeeperCacheTest {
     void testChildrenCache() throws Exception {
         zkClient.create("/test", new byte[0], null, null);
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperChildrenCache cache = new 
ZooKeeperChildrenCache(zkCacheService, "/test");
 
         // Create callback counter
@@ -180,7 +180,7 @@ public class ZookeeperCacheTest {
         // Check existence after creation of the node
         zkClient.create("/test", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -197,7 +197,7 @@ public class ZookeeperCacheTest {
         zkClient.create("/test/c1", new byte[0], null, null);
         zkClient.create("/test/c2", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -339,7 +339,7 @@ public class ZookeeperCacheTest {
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned 
future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws 
Exception {
@@ -387,7 +387,7 @@ public class ZookeeperCacheTest {
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned 
future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
 
         final AtomicInteger count = new AtomicInteger(0);
         ZooKeeperDataCache<String> zkCache = new 
ZooKeeperDataCache<String>(zkCacheService) {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to