This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8d1a5  Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps (#1519)
ed8d1a5 is described below

commit ed8d1a5d39b5bde796954e79feda579dc7f64c2f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Apr 9 13:55:12 2018 -0700

    Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps (#1519)
---
 managed-ledger/pom.xml                             |  11 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 151 ++++++++++-----------
 pom.xml                                            |   7 +
 3 files changed, 88 insertions(+), 81 deletions(-)

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 3ee7703..05de7fe 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -43,12 +43,12 @@
       <artifactId>pulsar-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-    
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-    
+
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
@@ -79,6 +79,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.jctools</groupId>
+      <artifactId>jctools-core</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
@@ -107,7 +112,7 @@
       </plugin>
     </plugins>
   </build>
- 
+
   <profiles>
     <profile>
       <id>protobuf</id>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 462428e..443a873 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -38,7 +38,6 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
 
-import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -82,6 +81,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.commons.lang3.tuple.Pair;
+import org.jctools.queues.MpmcArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +148,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
-    private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
+    private final MpmcArrayQueue<MarkDeleteEntry> pendingMarkDeleteOps = new MpmcArrayQueue<>(16);
+
     private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> 
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
         AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, 
"pendingMarkDeletedSubmittedCount");
     @SuppressWarnings("unused")
@@ -758,14 +759,12 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         log.info("[{}] Initiate reset position to {} on cursor {}", 
ledger.getName(), position, name);
 
-        synchronized (pendingMarkDeleteOps) {
-            if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, 
TRUE)) {
-                log.error("[{}] reset requested - position [{}], previous 
reset in progress - cursor {}",
-                        ledger.getName(), position, name);
-                resetCursorCallback.resetFailed(
-                        new 
ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in 
progress"),
-                        position);
-            }
+        if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, 
TRUE)) {
+            log.error("[{}] reset requested - position [{}], previous reset in 
progress - cursor {}", ledger.getName(),
+                    position, name);
+            resetCursorCallback.resetFailed(
+                    new 
ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in 
progress"),
+                    position);
         }
 
         final AsyncCallbacks.ResetCursorCallback callback = 
resetCursorCallback;
@@ -805,24 +804,20 @@ public class ManagedCursorImpl implements ManagedCursor {
                 } finally {
                     lock.writeLock().unlock();
                 }
-                synchronized (pendingMarkDeleteOps) {
-                    pendingMarkDeleteOps.clear();
-                    if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
-                        log.error("[{}] expected reset position [{}], but 
another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
-                    }
+
+                pendingMarkDeleteOps.drain(entry -> 
entry.callback.markDeleteComplete(entry.ctx));
+                if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
+                    log.error("[{}] expected reset position [{}], but another 
reset in progress on cursor {}",
+                            ledger.getName(), newPosition, name);
                 }
                 callback.resetComplete(newPosition);
-
             }
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
-                synchronized (pendingMarkDeleteOps) {
-                    if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
-                        log.error("[{}] expected reset position [{}], but 
another reset in progress on cursor {}",
-                                ledger.getName(), newPosition, name);
-                    }
+                if 
(!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, 
FALSE)) {
+                    log.error("[{}] expected reset position [{}], but another 
reset in progress on cursor {}",
+                            ledger.getName(), newPosition, name);
                 }
                 callback.resetFailed(new 
ManagedLedgerException.InvalidCursorPositionException(
                         "unable to persist position for cursor reset " + 
newPosition.toString()), newPosition);
@@ -1275,7 +1270,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // markDelete-position and clear out deletedMsgSet
         markDeletePosition = PositionImpl.get(newMarkDeletePosition);
         individualDeletedMessages.remove(Range.atMost(markDeletePosition));
-        
+
         if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
             // If the position that is mark-deleted is past the read position, 
it
             // means that the client has skipped some entries. We need to move
@@ -1350,36 +1345,42 @@ public class ManagedCursorImpl implements ManagedCursor {
         MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, 
callback, ctx);
 
         // We cannot write to the ledger during the switch, need to wait until 
