This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ad90ac6 Removed contention between producers on ManagedLedger addEntry (#1521) ad90ac6 is described below commit ad90ac6463681f7358f3f331a8440dbcfcd34258 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Apr 9 12:43:01 2018 -0700 Removed contention between producers on ManagedLedger addEntry (#1521) --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++++++++---- .../org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c96b35e..f6c7e3f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -479,7 +479,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override - public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { + public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } @@ -498,6 +498,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); pendingAddEntries.add(addOperation); + // Jump to specific thread to avoid contention from writers writing from different threads + executor.executeOrdered(name, safeRun(() -> { + internalAsyncAddEntry(addOperation); + })); + } + + private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created @@ -509,7 +516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (now < lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) { // Deny the write request, since we haven't waited enough time since last attempt to create a new ledger pendingAddEntries.remove(addOperation); - callback.addFailed(new ManagedLedgerException("Waiting for new ledger creation to complete"), ctx); + addOperation.failed(new ManagedLedgerException("Waiting for new ledger creation to complete")); return; } @@ -521,7 +528,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx, + config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null, Collections.emptyMap()); } } else { @@ -531,7 +538,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { addOperation.setLedger(currentLedger); ++currentLedgerEntries; - currentLedgerSize += buffer.readableBytes(); + currentLedgerSize += addOperation.data.readableBytes(); if (log.isDebugEnabled()) { log.debug("[{}] Write into current ledger lh={} entries={}", name, currentLedger.getId(), diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 6b3a03a..0dfa338 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback { // The reading was already completed, release resources and trigger callback cursor.readOperationCompleted(); - cursor.ledger.getExecutor().execute(safeRun(() -> { + cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); -- To stop receiving notification emails like this one, please contact si...@apache.org.