This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new aff6c04 Fixed race condition intruduced managed ledger addEntry introduced in #1521 (#1548) aff6c04 is described below commit aff6c04b156cffaf5b96e05afda5e096fbe34729 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Apr 12 02:39:31 2018 -0700 Fixed race condition intruduced managed ledger addEntry introduced in #1521 (#1548) --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 15c380c..8649d1d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -95,7 +95,6 @@ import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; -import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +204,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is * created asynchronously and hence there is no ready ledger to write into. */ - final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new GrowableArrayBlockingQueue<>(); + final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>(); // ////////////////////////////////////////////////////////////////////// @@ -491,10 +490,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); - pendingAddEntries.add(addOperation); // Jump to specific thread to avoid contention from writers writing from different threads executor.executeOrdered(name, safeRun(() -> { + pendingAddEntries.add(addOperation); + internalAsyncAddEntry(addOperation); })); } @@ -1200,7 +1200,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } // Process all the pending addEntry requests - for (OpAddEntry op : pendingAddEntries.toList()) { + for (OpAddEntry op : pendingAddEntries) { op.setLedger(currentLedger); ++currentLedgerEntries; currentLedgerSize += op.data.readableBytes(); -- To stop receiving notification emails like this one, please contact si...@apache.org.