This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ed8d1a5 Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps (#1519) ed8d1a5 is described below commit ed8d1a5d39b5bde796954e79feda579dc7f64c2f Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Apr 9 13:55:12 2018 -0700 Avoid contention in ManagedCursorImpl generated by locking on pendingMarkDeleteOps (#1519) --- managed-ledger/pom.xml | 11 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 151 ++++++++++----------- pom.xml | 7 + 3 files changed, 88 insertions(+), 81 deletions(-) diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 3ee7703..05de7fe 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -43,12 +43,12 @@ <artifactId>pulsar-common</artifactId> <version>${project.version}</version> </dependency> - + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> - + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> @@ -79,6 +79,11 @@ </dependency> <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + </dependency> + + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> @@ -107,7 +112,7 @@ </plugin> </plugins> </build> - + <profiles> <profile> <id>protobuf</id> diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 462428e..443a873 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -38,7 +38,6 @@ import com.google.common.collect.TreeRangeSet; import com.google.common.util.concurrent.RateLimiter; import com.google.protobuf.InvalidProtocolBufferException; -import java.util.ArrayDeque; import java.util.Collections; import java.util.List; import java.util.Map; @@ -82,6 +81,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.commons.lang3.tuple.Pair; +import org.jctools.queues.MpmcArrayQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +148,8 @@ public class ManagedCursorImpl implements ManagedCursor { } } - private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>(); + private final MpmcArrayQueue<MarkDeleteEntry> pendingMarkDeleteOps = new MpmcArrayQueue<>(16); + private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); @SuppressWarnings("unused") @@ -758,14 +759,12 @@ public class ManagedCursorImpl implements ManagedCursor { log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name); - synchronized (pendingMarkDeleteOps) { - if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { - log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", - ledger.getName(), position, name); - resetCursorCallback.resetFailed( - new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), - position); - } + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) { + log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", ledger.getName(), + position, name); + resetCursorCallback.resetFailed( + new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), + position); } final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback; @@ -805,24 +804,20 @@ public class ManagedCursorImpl implements ManagedCursor { } finally { lock.writeLock().unlock(); } - synchronized (pendingMarkDeleteOps) { - pendingMarkDeleteOps.clear(); - if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); - } + + pendingMarkDeleteOps.drain(entry -> entry.callback.markDeleteComplete(entry.ctx)); + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { + log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", + ledger.getName(), newPosition, name); } callback.resetComplete(newPosition); - } @Override public void operationFailed(ManagedLedgerException exception) { - synchronized (pendingMarkDeleteOps) { - if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { - log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", - ledger.getName(), newPosition, name); - } + if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) { + log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", + ledger.getName(), newPosition, name); } callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException( "unable to persist position for cursor reset " + newPosition.toString()), newPosition); @@ -1275,7 +1270,7 @@ public class ManagedCursorImpl implements ManagedCursor { // markDelete-position and clear out deletedMsgSet markDeletePosition = PositionImpl.get(newMarkDeletePosition); individualDeletedMessages.remove(Range.atMost(markDeletePosition)); - + if (readPosition.compareTo(newMarkDeletePosition) <= 0) { // If the position that is mark-deleted is past the read position, it // means that the client has skipped some entries. We need to move @@ -1350,36 +1345,42 @@ public class ManagedCursorImpl implements ManagedCursor { MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx); // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available - synchronized (pendingMarkDeleteOps) { - // The state might have changed while we were waiting on the queue mutex - switch (STATE_UPDATER.get(this)) { - case Closed: - callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); - return; + // The state might have changed while we were waiting on the queue mutex + switch (state) { + case Closed: + callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); + return; - case NoLedger: - // We need to create a new ledger to write into - startCreatingNewMetadataLedger(); - // fall through - case SwitchingLedger: - pendingMarkDeleteOps.add(mdEntry); - break; + case NoLedger: + // We need to create a new ledger to write into + startCreatingNewMetadataLedger(); + // fall through + case SwitchingLedger: + pendingMarkDeleteOps.add(mdEntry); + if (state != State.SwitchingLedger) { + // If state changed since we checked. Trigger a flush since we could have missed the current entry + flushPendingMarkDeletes(); + } + break; - case Open: - if (PENDING_READ_OPS_UPDATER.get(this) > 0) { - // Wait until no read operation are pending - pendingMarkDeleteOps.add(mdEntry); - } else { - // Execute the mark delete immediately - internalMarkDelete(mdEntry); + case Open: + if (pendingReadOps > 0) { + // Wait until no read operation are pending + pendingMarkDeleteOps.add(mdEntry); + if (pendingReadOps == 0) { + // If the value changed while enqueuing, trigger a flush to make sure we don't delay current request + flushPendingMarkDeletes(); } - break; - - default: - log.error("[{}][{}] Invalid cursor state: {}", ledger.getName(), name, state); - callback.markDeleteFailed(new ManagedLedgerException("Cursor was in invalid state: " + state), ctx); - break; + } else { + // Execute the mark delete immediately + internalMarkDelete(mdEntry); } + break; + + default: + log.error("[{}][{}] Invalid cursor state: {}", ledger.getName(), name, state); + callback.markDeleteFailed(new ManagedLedgerException("Cursor was in invalid state: " + state), ctx); + break; } } @@ -1881,41 +1882,38 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void operationComplete() { // We now have a new ledger where we can write - synchronized (pendingMarkDeleteOps) { - flushPendingMarkDeletes(); + flushPendingMarkDeletes(); - // Resume normal mark-delete operations - STATE_UPDATER.set(ManagedCursorImpl.this, State.Open); - } + // Resume normal mark-delete operations + state = State.Open; } @Override public void operationFailed(ManagedLedgerException exception) { log.error("[{}][{}] Metadata ledger creation failed", ledger.getName(), name, exception); - synchronized (pendingMarkDeleteOps) { - while (!pendingMarkDeleteOps.isEmpty()) { - MarkDeleteEntry entry = pendingMarkDeleteOps.poll(); - entry.callback.markDeleteFailed(exception, entry.ctx); - } + // At this point we don't have a ledger ready + state = State.NoLedger; - // At this point we don't have a ledger ready - STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); - } + pendingMarkDeleteOps.drain(entry -> entry.callback.markDeleteFailed(exception, entry.ctx)); } }); } private void flushPendingMarkDeletes() { - if (!pendingMarkDeleteOps.isEmpty()) { - internalFlushPendingMarkDeletes(); + if (pendingMarkDeleteOps.isEmpty()) { + return; } - } - void internalFlushPendingMarkDeletes() { - MarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast(); - lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps); - pendingMarkDeleteOps.clear(); + List<MarkDeleteEntry> entries = Lists.newArrayListWithExpectedSize(pendingMarkDeleteOps.size()); + pendingMarkDeleteOps.drain(entries::add); + + if (entries.isEmpty()) { + return; + } + + MarkDeleteEntry lastEntry = entries.get(entries.size() - 1); + lastEntry.callbackGroup = entries; internalMarkDelete(lastEntry); } @@ -2188,15 +2186,12 @@ public class ManagedCursorImpl implements ManagedCursor { void readOperationCompleted() { if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) { - synchronized (pendingMarkDeleteOps) { - if (STATE_UPDATER.get(this) == State.Open) { - // Flush the pending writes only if the state is open. - flushPendingMarkDeletes(); - } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) { - log.info( - "[{}] read operation completed and cursor was closed. need to call any queued cursor close", - name); - } + if (state == State.Open) { + // Flush the pending writes only if the state is open. + flushPendingMarkDeletes(); + } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) { + log.info("[{}] read operation completed and cursor was closed. need to call any queued cursor close", + name); } } } diff --git a/pom.xml b/pom.xml index ef28966..30b3d5e 100644 --- a/pom.xml +++ b/pom.xml @@ -138,6 +138,7 @@ flexible messaging model and an intuitive client API.</description> <protoc-gen-grpc-java.version>1.0.0</protoc-gen-grpc-java.version> <gson.version>2.8.2</gson.version> <sketches.version>0.8.3</sketches.version> + <jctools.version>2.1.1</jctools.version> <!-- test dependencies --> <disruptor.version>3.4.0</disruptor.version> @@ -431,6 +432,12 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> + <groupId>org.jctools</groupId> + <artifactId>jctools-core</artifactId> + <version>${jctools.version}</version> + </dependency> + + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> -- To stop receiving notification emails like this one, please contact mme...@apache.org.