merlimat closed pull request #1539: Fixed flaky test in managed ledger close path URL: https://github.com/apache/incubator-pulsar/pull/1539
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b043dc8f45..277309095e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1368,7 +1368,10 @@ protected void internalAsyncMarkDelete(final PositionImpl newPosition, Map<Strin case Open: if (pendingReadOps > 0) { // Wait until no read operation are pending - pendingMarkDeleteOps.add(mdEntry); + if (!pendingMarkDeleteOps.offer(mdEntry)) { + callback.markDeleteFailed(new ManagedLedgerException("Cursor queue of mark-delete operations full"), ctx); + return; + } if (pendingReadOps == 0) { // If the value changed while enqueuing, trigger a flush to make sure we don't delay current request flushPendingMarkDeletes(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 77b1dff7b2..bf2cd8c587 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -53,9 +53,8 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.common.util.OrderedExecutor; -import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -487,17 +486,6 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } - final State state = STATE_UPDATER.get(this); - if (state == State.Fenced) { - callback.addFailed(new ManagedLedgerFencedException(), ctx); - return; - } else if (state == State.Terminated) { - callback.addFailed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"), ctx); - return; - } else if (state == State.Closed) { - callback.addFailed(new ManagedLedgerException("Managed ledger was already closed"), ctx); - return; - } OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); pendingAddEntries.add(addOperation); @@ -509,6 +497,18 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) } private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + final State state = STATE_UPDATER.get(this); + if (state == State.Fenced) { + addOperation.failed(new ManagedLedgerFencedException()); + return; + } else if (state == State.Terminated) { + addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated")); + return; + } else if (state == State.Closed) { + addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); + return; + } + if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services