This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad90ac6  Removed contention between producers on ManagedLedger 
addEntry (#1521)
ad90ac6 is described below

commit ad90ac6463681f7358f3f331a8440dbcfcd34258
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Apr 9 12:43:01 2018 -0700

    Removed contention between producers on ManagedLedger addEntry (#1521)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 +++++++++++----
 .../org/apache/bookkeeper/mledger/impl/OpReadEntry.java   |  2 +-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c96b35e..f6c7e3f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -479,7 +479,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
-    public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback 
callback, Object ctx) {
+    public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, 
Object ctx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
         }
@@ -498,6 +498,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
         pendingAddEntries.add(addOperation);
 
+        // Jump to specific thread to avoid contention from writers writing 
from different threads
+        executor.executeOrdered(name, safeRun(() -> {
+            internalAsyncAddEntry(addOperation);
+        }));
+    }
+
+    private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
         if (state == State.ClosingLedger || state == State.CreatingLedger) {
             // We don't have a ready ledger to write into
             // We are waiting for a new ledger to be created
@@ -509,7 +516,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (now < lastLedgerCreationFailureTimestamp + 
WaitTimeAfterLedgerCreationFailureMs) {
                 // Deny the write request, since we haven't waited enough time 
since last attempt to create a new ledger
                 pendingAddEntries.remove(addOperation);
-                callback.addFailed(new ManagedLedgerException("Waiting for new 
ledger creation to complete"), ctx);
+                addOperation.failed(new ManagedLedgerException("Waiting for 
new ledger creation to complete"));
                 return;
             }
 
@@ -521,7 +528,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
                 mbean.startDataLedgerCreateOp();
                 bookKeeper.asyncCreateLedger(config.getEnsembleSize(), 
config.getWriteQuorumSize(),
-                        config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, ctx,
+                        config.getAckQuorumSize(), config.getDigestType(), 
config.getPassword(), this, null,
                         Collections.emptyMap());
             }
         } else {
@@ -531,7 +538,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             addOperation.setLedger(currentLedger);
 
             ++currentLedgerEntries;
-            currentLedgerSize += buffer.readableBytes();
+            currentLedgerSize += addOperation.data.readableBytes();
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Write into current ledger lh={} entries={}", 
name, currentLedger.getId(),
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 6b3a03a..0dfa338 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback {
             // The reading was already completed, release resources and 
trigger callback
             cursor.readOperationCompleted();
 
-            cursor.ledger.getExecutor().execute(safeRun(() -> {
+            
cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() 
-> {
                 callback.readEntriesComplete(entries, ctx);
                 recycle();
             }));

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to