the new metadata ledger is available
-        synchronized (pendingMarkDeleteOps) {
-            // The state might have changed while we were waiting on the queue 
mutex
-            switch (STATE_UPDATER.get(this)) {
-            case Closed:
-                callback.markDeleteFailed(new ManagedLedgerException("Cursor 
was already closed"), ctx);
-                return;
+        // The state might have changed while we were waiting on the queue 
mutex
+        switch (state) {
+        case Closed:
+            callback.markDeleteFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
+            return;
 
-            case NoLedger:
-                // We need to create a new ledger to write into
-                startCreatingNewMetadataLedger();
-                // fall through
-            case SwitchingLedger:
-                pendingMarkDeleteOps.add(mdEntry);
-                break;
+        case NoLedger:
+            // We need to create a new ledger to write into
+            startCreatingNewMetadataLedger();
+            // fall through
+        case SwitchingLedger:
+            pendingMarkDeleteOps.add(mdEntry);
+            if (state != State.SwitchingLedger) {
+                // If state changed since we checked. Trigger a flush since we 
could have missed the current entry
+                flushPendingMarkDeletes();
+            }
+            break;
 
-            case Open:
-                if (PENDING_READ_OPS_UPDATER.get(this) > 0) {
-                    // Wait until no read operation are pending
-                    pendingMarkDeleteOps.add(mdEntry);
-                } else {
-                    // Execute the mark delete immediately
-                    internalMarkDelete(mdEntry);
+        case Open:
+            if (pendingReadOps > 0) {
+                // Wait until no read operation are pending
+                pendingMarkDeleteOps.add(mdEntry);
+                if (pendingReadOps == 0) {
+                    // If the value changed while enqueuing, trigger a flush 
to make sure we don't delay current request
+                    flushPendingMarkDeletes();
                 }
-                break;
-
-            default:
-                log.error("[{}][{}] Invalid cursor state: {}", 
ledger.getName(), name, state);
-                callback.markDeleteFailed(new ManagedLedgerException("Cursor 
was in invalid state: " + state), ctx);
-                break;
+            } else {
+                // Execute the mark delete immediately
+                internalMarkDelete(mdEntry);
             }
+            break;
+
+        default:
+            log.error("[{}][{}] Invalid cursor state: {}", ledger.getName(), 
name, state);
+            callback.markDeleteFailed(new ManagedLedgerException("Cursor was 
in invalid state: " + state), ctx);
+            break;
         }
     }
 
@@ -1881,41 +1882,38 @@ public class ManagedCursorImpl implements ManagedCursor {
             @Override
             public void operationComplete() {
                 // We now have a new ledger where we can write
-                synchronized (pendingMarkDeleteOps) {
-                    flushPendingMarkDeletes();
+                flushPendingMarkDeletes();
 
-                    // Resume normal mark-delete operations
-                    STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
-                }
+                // Resume normal mark-delete operations
+                state = State.Open;
             }
 
             @Override
             public void operationFailed(ManagedLedgerException exception) {
                 log.error("[{}][{}] Metadata ledger creation failed", 
ledger.getName(), name, exception);
 
-                synchronized (pendingMarkDeleteOps) {
-                    while (!pendingMarkDeleteOps.isEmpty()) {
-                        MarkDeleteEntry entry = pendingMarkDeleteOps.poll();
-                        entry.callback.markDeleteFailed(exception, entry.ctx);
-                    }
+                // At this point we don't have a ledger ready
+                state = State.NoLedger;
 
-                    // At this point we don't have a ledger ready
-                    STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
-                }
+                pendingMarkDeleteOps.drain(entry -> 
entry.callback.markDeleteFailed(exception, entry.ctx));
             }
         });
     }
 
     private void flushPendingMarkDeletes() {
-        if (!pendingMarkDeleteOps.isEmpty()) {
-            internalFlushPendingMarkDeletes();
+        if (pendingMarkDeleteOps.isEmpty()) {
+            return;
         }
-    }
 
-    void internalFlushPendingMarkDeletes() {
-        MarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast();
-        lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps);
-        pendingMarkDeleteOps.clear();
+        List<MarkDeleteEntry> entries = 
Lists.newArrayListWithExpectedSize(pendingMarkDeleteOps.size());
+        pendingMarkDeleteOps.drain(entries::add);
+
+        if (entries.isEmpty()) {
+            return;
+        }
+
+        MarkDeleteEntry lastEntry = entries.get(entries.size() - 1);
+        lastEntry.callbackGroup = entries;
 
         internalMarkDelete(lastEntry);
     }
@@ -2188,15 +2186,12 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void readOperationCompleted() {
         if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) {
-            synchronized (pendingMarkDeleteOps) {
-                if (STATE_UPDATER.get(this) == State.Open) {
-                    // Flush the pending writes only if the state is open.
-                    flushPendingMarkDeletes();
-                } else if 
(PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) {
-                    log.info(
-                            "[{}] read operation completed and cursor was 
closed. need to call any queued cursor close",
-                            name);
-                }
+            if (state == State.Open) {
+                // Flush the pending writes only if the state is open.
+                flushPendingMarkDeletes();
+            } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) 
!= 0) {
+                log.info("[{}] read operation completed and cursor was closed. 
need to call any queued cursor close",
+                        name);
             }
         }
     }
diff --git a/pom.xml b/pom.xml
index ef28966..30b3d5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,7 @@ flexible messaging model and an intuitive client API.</description>
     <protoc-gen-grpc-java.version>1.0.0</protoc-gen-grpc-java.version>
     <gson.version>2.8.2</gson.version>
     <sketches.version>0.8.3</sketches.version>
+    <jctools.version>2.1.1</jctools.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
@@ -431,6 +432,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.jctools</groupId>
+        <artifactId>jctools-core</artifactId>
+        <version>${jctools.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
         <version>2.5</version>

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to