This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aff6c04  Fixed race condition intruduced managed ledger addEntry 
introduced in #1521 (#1548)
aff6c04 is described below

commit aff6c04b156cffaf5b96e05afda5e096fbe34729
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Apr 12 02:39:31 2018 -0700

    Fixed race condition intruduced managed ledger addEntry introduced in #1521 
(#1548)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 15c380c..8649d1d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -95,7 +95,6 @@ import org.apache.pulsar.common.api.Commands;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -205,7 +204,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
      * Queue of pending entries to be added to the managed ledger. Typically 
entries are queued when a new ledger is
      * created asynchronously and hence there is no ready ledger to write into.
      */
-    final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new 
GrowableArrayBlockingQueue<>();
+    final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
     // //////////////////////////////////////////////////////////////////////
 
@@ -491,10 +490,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, 
ctx);
-        pendingAddEntries.add(addOperation);
 
         // Jump to specific thread to avoid contention from writers writing 
from different threads
         executor.executeOrdered(name, safeRun(() -> {
+            pendingAddEntries.add(addOperation);
+
             internalAsyncAddEntry(addOperation);
         }));
     }
@@ -1200,7 +1200,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
 
         // Process all the pending addEntry requests
-        for (OpAddEntry op : pendingAddEntries.toList()) {
+        for (OpAddEntry op : pendingAddEntries) {
             op.setLedger(currentLedger);
             ++currentLedgerEntries;
             currentLedgerSize += op.data.readableBytes();

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to