merlimat closed pull request #1539: Fixed flaky test in managed ledger close 
path
URL: https://github.com/apache/incubator-pulsar/pull/1539
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index b043dc8f45..277309095e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1368,7 +1368,10 @@ protected void internalAsyncMarkDelete(final 
PositionImpl newPosition, Map<Strin
         case Open:
             if (pendingReadOps > 0) {
                 // Wait until no read operation are pending
-                pendingMarkDeleteOps.add(mdEntry);
+                if (!pendingMarkDeleteOps.offer(mdEntry)) {
+                    callback.markDeleteFailed(new 
ManagedLedgerException("Cursor queue of mark-delete operations full"), ctx);
+                    return;
+                }
                 if (pendingReadOps == 0) {
                     // If the value changed while enqueuing, trigger a flush 
to make sure we don't delay current request
                     flushPendingMarkDeletes();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 77b1dff7b2..bf2cd8c587 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -53,9 +53,8 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -487,17 +486,6 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback 
callback, Object ctx)
         if (log.isDebugEnabled()) {
             log.debug("[{}] asyncAddEntry size={} state={}", name, 
buffer.readableBytes(), state);
         }
-        final State state = STATE_UPDATER.get(this);
-        if (state == State.Fenced) {
-            callback.addFailed(new ManagedLedgerFencedException(), ctx);
-            return;
-        } else if (state == State.Terminated) {
-            callback.addFailed(new ManagedLedgerTerminatedException("Managed 
ledger was already terminated"), ctx);
-            return;
-        } else if (state == State.Closed) {
-            callback.addFailed(new ManagedLedgerException("Managed ledger was 
already closed"), ctx);
-            return;
-        }
 
         OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
         pendingAddEntries.add(addOperation);
@@ -509,6 +497,18 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback 
callback, Object ctx)
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
+        final State state = STATE_UPDATER.get(this);
+        if (state == State.Fenced) {
+            addOperation.failed(new ManagedLedgerFencedException());
+            return;
+        } else if (state == State.Terminated) {
+            addOperation.failed(new ManagedLedgerTerminatedException("Managed 
ledger was already terminated"));
+            return;
+        } else if (state == State.Closed) {
+            addOperation.failed(new 
ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
+            return;
+        }
+
         if (state == State.ClosingLedger || state == State.CreatingLedger) {
             // We don't have a ready ledger to write into
             // We are waiting for a new ledger to be created


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to