merlimat closed pull request #1489: Refactored to reflect changes in BK for OrderedExecutor. URL: https://github.com/apache/incubator-pulsar/pull/1489
This is a PR merged from a forked repository. Because GitHub hides the original diff of a pull request from a fork once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index f267a6c06..60ba634e1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -188,7 +188,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEnt manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); ml.mbean.addReadEntriesSample(1, returnEntry.getLength()); - ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { callback.readEntryComplete(returnEntry, obj); })); } else { @@ -254,7 +254,7 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo checkNotNull(ml.getName()); checkNotNull(ml.getExecutor()); - ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { // We got the entries, we need to transform them to a List<> type long totalSize = 0; final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 7faa18c8e..262cbeb4c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -109,7 +109,7 @@ boolean hasSpaceInCache() { 
// Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) { - mlFactory.executor.execute(safeRun(() -> { + mlFactory.scheduledExecutor.execute(safeRun(() -> { // Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark // percentage limit long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermak); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 29fe0a724..a2be71d15 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -37,6 +37,7 @@ import com.google.common.collect.TreeRangeSet; import com.google.common.util.concurrent.RateLimiter; import com.google.protobuf.InvalidProtocolBufferException; + import java.util.ArrayDeque; import java.util.Collections; import java.util.List; @@ -51,6 +52,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; + import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.BKException; @@ -847,7 +849,7 @@ public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback final PositionImpl newPosition = (PositionImpl) newPos; // order trim and reset operations on a ledger - ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> { + ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest) || newPosition.equals(PositionImpl.latest)) { 
internalResetCursor(newPosition, callback); @@ -1923,7 +1925,7 @@ void createNewMetadataLedger(final VoidCallback callback) { bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getExecutor().submit(safeRun(() -> { + ledger.getExecutor().execute(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 4c0d3d703..d148d1dda 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -23,7 +23,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.Maps; -import io.netty.util.concurrent.DefaultThreadFactory; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,12 +31,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -63,7 +63,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; 
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.util.Futures; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.pulsar.common.util.DateFormatter; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -77,9 +76,9 @@ private final boolean isBookkeeperManaged; private final ZooKeeper zookeeper; private final ManagedLedgerFactoryConfig config; - protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(16, - new DefaultThreadFactory("bookkeeper-ml")); - private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16) + protected final OrderedScheduler scheduledExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(16) + .name("bookkeeper-ml-scheduler").build(); + private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(16) .name("bookkeeper-ml-workers").build(); protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -122,7 +121,7 @@ public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, Manag this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); - this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); + this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); } public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception { @@ -138,7 +137,7 @@ public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, Mana this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); - this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); + this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> 
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS); } private synchronized void refreshStats() { @@ -232,7 +231,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final ledgers.computeIfAbsent(name, (mlName) -> { // Create the managed ledger CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>(); - final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, executor, + final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override @@ -305,7 +304,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { } } - executor.shutdown(); + scheduledExecutor.shutdown(); orderedExecutor.shutdown(); entryCacheManager.clear(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 34283ed0d..4aceb6ca3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -22,18 +22,26 @@ import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.RateLimiter; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Queue; import java.util.Random; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -45,6 +53,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -77,23 +86,13 @@ import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.mledger.util.Pair; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.BoundType; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; -import com.google.common.util.concurrent.RateLimiter; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -190,8 +189,8 @@ AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); private volatile State state = null; - private final 
ScheduledExecutorService scheduledExecutor; - private final OrderedScheduler executor; + private final OrderedScheduler scheduledExecutor; + private final OrderedExecutor executor; final ManagedLedgerFactoryImpl factory; protected final ManagedLedgerMBeanImpl mbean; @@ -204,7 +203,7 @@ // ////////////////////////////////////////////////////////////////////// public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, - ManagedLedgerConfig config, ScheduledExecutorService scheduledExecutor, OrderedScheduler orderedExecutor, + ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor, final String name) { this.factory = factory; this.bookKeeper = bookKeeper; @@ -250,7 +249,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (ledgers.size() > 0) { final long id = ledgers.lastKey(); OpenCallback opencb = (rc, lh, ctx1) -> { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {}: ", name, id, BKException.getMessage(rc)); @@ -338,7 +337,7 @@ public void operationFailed(MetaStoreException e) { mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { callback.initializeFailed(createManagedLedgerException(rc)); @@ -1319,7 +1318,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) { mbean.startDataLedgerOpenOp(); bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (int rc, LedgerHandle lh, Object ctx) -> { - executor.submit(safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { 
mbean.endDataLedgerOpenOp(); if (rc != BKException.Code.OK) { // Remove the ledger future from cache to give chance to reopen it later @@ -1484,12 +1483,12 @@ void notifyCursors() { break; } - executor.submit(safeRun(() -> waitingCursor.notifyEntriesAvailable())); + executor.execute(safeRun(() -> waitingCursor.notifyEntriesAvailable())); } } private void trimConsumedLedgersInBackground() { - executor.submitOrdered(name, safeRun(() -> { + executor.executeOrdered(name, safeRun(() -> { internalTrimConsumedLedgers(); })); } @@ -2113,11 +2112,11 @@ private boolean currentLedgerIsFull() { return ledgers; } - ScheduledExecutorService getScheduledExecutor() { + OrderedScheduler getScheduledExecutor() { return scheduledExecutor; } - OrderedScheduler getExecutor() { + OrderedExecutor getExecutor() { return executor; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 7a7697531..31705c1b6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,11 +20,16 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.base.Charsets; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; + import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -40,11 +45,6 @@ import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; -import com.google.protobuf.TextFormat.ParseException; - @SuppressWarnings("checkstyle:javadoctype") public class MetaStoreImplZookeeper implements MetaStore { @@ -55,7 +55,7 @@ private static final String prefix = prefixName + "/"; private final ZooKeeper zk; - private final OrderedScheduler executor; + private final OrderedExecutor executor; private static class ZKStat implements Stat { private final int version; @@ -90,7 +90,7 @@ public long getModificationTimestamp() { } } - public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor) + public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor) throws Exception { this.zk = zk; this.executor = executor; @@ -130,7 +130,8 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { @Override public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback<ManagedLedgerInfo> callback) { // Try to get the content or create an empty node - zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> { + zk.getData(prefix + ledgerName, false, + (rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info = parseManagedLedgerInfo(readData); @@ -171,7 +172,7 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), - (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { + (rc, path, zkCtx, stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, Code.get(rc), stat != 
null ? stat.getVersion() : "null"); @@ -195,7 +196,8 @@ public void getCursors(final String ledgerName, final MetaStoreCallback<List<Str if (log.isDebugEnabled()) { log.debug("[{}] Get cursors list", ledgerName); } - zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children, stat) -> executor.submit(safeRun(() -> { + zk.getChildren(prefix + ledgerName, false, + (rc, path, ctx, children, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children); } @@ -219,7 +221,7 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName, log.debug("Reading from {}", path); } - zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> { + zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { @@ -251,7 +253,7 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } zk.create(path, content, Acl, CreateMode.PERSISTENT, - (rc, path1, ctx, name) -> executor.submit(safeRun(() -> { + (rc, path1, ctx, name) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName, cursorName, info, Code.get(rc)); @@ -269,7 +271,8 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> { + zk.setData(path, content, zkStat.getVersion(), + (rc, path1, ctx, 
stat1) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { @@ -285,7 +288,8 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa public void asyncRemoveCursor(final String ledgerName, final String consumerName, final MetaStoreCallback<Void> callback) { log.info("[{}] Remove consumer={}", ledgerName, consumerName); - zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + zk.delete(prefix + ledgerName + "/" + consumerName, -1, + (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc)); } @@ -300,7 +304,7 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName @Override public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callback) { log.info("[{}] Remove ManagedLedger", ledgerName); - zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { + zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.executeOrdered(ledgerName, safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc)); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 85c4da624..f4daec52e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -123,14 +123,14 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) // be marked as failed. 
ml.mbean.recordAddEntryError(); - ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() -> { + ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> { // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock // from a BK callback. ml.ledgerClosed(lh); })); } else { // Trigger addComplete callback in a thread hashed on the managed ledger name - ml.getExecutor().submitOrdered(ml.getName(), this); + ml.getExecutor().executeOrdered(ml.getName(), this); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 31d1c79b3..6b3a03adb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -83,7 +83,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); @@ -131,7 +131,7 @@ void checkReadCompletion() { } // Schedule next read in a different thread - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); @@ -139,7 +139,7 @@ void checkReadCompletion() { // The reading was already completed, release resources and trigger callback cursor.readOperationCompleted(); - cursor.ledger.getExecutor().submit(safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 107abcf75..47c515d13 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -24,9 +24,9 @@ import static org.testng.Assert.assertTrue; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; @@ -41,7 +41,7 @@ @BeforeClass void setup() throws Exception { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build(); ml1 = mock(ManagedLedgerImpl.class); when(ml1.getScheduledExecutor()).thenReturn(executor); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java index c3e75c2e7..ebaa1fdab 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java @@ -65,7 +65,7 @@ public void simple() throws Exception { mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS); // Simulate stats getting update from different thread - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -90,7 +90,7 @@ public void simple() throws Exception { ledger.addEntry(new 
byte[600]); cursor.markDelete(p1); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -109,7 +109,7 @@ public void simple() throws Exception { mbean.recordAddEntryError(); mbean.recordReadEntriesError(); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); @@ -119,7 +119,7 @@ public void simple() throws Exception { List<Entry> entries = cursor.readEntries(100); assertEquals(entries.size(), 1); - factory.executor.submit(() -> { + factory.scheduledExecutor.submit(() -> { mbean.refreshStats(1, TimeUnit.SECONDS); }).get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 58010e8d7..47753e6fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; @@ -106,8 +107,8 @@ new DefaultThreadFactory("pulsar")); private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("zk-cache-callback")); - private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) - .name("pulsar-ordered").build(); + private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered") + .build(); private final ScheduledExecutorService loadManagerExecutor; private ScheduledFuture<?> loadReportTask = null; 
private ScheduledFuture<?> loadSheddingTask = null; @@ -445,7 +446,7 @@ private void startZkCacheService() throws PulsarServerException { LOG.info("starting configuration cache service"); - this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.cacheExecutor); + this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), getOrderedExecutor(), this.cacheExecutor); @@ -611,7 +612,7 @@ public ScheduledExecutorService getLoadManagerExecutor() { return loadManagerExecutor; } - public OrderedScheduler getOrderedExecutor() { + public OrderedExecutor getOrderedExecutor() { return orderedExecutor; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fe307136f..2668f9540 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -636,8 +636,8 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, if (t != null) { // retry several times on BadVersion if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { - pulsar.getOrderedExecutor().submit( - () -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); + pulsar.getOrderedExecutor() + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); } else { // Retry enough, or meet other exception String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e1a0d3f0f..6c2e011ea 
100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -476,7 +476,7 @@ public void unloadNamespaceBundlesGracefully() { replicationFuture.exceptionally((ex) -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex); }); @@ -579,7 +579,7 @@ private void createPersistentTopic(final String topic, CompletableFuture<Topic> // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); return; } @@ -618,7 +618,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } catch (NamingException e) { log.warn("Failed to create topic {}-{}", topic, e.getMessage()); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); } } @@ -626,7 +626,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new PersistenceException(exception)); } }, null); @@ -635,7 +635,7 @@ public void 
openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(exception); return null; }); @@ -644,7 +644,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) { CompletableFuture<ManagedLedgerConfig> future = new CompletableFuture<>(); // Execute in background thread, since getting the policies might block if the z-node wasn't already cached - pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> { + pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> { NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); @@ -1104,7 +1104,7 @@ private void updateConfigurationAndRegisterListeners() { } private void updateTopicMessageDispatchRate() { - this.pulsar().getExecutor().submit(() -> { + this.pulsar().getExecutor().execute(() -> { // update message-rate for each topic topics.forEach((name, topicFuture) -> { if (topicFuture.isDone()) { @@ -1150,7 +1150,7 @@ private void updateSubscriptionMessageDispatchRate() { } private void updateManagedLedgerConfig() { - this.pulsar().getExecutor().submit(() -> { + this.pulsar().getExecutor().execute(() -> { // update managed-ledger config of each topic topics.forEach((name, topicFuture) -> { if (topicFuture.isDone()) { @@ -1403,7 +1403,7 @@ public void checkUnAckMessageDispatching() { // block dispatcher with higher unack-msg when it reaches broker-unack msg limit log.info("[{}] Starting blocking 
dispatchers with unacked msgs {} due to reached max broker limit {}", maxUnackedMessages, maxUnackedMsgsPerDispatcher); - executor().submit(() -> blockDispatchersWithLargeUnAckMessages()); + executor().execute(() -> blockDispatchersWithLargeUnAckMessages()); } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) { // unblock broker-dispatching if received enough acked messages back if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) { @@ -1457,7 +1457,7 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC try { dispatcherList.forEach(dispatcher -> { dispatcher.unBlockDispatcherOnUnackedMsgs(); - executor().submit(() -> dispatcher.readMoreEntries()); + executor().execute(() -> dispatcher.readMoreEntries()); log.info("[{}] Dispatcher is unblocked", dispatcher.getName()); blockedDispatchers.remove(dispatcher); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bada69ca2..c052e4c58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -955,7 +955,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { if (nonPersistentPendingMessages > MaxNonPersistentPendingMessages) { final long producerId = send.getProducerId(); final long sequenceId = send.getSequenceId(); - service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> { + service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> { ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1, -1), ctx.voidPromise()); })); producer.recordMessageDrop(send.getNumMessages()); diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index fc6a920ef..9eb7ce7d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -27,8 +27,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; @@ -190,7 +193,7 @@ public void publishMessage(ByteBuf data, PublishContext callback) { // retain data for sub/replication because io-thread will release actual payload data.retain(2); - this.executor.submitOrdered(topic, SafeRun.safeRun(() -> { + this.executor.executeOrdered(topic, SafeRun.safeRun(() -> { subscriptions.forEach((name, subscription) -> { ByteBuf duplicateBuffer = data.retainedDuplicate(); Entry entry = create(0L, 0L, duplicateBuffer); @@ -210,7 +213,7 @@ public void publishMessage(ByteBuf data, PublishContext callback) { } })); - this.executor.submitOrdered(topic, SafeRun.safeRun(() -> { + this.executor.executeOrdered(topic, SafeRun.safeRun(() -> { replicators.forEach((name, replicator) -> { ByteBuf duplicateBuffer = data.retainedDuplicate(); Entry entry = create(0L, 0L, duplicateBuffer); @@ -487,7 +490,7 @@ void removeSubscription(String subscriptionName) { 
FutureUtil.waitForAll(futures).thenRun(() -> { log.info("[{}] Topic closed", topic); - brokerService.pulsar().getExecutor().submit(() -> brokerService.removeTopicFromCache(topic)); + brokerService.pulsar().getExecutor().execute(() -> brokerService.removeTopicFromCache(topic)); closeFuture.complete(null); }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e7e50b8cb..443d29bde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -152,7 +152,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) { if (managedCursor.hasMoreEntries()) { // Read next batch of entries - pulsar.getExecutor().submit(() -> replayCursor(future)); + pulsar.getExecutor().execute(() -> replayCursor(future)); } else { // Done replaying future.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2eb83ae29..17c5db7a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -589,7 +589,7 @@ public void addUnAckedMessages(int numberOfMessages) { // unblock dispatcher if it acks back enough messages if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked", name); - topic.getBrokerService().executor().submit(() -> 
readMoreEntries()); + topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } // increment broker-level count diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java index 98cafc547..290985725 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java @@ -18,30 +18,30 @@ */ package org.apache.pulsar.broker.auth; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import io.netty.util.concurrent.DefaultThreadFactory; + +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.stats.NullStatsLogger; -import io.netty.util.concurrent.DefaultThreadFactory; - -public class SameThreadOrderedSafeExecutor extends OrderedScheduler { +public class SameThreadOrderedSafeExecutor extends OrderedExecutor { public SameThreadOrderedSafeExecutor() { super("same-thread-executor", 1, new DefaultThreadFactory("test"), NullStatsLogger.INSTANCE, false, 100000, 10); } @Override - public void submit(SafeRunnable r) { + public void execute(Runnable r) { r.run(); } @Override - public void submitOrdered(int orderingKey, SafeRunnable r) { + public void executeOrdered(int orderingKey, SafeRunnable r) { r.run(); } @Override - public void submitOrdered(long orderingKey, SafeRunnable r) { + public void executeOrdered(long orderingKey, SafeRunnable r) { r.run(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java index 9fd2fb7ac..2fe63e8ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java @@ -25,8 +25,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import com.google.common.hash.Hashing; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.broker.PulsarService; @@ -43,8 +42,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.hash.Hashing; - public class ResourceQuotaCacheTest { private PulsarService pulsar; @@ -52,14 +49,12 @@ private LocalZooKeeperCacheService localCache; private NamespaceBundleFactory bundleFactory; private OrderedScheduler executor; - private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { pulsar = mock(PulsarService.class); executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build(); - scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = new LocalZooKeeperCacheService(zkCache, null); // set mock pulsar localzkcache @@ -77,7 +72,6 @@ public void setup() throws Exception { @AfterMethod public void teardown() { executor.shutdown(); - scheduledExecutor.shutdown(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index e8d4bb909..dfca1b2c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -33,10 +33,10 @@ import static org.testng.Assert.assertTrue; import static 
org.testng.Assert.fail; +import com.google.common.hash.Hashing; + import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.common.util.OrderedScheduler; @@ -58,8 +58,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.hash.Hashing; - public class OwnershipCacheTest { private PulsarService pulsar; @@ -71,7 +69,6 @@ private NamespaceService nsService; private BrokerService brokerService; private OrderedScheduler executor; - private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { @@ -80,8 +77,7 @@ public void setup() throws Exception { pulsar = mock(PulsarService.class); config = mock(ServiceConfiguration.class); executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build(); - scheduledExecutor = Executors.newScheduledThreadPool(2); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = spy(new LocalZooKeeperCacheService(zkCache, null)); ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class); when(pulsar.getLocalZkCacheService()).thenReturn(localCache); @@ -106,7 +102,6 @@ public void setup() throws Exception { @AfterMethod public void teardown() throws Exception { executor.shutdown(); - scheduledExecutor.shutdown(); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c468298ec..2cbd64224 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -655,7 +655,7 @@ void 
activeConsumerChanged(boolean isActive) { return; } - listenerExecutor.submit(() -> { + listenerExecutor.execute(() -> { if (isActive) { consumerEventListener.becameActive(this, partitionIndex); } else { diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index a6e3cbce2..9a4b9aaf2 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.common.naming.TopicName; diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java index 30f75be6b..1371d0580 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -37,8 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Connects with ZooKeeper and sets 
watch to listen changes for active broker list. * @@ -55,8 +51,6 @@ private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) .name("pulsar-discovery-ordered-cache").build(); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8, - new DefaultThreadFactory("pulsar-discovery-cache")); public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers"; @@ -74,8 +68,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke log.error("Shutting down ZK sessions: {}", exitCode); }); - this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor, - executor); + this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor); localZkConnectionSvc.start(exitCode -> { try { localZkCache.getZooKeeper().close(); @@ -115,7 +108,6 @@ public ZooKeeperCache getLocalZkCache() { @Override public void close() { orderedExecutor.shutdown(); - executor.shutdown(); } private void updateBrokerList(Set<String> brokerNodes) throws Exception { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java index 0ba1ea63c..27d5a9b74 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -37,8 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * 
Connects with ZooKeeper and sets watch to listen changes for active broker list. * @@ -56,8 +52,6 @@ private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) .name("pulsar-discovery-ordered-cache").build(); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8, - new DefaultThreadFactory("pulsar-discovery-cache")); public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers"; @@ -75,8 +69,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke log.error("Shutting down ZK sessions: {}", exitCode); }); - this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor, - executor); + this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor); localZkConnectionSvc.start(exitCode -> { try { localZkCache.getZooKeeper().close(); @@ -121,7 +114,6 @@ public ZooKeeperCache getLocalZkCache() { @Override public void close() { orderedExecutor.shutdown(); - executor.shutdown(); } private void updateBrokerList(Set<String> brokerNodes) throws Exception { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java index bd75e820e..b59cde569 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; @@ -51,8 +51,8 @@ private final 
ScheduledExecutorService scheduledExecutor; public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis, - String globalZkConnect, OrderedScheduler orderedExecutor, ScheduledExecutorService scheduledExecutor) { - super(null, orderedExecutor, scheduledExecutor); + String globalZkConnect, OrderedExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) { + super(null, orderedExecutor); this.zlClientFactory = zkClientFactory; this.zkSessionTimeoutMillis = zkSessionTimeoutMillis; this.globalZkConnect = globalZkConnect; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java index 23547dd24..945b58e53 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java @@ -18,9 +18,7 @@ */ package org.apache.pulsar.zookeeper; -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -37,9 +35,8 @@ private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class); - public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler executor, - ScheduledExecutorService scheduledExecutor) { - super(zk, executor, scheduledExecutor); + public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor executor) { + super(zk, executor); } @Override diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index b2715b3f2..e2e46aa8f 100644 --- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -20,6 +20,12 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; + import java.nio.file.Paths; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; @@ -29,14 +35,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -49,14 +53,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Sets; - -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Per ZK client ZooKeeper cache supporting ZNode data and children list caches. A cache entry is identified, accessed * and invalidated by the ZNode path. 
For the data cache, ZNode data parsing is done at request time with the given @@ -84,19 +80,18 @@ protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache; protected final Cache<String, Set<String>> childrenCache; protected final Cache<String, Boolean> existsCache; - private final OrderedScheduler executor; - private final ScheduledExecutorService scheduledExecutor; - private boolean shouldShutdownExecutor = false; + private final OrderedExecutor executor; + private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build(); + private boolean shouldShutdownExecutor; public static final int cacheTimeOutInSec = 30; protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null); - public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor, ScheduledExecutorService scheduledExecutor) { + public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) { checkNotNull(executor); - checkNotNull(scheduledExecutor); this.executor = executor; - this.scheduledExecutor = scheduledExecutor; this.zkSession.set(zkSession); + this.shouldShutdownExecutor = false; this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.HOURS) .buildAsync((key, executor1) -> null); @@ -106,8 +101,7 @@ public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor, ScheduledE } public ZooKeeperCache(ZooKeeper zkSession) { - this(zkSession, OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(), - Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("zk-cache-callback-executor"))); + this(zkSession, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build()); this.shouldShutdownExecutor = true; } @@ -132,7 +126,7 @@ public ZooKeeper getZooKeeper() { LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater); } try { - executor.submitOrdered(path, new SafeRunnable() { + 
executor.executeOrdered(path, new SafeRunnable() { @Override public void safeRun() { updater.reloadCache(path); @@ -181,7 +175,7 @@ private void invalidateExists(String path) { } public void asyncInvalidate(String path) { - scheduledExecutor.submit(() -> invalidate(path)); + backgroundExecutor.execute(() -> invalidate(path)); } public void invalidate(final String path) { @@ -322,20 +316,20 @@ public Boolean call() throws Exception { // Broker doesn't restart on global-zk session lost: so handling unexpected exception try { this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> { - Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; if (rc == Code.OK.intValue()) { try { T obj = deserializer.deserialize(path, content); // avoid using the zk-client thread to process the result - exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); + executor.execute( + () -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); } catch (Exception e) { - exec.execute(() -> zkFuture.completeExceptionally(e)); + executor.execute(() -> zkFuture.completeExceptionally(e)); } } else if (rc == Code.NONODE.intValue()) { // Return null values for missing z-nodes, as this is not "exceptional" condition - exec.execute(() -> zkFuture.complete(null)); + executor.execute(() -> zkFuture.complete(null)); } else { - exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); + executor.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); } }, null); } catch (Exception e) { @@ -430,7 +424,8 @@ public void invalidateRoot(String root) { public void stop() { if (shouldShutdownExecutor) { this.executor.shutdown(); - this.scheduledExecutor.shutdown(); } + + this.backgroundExecutor.shutdown(); } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java index c0f529999..d18aea93e 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java @@ -23,21 +23,21 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; -import lombok.extern.slf4j.Slf4j; - @Slf4j public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory { - private final OrderedScheduler executor; + private final OrderedExecutor executor; - public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) { + public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) { this.executor = executor; } @@ -45,7 +45,7 @@ public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) { public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) { CompletableFuture<ZooKeeper> future = new CompletableFuture<>(); - executor.submit(safeRun(() -> { + executor.execute(safeRun(() -> { try { ZooKeeper zk = ZooKeeperClient.newBuilder().connectString(serverList) .sessionTimeoutMs(zkSessionTimeoutMillis) diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index e9bad89f5..af00abc42 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java 
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -87,7 +87,7 @@ void classTeardown() throws Exception { @Test(timeOut = 10000) void testSimpleCache() throws Exception { - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -127,7 +127,7 @@ public String deserialize(String key, byte[] content) throws Exception { void testChildrenCache() throws Exception { zkClient.create("/test", new byte[0], null, null); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test"); // Create callback counter @@ -180,7 +180,7 @@ void testExistsCache() throws Exception { // Check existence after creation of the node zkClient.create("/test", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -197,7 +197,7 @@ void testInvalidateCache() throws Exception { zkClient.create("/test/c1", new byte[0], null, null); zkClient.create("/test/c2", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -339,7 
+339,7 @@ void testZkCallbackThreadStuck() throws Exception { // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -387,7 +387,7 @@ public void testInvalidateCacheOnFailure() throws Exception { // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); final AtomicInteger count = new AtomicInteger(0); ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services