This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b22769b Fixed flaky test in managed ledger close path (#1539) b22769b is described below commit b22769bb57defe435cc4faedcbebe1d3260cdee4 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Apr 10 12:33:22 2018 -0700 Fixed flaky test in managed ledger close path (#1539) * Fixed flaky test in managed ledger close path: ManagedLedgerBkTest.managedLedgerClosed * Fixed cursorPersistenceAsyncMarkDeleteSameThread test --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 +++++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b043dc8..2773090 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1368,7 +1368,10 @@ public class ManagedCursorImpl implements ManagedCursor { case Open: if (pendingReadOps > 0) { // Wait until no read operation are pending - pendingMarkDeleteOps.add(mdEntry); + if (!pendingMarkDeleteOps.offer(mdEntry)) { + callback.markDeleteFailed(new ManagedLedgerException("Cursor queue of mark-delete operations full"), ctx); + return; + } if (pendingReadOps == 0) { // If the value changed while enqueuing, trigger a flush to make sure we don't delay current request flushPendingMarkDeletes(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 77b1dff..bf2cd8c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -53,9 +53,8 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.common.util.OrderedExecutor; -import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -487,17 +486,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } - final State state = STATE_UPDATER.get(this); - if (state == State.Fenced) { - callback.addFailed(new ManagedLedgerFencedException(), ctx); - return; - } else if (state == State.Terminated) { - callback.addFailed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"), ctx); - return; - } else if (state == State.Closed) { - callback.addFailed(new ManagedLedgerException("Managed ledger was already closed"), ctx); - return; - } OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); pendingAddEntries.add(addOperation); @@ -509,6 +497,18 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + final State state = STATE_UPDATER.get(this); + if (state == State.Fenced) { + addOperation.failed(new ManagedLedgerFencedException()); + return; + } else if (state == State.Terminated) { + addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated")); + return; + } else if (state == State.Closed) { + addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); + return; + } + if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created -- To stop receiving notification emails like this one, please contact mme...@apache.org.