This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bafecb2a9c0 [improve] Removed usages of SafeRun (#20060)
bafecb2a9c0 is described below
commit bafecb2a9c0e73942de6a38df72dd5888d5afd66
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 10 18:30:22 2023 -0700
[improve] Removed usages of SafeRun (#20060)
### Motivation
With BK 4.16, we no longer need to pass `SafeRunnable` instances to the
`OrderedExecutor`. The executor now embeds the logic for checking and
logging exceptions.
Removing the `SafeRun` wrappers avoids extra allocations in the critical path and
reduces clutter in the code.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe
tests)*.
*(or)*
This change added tests and can be verified as follows:
*(example:)*
- *Added integration tests for end-to-end deployment with large payloads
(10MB)*
- *Extended integration test for recovery after broker failure*
### Does this pull request potentially affect one of the following parts:
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: (https://github.com/merlimat/pulsar/pull/6)
<!--
After opening this PR, the build in apache/pulsar will fail and
instructions will
be provided for opening a PR in the PR author's forked repository.
apache/pulsar pull requests should be first tested in your own fork since
the
apache/pulsar CI based on GitHub Actions has constrained resources and
quota.
GitHub Actions provides separate quota for pull requests that are executed
in
a forked repository.
The tests will be run in the forked repository until all PR review comments
have
been handled, the tests pass and the PR is approved by a reviewer.
-->
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 21 +++---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 44 ++++++-----
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 27 ++++---
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 10 +--
.../bookkeeper/mledger/impl/OpReadEntry.java | 13 ++--
.../mledger/impl/ShadowManagedLedgerImpl.java | 13 ++--
.../impl/cache/RangeEntryCacheManagerImpl.java | 5 +-
.../apache/bookkeeper/mledger/util/SafeRun.java | 57 --------------
.../mledger/impl/ManagedCursorConcurrencyTest.java | 5 +-
.../stats/prometheus/PrometheusMetricsServlet.java | 5 +-
.../pulsar/broker/service/BrokerService.java | 87 +++++++++++++---------
.../apache/pulsar/broker/service/ServerCnx.java | 5 +-
.../PersistentDispatcherMultipleConsumers.java | 11 ++-
.../PersistentDispatcherSingleActiveConsumer.java | 25 ++-----
...istentStickyKeyDispatcherMultipleConsumers.java | 5 +-
...istentStreamingDispatcherMultipleConsumers.java | 12 ++-
...entStreamingDispatcherSingleActiveConsumer.java | 10 +--
.../streamingdispatch/StreamingEntryReader.java | 19 ++---
.../coordination/impl/LeaderElectionImpl.java | 3 +-
19 files changed, 150 insertions(+), 227 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 05cad09b018..ef607fa7ed7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -25,7 +25,6 @@ import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGE
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Collections2;
@@ -1358,7 +1357,7 @@ public class ManagedCursorImpl implements ManagedCursor {
final PositionImpl newPosition = (PositionImpl) newPos;
// order trim and reset operations on a ledger
- ledger.getExecutor().execute(safeRun(() -> {
+ ledger.getExecutor().execute(() -> {
PositionImpl actualPosition = newPosition;
if (!ledger.isValidPosition(actualPosition)
@@ -1375,7 +1374,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
internalResetCursor(actualPosition, callback);
- }));
+ });
}
@Override
@@ -2055,7 +2054,7 @@ public class ManagedCursorImpl implements ManagedCursor {
+ "is later.", mdEntry.newPosition,
persistentMarkDeletePosition);
}
// run with executor to prevent deadlock
- ledger.getExecutor().execute(safeRun(() ->
mdEntry.triggerComplete()));
+ ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
return;
}
@@ -2074,7 +2073,7 @@ public class ManagedCursorImpl implements ManagedCursor {
+ "in progress {} is later.", mdEntry.newPosition,
inProgressLatest);
}
// run with executor to prevent deadlock
- ledger.getExecutor().execute(safeRun(() ->
mdEntry.triggerComplete()));
+ ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
return;
}
@@ -2611,8 +2610,8 @@ public class ManagedCursorImpl implements ManagedCursor {
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl
position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean
persistIndividualDeletedMessageRanges) {
if (state == State.Closed) {
- ledger.getExecutor().execute(safeRun(() ->
callback.operationFailed(new MetaStoreException(
- new CursorAlreadyClosedException(name + " cursor already
closed")))));
+ ledger.getExecutor().execute(() -> callback.operationFailed(new
MetaStoreException(
+ new CursorAlreadyClosedException(name + " cursor already
closed"))));
return;
}
@@ -2845,7 +2844,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return;
}
- ledger.getExecutor().execute(safeRun(() -> {
+ ledger.getExecutor().execute(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}",
ledger.getName(), name,
@@ -2858,7 +2857,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] Created ledger {} for cursor {}",
ledger.getName(), lh.getId(), name);
}
future.complete(lh);
- }));
+ });
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
return future;
@@ -3192,7 +3191,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.warn("[{}] Failed to delete ledger {}: {}",
ledger.getName(), lh.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
- ledger.getScheduledExecutor().schedule(safeRun(() ->
asyncDeleteLedger(lh, retry - 1)),
+ ledger.getScheduledExecutor().schedule(() ->
asyncDeleteLedger(lh, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC,
TimeUnit.SECONDS);
}
return;
@@ -3227,7 +3226,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.warn("[{}][{}] Failed to delete ledger {}: {}",
ledger.getName(), name, cursorLedger.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
- ledger.getScheduledExecutor().schedule(safeRun(() ->
asyncDeleteCursorLedger(retry - 1)),
+ ledger.getScheduledExecutor().schedule(() ->
asyncDeleteCursorLedger(retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC,
TimeUnit.SECONDS);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8376ee1bb84..c74db884a95 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,7 +22,6 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
@@ -409,7 +408,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (!ledgers.isEmpty()) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
- executor.execute(safeRun(() -> {
+ executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: {}", name,
id, BKException.getMessage(rc));
@@ -439,7 +438,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
- }));
+ });
};
if (log.isDebugEnabled()) {
@@ -522,7 +521,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return;
}
- executor.execute(safeRun(() -> {
+ executor.execute(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
@@ -551,7 +550,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(),
ledgersStat, storeLedgersCb);
- }));
+ });
}, ledgerMetadata);
}
@@ -774,10 +773,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
buffer.retain();
// Jump to specific thread to avoid contention from writers writing
from different threads
- executor.execute(safeRun(() -> {
+ executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
- }));
+ });
}
@Override
@@ -790,10 +789,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
buffer.retain();
// Jump to specific thread to avoid contention from writers writing
from different threads
- executor.execute(safeRun(() -> {
+ executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, numberOfMessages, callback, ctx);
internalAsyncAddEntry(addOperation);
- }));
+ });
}
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation)
{
@@ -2374,7 +2373,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
break;
}
- executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
+ executor.execute(waitingCursor::notifyEntriesAvailable);
}
}
@@ -2385,7 +2384,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
break;
}
- executor.execute(safeRun(cb::entriesAvailable));
+ executor.execute(cb::entriesAvailable);
}
}
@@ -2432,16 +2431,16 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
- executor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
+ executor.execute(() -> internalTrimConsumedLedgers(promise));
}
public void trimConsumedLedgersInBackground(boolean isTruncate,
CompletableFuture<?> promise) {
- executor.execute(safeRun(() -> internalTrimLedgers(isTruncate,
promise)));
+ executor.execute(() -> internalTrimLedgers(isTruncate, promise));
}
private void scheduleDeferredTrimming(boolean isTruncate,
CompletableFuture<?> promise) {
- scheduledExecutor.schedule(safeRun(() ->
trimConsumedLedgersInBackground(isTruncate, promise)), 100,
- TimeUnit.MILLISECONDS);
+ scheduledExecutor.schedule(() ->
trimConsumedLedgersInBackground(isTruncate, promise),
+ 100, TimeUnit.MILLISECONDS);
}
private void maybeOffloadInBackground(CompletableFuture<PositionImpl>
promise) {
@@ -2456,7 +2455,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
- executor.execute(safeRun(() ->
maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
+ executor.execute(() -> maybeOffload(offloadThresholdInBytes,
offloadThresholdInSeconds, promise));
}
}
@@ -2477,7 +2476,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
if (!offloadMutex.tryLock()) {
- scheduledExecutor.schedule(safeRun(() ->
maybeOffloadInBackground(finalPromise)),
+ scheduledExecutor.schedule(() ->
maybeOffloadInBackground(finalPromise),
100, TimeUnit.MILLISECONDS);
return;
}
@@ -2956,7 +2955,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name,
ledgerId, BKException.getMessage(rc));
- scheduledExecutor.schedule(safeRun(() ->
asyncDeleteLedger(ledgerId, retry - 1)),
+ scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId,
retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC,
TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
@@ -3260,7 +3259,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
- safeRun(() -> tryTransformLedgerInfo(ledgerId,
transformation, finalPromise)), 100,
+ () -> tryTransformLedgerInfo(ledgerId, transformation,
finalPromise), 100,
TimeUnit.MILLISECONDS);
} else { // lock acquired
CompletableFuture<Void> unlockingPromise = new
CompletableFuture<>();
@@ -4011,9 +4010,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
timeoutSec = timeoutSec <= 0
? Math.max(config.getAddEntryTimeoutSeconds(),
config.getReadEntryTimeoutSeconds())
: timeoutSec;
- this.timeoutTask =
this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
- checkTimeouts();
- }), timeoutSec, timeoutSec, TimeUnit.SECONDS);
+ this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(
+ this::checkTimeouts, timeoutSec, timeoutSec,
TimeUnit.SECONDS);
}
}
@@ -4336,7 +4334,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
checkLedgerRollTask.cancel(true);
}
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
- safeRun(this::rollCurrentLedgerIfFull),
this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
+ this::rollCurrentLedgerIfFull, this.maximumRolloverTimeMs,
TimeUnit.MILLISECONDS);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index fb93e929f50..fcce1f7766c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
-import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -156,7 +155,7 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
.exceptionally(ex -> {
try {
executor.executeOrdered(ledgerName,
- SafeRunnable.safeRun(() ->
callback.operationFailed(getException(ex))));
+ () ->
callback.operationFailed(getException(ex)));
} catch (RejectedExecutionException e) {
//executor maybe shutdown, use common pool to run
callback.
CompletableFuture.runAsync(() ->
callback.operationFailed(getException(ex)));
@@ -204,8 +203,8 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
.thenAcceptAsync(newVersion ->
callback.operationComplete(null, newVersion),
executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName,
+ () -> callback.operationFailed(getException(ex)));
return null;
});
}
@@ -221,8 +220,8 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
.thenAcceptAsync(cursors ->
callback.operationComplete(cursors, null), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName,
+ () -> callback.operationFailed(getException(ex)));
return null;
});
}
@@ -249,8 +248,8 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
}
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName,
+ () -> callback.operationFailed(getException(ex)));
return null;
});
}
@@ -284,8 +283,8 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
.thenAcceptAsync(optStat -> callback.operationComplete(null,
optStat), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName,
+ () -> callback.operationFailed(getException(ex)));
return null;
});
}
@@ -303,7 +302,7 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> {
+ executor.executeOrdered(ledgerName, () -> {
Throwable actEx =
FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof
MetadataStoreException.NotFoundException){
log.info("[{}] [{}] cursor delete done because it
did not exist.", ledgerName, cursorName);
@@ -311,7 +310,7 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
return;
}
callback.operationFailed(getException(ex));
- }));
+ });
return null;
});
}
@@ -329,8 +328,8 @@ public class MetaStoreImpl implements MetaStore,
Consumer<Notification> {
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
- executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback
- .operationFailed(getException(ex))));
+ executor.executeOrdered(ledgerName,
+ () -> callback.operationFailed(getException(ex)));
return null;
});
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c56123c24ca..ae2beafb643 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -35,8 +35,6 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
-import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.bookkeeper.util.SafeRunnable;
/**
@@ -44,7 +42,7 @@ import org.apache.bookkeeper.util.SafeRunnable;
*
*/
@Slf4j
-public class OpAddEntry extends SafeRunnable implements AddCallback,
CloseCallback {
+public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
private long entryId;
@@ -212,7 +210,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
// Called in executor hashed on managed ledger name, once the add
operation is complete
@Override
- public void safeRun() {
+ public void run() {
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
@@ -328,11 +326,11 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
ManagedLedgerImpl finalMl = this.ml;
finalMl.mbean.recordAddEntryError();
- finalMl.getExecutor().execute(SafeRun.safeRun(() -> {
+ finalMl.getExecutor().execute(() -> {
// Force the creation of a new ledger. Doing it in a background
thread to avoid acquiring ML lock
// from a BK callback.
finalMl.ledgerClosed(lh);
- }));
+ });
}
void close() {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 81b14359514..19211553a5f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
@@ -108,10 +107,10 @@ class OpReadEntry implements ReadEntriesCallback {
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can
return them
- cursor.ledger.getExecutor().execute(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
- }));
+ });
} else if (cursor.config.isAutoSkipNonRecoverableData() && exception
instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}",
cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
@@ -161,20 +160,20 @@ class OpReadEntry implements ReadEntriesCallback {
&& maxPosition.compareTo(readPosition) > 0) {
// We still have more entries to read from the next ledger,
schedule a new async operation
- cursor.ledger.getExecutor().execute(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(() -> {
readPosition =
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
- }));
+ });
} else {
// The reading was already completed, release resources and
trigger callback
try {
cursor.readOperationCompleted();
} finally {
- cursor.ledger.getExecutor().execute(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
- }));
+ });
}
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index b1f23941347..b33dd87543f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -19,7 +19,6 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -66,13 +65,13 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
@Override
synchronized void initialize(ManagedLedgerInitializeLedgerCallback
callback, Object ctx) {
log.info("Opening shadow managed ledger {} with source={}", name,
sourceMLName);
- executor.execute(safeRun(() -> doInitialize(callback, ctx)));
+ executor.execute(() -> doInitialize(callback, ctx));
}
private void doInitialize(ManagedLedgerInitializeLedgerCallback callback,
Object ctx) {
// Fetch the list of existing ledgers in the source managed ledger
store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
- executor.execute(safeRun(() ->
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+ executor.execute(() ->
processSourceManagedLedgerInfo(managedLedgerInfo, stat))
);
store.getManagedLedgerInfo(sourceMLName, false, null, new
MetaStore.MetaStoreCallback<>() {
@Override
@@ -106,7 +105,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
final long lastLedgerId = ledgers.lastKey();
mbean.startDataLedgerOpenOp();
- AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) ->
executor.execute(safeRun(() -> {
+ AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) ->
executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened source ledger {}", name,
lastLedgerId);
@@ -145,7 +144,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
BKException.getMessage(rc));
callback.initializeFailed(createManagedLedgerException(rc));
}
- }));
+ });
//open ledger in readonly mode.
bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType,
config.getPassword(), opencb, null);
@@ -317,7 +316,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
mbean.startDataLedgerOpenOp();
//open ledger in readonly mode.
bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType,
config.getPassword(),
- (rc, lh, ctx1) -> executor.execute(safeRun(() -> {
+ (rc, lh, ctx1) -> executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened new source ledger {}",
name, lastLedgerId);
@@ -342,7 +341,7 @@ public class ShadowManagedLedgerImpl extends
ManagedLedgerImpl {
log.error("[{}] Failed to open source ledger {}:
{}", name, lastLedgerId,
BKException.getMessage(rc));
}
- })), null);
+ }), null);
}
//handle old ledgers deleted.
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 080c70b5873..d5a3019855c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ConcurrentHashMap;
@@ -116,7 +115,7 @@ public class RangeEntryCacheManagerImpl implements
EntryCacheManager {
// Trigger a single eviction in background. While the eviction is
running we stop inserting entries in the cache
if (currentSize > evictionTriggerThreshold &&
evictionInProgress.compareAndSet(false, true)) {
- mlFactory.getScheduledExecutor().execute(safeRun(() -> {
+ mlFactory.getScheduledExecutor().execute(() -> {
// Trigger a new cache eviction cycle to bring the used memory
below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize *
cacheEvictionWatermark);
@@ -136,7 +135,7 @@ public class RangeEntryCacheManagerImpl implements
EntryCacheManager {
mlFactoryMBean.recordCacheEviction();
evictionInProgress.set(false);
}
- }));
+ });
}
return currentSize < maxSize;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java
deleted file mode 100644
index 570cb7ae735..00000000000
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.mledger.util;
-
-import java.util.function.Consumer;
-import org.apache.bookkeeper.util.SafeRunnable;
-
-/**
- * Static builders for {@link SafeRunnable}s.
- */
-public class SafeRun {
- public static SafeRunnable safeRun(Runnable runnable) {
- return new SafeRunnable() {
- @Override
- public void safeRun() {
- runnable.run();
- }
- };
- }
-
- /**
- *
- * @param runnable
- * @param exceptionHandler
- * handler that will be called when there are any exception
- * @return
- */
- public static SafeRunnable safeRun(Runnable runnable, Consumer<Throwable>
exceptionHandler) {
- return new SafeRunnable() {
- @Override
- public void safeRun() {
- try {
- runnable.run();
- } catch (Throwable t) {
- exceptionHandler.accept(t);
- throw t;
- }
- }
- };
- }
-}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
index 3fa0234e13a..7558f07db76 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.bookkeeper.mledger.impl;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -383,7 +382,7 @@ public class ManagedCursorConcurrencyTest extends
MockedBookKeeperTestCase {
final AtomicInteger iteration = new AtomicInteger(0);
for (int i = 0; i < deleteEntries; i++) {
- executor.submit(safeRun(() -> {
+ executor.submit(() -> {
try {
cursor.asyncDelete(addedEntries.get(iteration.getAndIncrement()), new
DeleteCallback() {
@Override
@@ -403,7 +402,7 @@ public class ManagedCursorConcurrencyTest extends
MockedBookKeeperTestCase {
} finally {
counter.countDown();
}
- }));
+ });
}
counter.await();
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 136fe77d77a..64d1fcdab6f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.stats.prometheus;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.EOFException;
import java.io.IOException;
@@ -61,7 +60,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
protected void doGet(HttpServletRequest request, HttpServletResponse
response) {
AsyncContext context = request.startAsync();
context.setTimeout(metricsServletTimeoutMs);
- executor.execute(safeRun(() -> {
+ executor.execute(() -> {
long start = System.currentTimeMillis();
HttpServletResponse res = (HttpServletResponse)
context.getResponse();
try {
@@ -92,7 +91,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
+ "this is likely due to metricsServletTimeoutMs:
{} ms elapsed: {}", time, e + "");
}
}
- }));
+ });
}
protected void generateMetrics(String cluster, ServletOutputStream
outputStream) throws IOException {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a6345f4a56a..bb08734298f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -56,7 +55,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -76,6 +74,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -324,8 +323,10 @@ public class BrokerService implements Closeable {
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false,
acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
- this.statsUpdater = Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("pulsar-stats-updater"));
+ this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-stats-updater")
+ .numThreads(1)
+ .build();
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
this.entryFilterProvider = new
EntryFilterProvider(pulsar.getConfiguration());
@@ -333,22 +334,32 @@ public class BrokerService implements Closeable {
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
- this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("pulsar-inactivity-monitor"));
- this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("pulsar-msg-expiry-monitor"));
- this.compactionMonitor =
- Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("pulsar-compaction-monitor"));
- this.consumedLedgersMonitor =
Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("consumed-Ledgers-monitor"));
+
+ this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-inactivity-monitor")
+ .numThreads(1)
+ .build();
+ this.messageExpiryMonitor = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-msg-expiry-monitor")
+ .numThreads(1)
+ .build();
+ this.compactionMonitor = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-compaction-monitor")
+ .numThreads(1)
+ .build();
+ this.consumedLedgersMonitor = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-consumed-ledgers-monitor")
+ .numThreads(1)
+ .build();
this.topicPublishRateLimiterMonitor =
new
PublishRateLimiterMonitor("pulsar-topic-publish-rate-limiter-monitor");
this.brokerPublishRateLimiterMonitor =
new
PublishRateLimiterMonitor("pulsar-broker-publish-rate-limiter-monitor");
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
- this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor(
- new
ExecutorProvider.ExtendedThreadFactory("pulsar-backlog-quota-checker"));
+ this.backlogQuotaChecker = OrderedScheduler.newSchedulerBuilder()
+ .name("pulsar-backlog-quota-checker")
+ .numThreads(1)
+ .build();
this.authenticationService = new
AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
@@ -367,7 +378,7 @@ public class BrokerService implements Closeable {
log.info("Enabling per-broker unack-message limit {} and
dispatcher-limit {} on blocked-broker",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
// block misbehaving dispatcher by checking periodically
-
pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::checkUnAckMessageDispatching),
+
pulsar.getExecutor().scheduleAtFixedRate(this::checkUnAckMessageDispatching,
600, 30, TimeUnit.SECONDS);
} else {
this.maxUnackedMessages = 0;
@@ -557,7 +568,7 @@ public class BrokerService implements Closeable {
}
protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int
statsUpdateFrequencyInSecs) {
- statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates),
+ statsUpdater.scheduleAtFixedRate(this::updateRates,
statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs,
TimeUnit.SECONDS);
// Ensure the broker starts up with initial stats
@@ -567,11 +578,12 @@ public class BrokerService implements Closeable {
protected void startDeduplicationSnapshotMonitor() {
int interval =
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
if (interval > 0 &&
pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
- this.deduplicationSnapshotMonitor =
- Executors.newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory(
- "deduplication-snapshot-monitor"));
- deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() ->
forEachTopic(
- Topic::checkDeduplicationSnapshot))
+ this.deduplicationSnapshotMonitor =
OrderedScheduler.newSchedulerBuilder()
+ .name("deduplication-snapshot-monitor")
+ .numThreads(1)
+ .build();
+ deduplicationSnapshotMonitor.scheduleAtFixedRate(() ->
forEachTopic(
+ Topic::checkDeduplicationSnapshot)
, interval, interval, TimeUnit.SECONDS);
}
}
@@ -579,14 +591,14 @@ public class BrokerService implements Closeable {
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled())
{
int interval =
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
- inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()),
interval, interval,
+ inactivityMonitor.scheduleAtFixedRate(() -> checkGC(), interval,
interval,
TimeUnit.SECONDS);
}
// Deduplication info checker
long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes())
/ 3;
-
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo),
+
inactivityMonitor.scheduleAtFixedRate(this::checkMessageDeduplicationInfo,
duplicationCheckerIntervalInSeconds,
duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
@@ -595,7 +607,7 @@ public class BrokerService implements Closeable {
long subscriptionExpiryCheckIntervalInSeconds =
TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration()
.getSubscriptionExpiryCheckIntervalInMinutes());
-
inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions),
+
inactivityMonitor.scheduleAtFixedRate(this::checkInactiveSubscriptions,
subscriptionExpiryCheckIntervalInSeconds,
subscriptionExpiryCheckIntervalInSeconds,
TimeUnit.SECONDS);
}
@@ -603,21 +615,21 @@ public class BrokerService implements Closeable {
// check cluster migration
int interval =
pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
if (interval > 0) {
- inactivityMonitor.scheduleAtFixedRate(safeRun(() ->
checkClusterMigration()), interval, interval,
+ inactivityMonitor.scheduleAtFixedRate(() ->
checkClusterMigration(), interval, interval,
TimeUnit.SECONDS);
}
}
protected void startMessageExpiryMonitor() {
int interval =
pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
-
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry),
interval, interval,
+ messageExpiryMonitor.scheduleAtFixedRate(this::checkMessageExpiry,
interval, interval,
TimeUnit.MINUTES);
}
protected void startCheckReplicationPolicies() {
int interval =
pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
if (interval > 0) {
-
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies),
interval, interval,
+
messageExpiryMonitor.scheduleAtFixedRate(this::checkReplicationPolicies,
interval, interval,
TimeUnit.SECONDS);
}
}
@@ -625,16 +637,16 @@ public class BrokerService implements Closeable {
protected void startCompactionMonitor() {
int interval =
pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
- compactionMonitor.scheduleAtFixedRate(safeRun(() ->
checkCompaction()),
- interval, interval,
TimeUnit.SECONDS);
+ compactionMonitor.scheduleAtFixedRate(this::checkCompaction,
+ interval, interval, TimeUnit.SECONDS);
}
}
protected void startConsumedLedgersMonitor() {
int interval =
pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
-
consumedLedgersMonitor.scheduleAtFixedRate(safeRun(this::checkConsumedLedgers),
- interval,
interval, TimeUnit.SECONDS);
+
consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers,
+ interval, interval, TimeUnit.SECONDS);
}
}
@@ -642,7 +654,7 @@ public class BrokerService implements Closeable {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval =
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
log.info("Scheduling a thread to check backlog quota after [{}]
seconds in background", interval);
-
backlogQuotaChecker.scheduleAtFixedRate(safeRun(this::monitorBacklogQuota),
interval, interval,
+ backlogQuotaChecker.scheduleAtFixedRate(this::monitorBacklogQuota,
interval, interval,
TimeUnit.SECONDS);
} else {
log.info("Backlog quota check monitoring is disabled");
@@ -702,12 +714,15 @@ public class BrokerService implements Closeable {
stop();
}
//start monitor.
- scheduler = Executors.newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory(name));
+ scheduler = OrderedScheduler.newSchedulerBuilder()
+ .name(name)
+ .numThreads(1)
+ .build();
// schedule task that sums up publish-rate across all cnx on a
topic ,
// and check the rate limit exceeded or not.
- scheduler.scheduleAtFixedRate(safeRun(checkTask), tickTimeMs,
tickTimeMs, TimeUnit.MILLISECONDS);
+ scheduler.scheduleAtFixedRate(checkTask, tickTimeMs, tickTimeMs,
TimeUnit.MILLISECONDS);
// schedule task that refreshes rate-limiting bucket
- scheduler.scheduleAtFixedRate(safeRun(refreshTask), 1, 1,
TimeUnit.SECONDS);
+ scheduler.scheduleAtFixedRate(refreshTask, 1, 1, TimeUnit.SECONDS);
this.tickTimeMs = tickTimeMs;
this.refreshTask = refreshTask;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 677ed25ef21..bfd7c001e4c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -67,7 +67,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -1674,9 +1673,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
final long highestSequenceId = send.getHighestSequenceId();
-
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(),
SafeRun.safeRun(() -> {
+
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(),
() -> {
commandSender.sendSendReceiptResponse(producerId,
sequenceId, highestSequenceId, -1, -1);
- }));
+ });
producer.recordMessageDrop(send.getNumMessages());
return;
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9540f6efe00..4ac755860fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
@@ -236,9 +235,9 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
@Override
public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
- topic.getBrokerService().executor().execute(safeRun(() -> {
+ topic.getBrokerService().executor().execute(() -> {
internalConsumerFlow(consumer, additionalNumberOfMessages);
- }));
+ });
}
private synchronized void internalConsumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
@@ -264,7 +263,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
*
*/
public void readMoreEntriesAsync() {
-
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
+ topic.getBrokerService().executor().execute(this::readMoreEntries);
}
public synchronized void readMoreEntries() {
@@ -584,14 +583,14 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
// setting sendInProgress here, because sendMessagesToConsumers
will be executed
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
- dispatchMessagesThread.execute(safeRun(() -> {
+ dispatchMessagesThread.execute(() -> {
if (sendMessagesToConsumers(readType, entries, false)) {
updatePendingBytesToDispatch(-size);
readMoreEntries();
} else {
updatePendingBytesToDispatch(-size);
}
- }));
+ });
} else {
if (sendMessagesToConsumers(readType, entries, true)) {
updatePendingBytesToDispatch(-size);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 03dce58af3a..7cbf7bd2c78 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallba
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
@@ -149,9 +148,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
- topicExecutor.execute(SafeRun.safeRun(() -> {
- internalReadEntriesComplete(entries, obj);
- }));
+ topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
}
public synchronized void internalReadEntriesComplete(final List<Entry>
entries, Object obj) {
@@ -229,21 +226,19 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the
previous batch has been written to the socket.
- topicExecutor.execute(SafeRun.safeRun(() -> {
+ topicExecutor.execute(() -> {
synchronized
(PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
readMoreEntries(newConsumer);
}
- }));
+ });
}
});
}
@Override
public void consumerFlow(Consumer consumer, int
additionalNumberOfMessages) {
- topicExecutor.execute(SafeRun.safeRun(() -> {
- internalConsumerFlow(consumer);
- }));
+ topicExecutor.execute(() -> internalConsumerFlow(consumer));
}
private synchronized void internalConsumerFlow(Consumer consumer) {
@@ -272,9 +267,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
- topicExecutor.execute(SafeRun.safeRun(() -> {
- internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
- }));
+ topicExecutor.execute(() ->
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
@@ -466,9 +459,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
- topicExecutor.execute(SafeRun.safeRun(() -> {
- internalReadEntriesFailed(exception, ctx);
- }));
+ topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
}
private synchronized void internalReadEntriesFailed(ManagedLedgerException
exception, Object ctx) {
@@ -516,7 +507,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
- topicExecutor.execute(SafeRun.safeRun(() -> {
+ topicExecutor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer
and there is no pending read
@@ -533,7 +524,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
currentConsumer, havePendingRead);
}
}
- }));
+ });
}, waitTimeMillis, TimeUnit.MILLISECONDS);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a0292729e73..1a8c6e180a2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
@@ -415,7 +414,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
public void markDeletePositionMoveForward() {
// Execute the notification in different thread to avoid a mutex chain
here
// from the delete operation that was completed
- topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(()
-> {
+ topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this)
{
if (recentlyJoinedConsumers != null &&
!recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
@@ -424,7 +423,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
readMoreEntries();
}
}
- }));
+ });
}
private boolean removeConsumersFromRecentJoinedConsumers() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 5df1fc2c6db..44e8a423344 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -30,7 +29,6 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Subscription;
import
org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
@@ -105,13 +103,13 @@ public class
PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
// setting sendInProgress here, because sendMessagesToConsumers
will be executed
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
- dispatchMessagesThread.execute(safeRun(() -> {
+ dispatchMessagesThread.execute(() -> {
if (sendMessagesToConsumers(readType,
Lists.newArrayList(entry), false)) {
readMoreEntries();
} else {
updatePendingBytesToDispatch(-size);
}
- }));
+ });
} else {
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry),
true)) {
readMoreEntriesAsync();
@@ -129,7 +127,7 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
public void canReadMoreEntries(boolean withBackoff) {
havePendingRead = false;
topic.getBrokerService().executor().schedule(() -> {
- topicExecutor.execute(SafeRun.safeRun(() -> {
+ topicExecutor.execute(() -> {
synchronized
(PersistentStreamingDispatcherMultipleConsumers.this) {
if (!havePendingRead) {
log.info("[{}] Scheduling read operation", name);
@@ -138,7 +136,7 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
log.info("[{}] Skipping read since we have
pendingRead", name);
}
}
- }));
+ });
}, withBackoff
? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
}
@@ -216,7 +214,7 @@ public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDi
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the
same thread
// as there is a risk of StackOverflowError
-
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
+
topic.getBrokerService().executor().execute(this::readMoreEntries);
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) ==
TRUE) {
log.debug("[{}] Dispatcher read is blocked due to
unackMessages {} reached to max {}", name,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 9000850ed69..efe9de778a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.collect.Lists;
import java.util.concurrent.Executor;
@@ -29,7 +28,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
@@ -68,7 +66,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
public void canReadMoreEntries(boolean withBackoff) {
havePendingRead = false;
topic.getBrokerService().executor().schedule(() -> {
- topicExecutor.execute(SafeRun.safeRun(() -> {
+ topicExecutor.execute(() -> {
synchronized
(PersistentStreamingDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer =
ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && !havePendingRead) {
@@ -81,7 +79,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
currentConsumer, havePendingRead);
}
}
- }));
+ });
}, withBackoff
? readFailureBackoff.next() : 0, TimeUnit.MILLISECONDS);
}
@@ -115,9 +113,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
*/
@Override
public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
- dispatcherExecutor.execute(safeRun(() -> {
- internalReadEntryComplete(entry, ctx);
- }));
+ dispatcherExecutor.execute(() -> internalReadEntryComplete(entry,
ctx));
}
public synchronized void internalReadEntryComplete(Entry entry,
PendingReadEntryRequest ctx) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 0f75538ee88..6ffc5ba0f62 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -35,7 +35,6 @@ import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
@@ -157,9 +156,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
@Override
public void readEntryComplete(Entry entry, Object ctx) {
// Don't block caller thread, complete read entry with dispatcher
dedicated thread.
- dispatcherExecutor.execute(SafeRun.safeRun(() -> {
- internalReadEntryComplete(entry, ctx);
- }));
+ dispatcherExecutor.execute(() -> internalReadEntryComplete(entry,
ctx));
}
private void internalReadEntryComplete(Entry entry, Object ctx) {
@@ -198,9 +195,7 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
// Don't block caller thread, complete read entry fail with dispatcher
dedicated thread.
- dispatcherExecutor.execute(SafeRun.safeRun(() -> {
- internalReadEntryFailed(exception, ctx);
- }));
+ dispatcherExecutor.execute(() -> internalReadEntryFailed(exception,
ctx));
}
private void internalReadEntryFailed(ManagedLedgerException exception,
Object ctx) {
@@ -257,13 +252,13 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
public boolean cancelReadRequests() {
if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
// Don't block caller thread, complete cancel read with dispatcher
dedicated thread.
- topicExecutor.execute(SafeRun.safeRun(() -> {
+ topicExecutor.execute(() -> {
synchronized (StreamingEntryReader.this) {
if (STATE_UPDATER.compareAndSet(this, State.Canceling,
State.Canceled)) {
internalCancelReadRequests();
}
}
- }));
+ });
return true;
}
return false;
@@ -282,16 +277,16 @@ public class StreamingEntryReader implements
AsyncCallbacks.ReadEntryCallback, W
private void retryReadRequest(PendingReadEntryRequest
pendingReadEntryRequest, long delay) {
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
- dispatcherExecutor.execute(SafeRun.safeRun(() -> {
+ dispatcherExecutor.execute(() -> {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
cursor.getManagedLedger();
managedLedger.asyncReadEntry(pendingReadEntryRequest.position,
this, pendingReadEntryRequest);
- }));
+ });
}, delay, TimeUnit.MILLISECONDS);
}
@Override
public void entriesAvailable() {
-
dispatcherExecutor.execute(SafeRun.safeRun(this::internalEntriesAvailable));
+ dispatcherExecutor.execute(this::internalEntriesAvailable);
}
private synchronized void internalEntriesAvailable() {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 11ae62226e7..9e6a9b94c42 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -89,7 +88,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
this.sequencer = FutureUtil.Sequencer.create();
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
- updateCachedValueFuture =
executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
+ updateCachedValueFuture =
executor.scheduleWithFixedDelay(this::getLeaderValue,
metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
metadataCacheConfig.getRefreshAfterWriteMillis(),
TimeUnit.MILLISECONDS);
}