GitHub user afine commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/300#discussion_r155102382
--- Diff: src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java ---
@@ -240,84 +240,14 @@ public void run() {
}
// Process committed head
- if ((request = committedRequests.poll()) == null) {
- throw new IOException("Error: committed head is null");
- }
-
- /*
- * Check if request is pending, if so, update it with the committed info
- */
- LinkedList<Request> sessionQueue = pendingRequests
- .get(request.sessionId);
- if (sessionQueue != null) {
- // If session queue != null, then it is also not empty.
- Request topPending = sessionQueue.poll();
- if (request.cxid != topPending.cxid) {
- /*
- * TL;DR - we should not encounter this scenario often under normal load.
- * We pass the commit to the next processor and put the pending back with a warning.
- *
- * Generally, we can get commit requests that are not at the queue head after
- * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
- * with A, and the server that the session moved to with B (keep in mind that it is
- * possible that the session already moved from B to a new server C, and maybe C=A).
- * 1. If request.cxid < topPending.cxid : this means that the session requested this update
- * from A, then moved to B (i.e., which is us), and now B receives the commit
- * for the update after the session already performed several operations in B
- * (and therefore its cxid is higher than that old request).
- * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
- * from B with cxid that is bigger than the one we know therefore in this case we
- * are A, and we lost the connection to the session. Given that we are waiting for a commit
- * for that update, it means that we already sent the request to the leader and it will
- * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
- * is an increasing order). It is not safe for us to delete the session's queue at this
- * point, since it is possible that the session has newer requests in it after it moved
- * back to us. We just leave the queue as it is, and once the commit arrives (for the old
- * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
- * response.
- * Also note that we don't have a local session, therefore we treat the request
- * like any other commit for a remote request, i.e., we perform the update without sending
- * a response.
- */
- LOG.warn("Got request " + request +
- " but we are expecting request " + topPending);
- sessionQueue.addFirst(topPending);
- } else {
- /*
- * Generally, we want to send to the next processor our version of the request,
- * since it contains the session information that is needed for post update processing.
- * In more details, when a request is in the local queue, there is (or could be) a client
- * attached to this server waiting for a response, and there is other bookkeeping of
- * requests that are outstanding and have originated from this server
- * (e.g., for setting the max outstanding requests) - we need to update this info when an
- * outstanding request completes. Note that in the other case (above), the operation
- * originated from a different server and there is no local bookkeeping or a local client
- * session that needs to be notified.
- */
- topPending.setHdr(request.getHdr());
- topPending.setTxn(request.getTxn());
- topPending.zxid = request.zxid;
- request = topPending;
- }
- }
-
- sendToNextProcessor(request);
-
- waitForEmptyPool();
-
- /*
- * Process following reads if any, remove session queue if
- * empty.
- */
- if (sessionQueue != null) {
- while (!stopped && !sessionQueue.isEmpty()
- && !needCommit(sessionQueue.peek())) {
- sendToNextProcessor(sessionQueue.poll());
- }
- // Remove empty queues
- if (sessionQueue.isEmpty()) {
- pendingRequests.remove(request.sessionId);
+ // We only need to perform synchronization if we are on the last request in the queue
+ if (committedRequests.size() == 1) {
+ synchronized (committedRequests) {
--- End diff --
I believe I was able to mitigate any potential performance impact by entering
the synchronized block only when exactly one committed request is left in the
queue.
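
To illustrate the idea, here is a minimal standalone sketch, not the code from this PR. The class `CommittedQueueSketch`, its methods, and the `lockedPolls` counter are hypothetical names used only for illustration, and the sketch assumes a single consumer thread with producers that only append:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: take the lock only when polling the last queued element.
final class CommittedQueueSketch {
    // Assumption: a LinkedBlockingQueue, whose size() reads a counter in constant time.
    private final LinkedBlockingQueue<Long> committed = new LinkedBlockingQueue<>();
    // Counts how often the synchronized path was taken (for illustration only).
    private final AtomicInteger lockedPolls = new AtomicInteger();

    // Producer side: append a committed zxid and wake any waiter on the queue's monitor.
    void commit(long zxid) {
        synchronized (committed) {
            committed.add(zxid);
            committed.notifyAll();
        }
    }

    // Consumer side (single thread assumed): returns the head, or null if the queue is empty.
    Long pollCommitted() {
        // With one consumer, the size can only grow between this check and the poll,
        // so the lock-free path below never removes the last element.
        if (committed.size() == 1) {
            synchronized (committed) {
                lockedPolls.incrementAndGet();
                return committed.poll();
            }
        }
        return committed.poll();
    }

    int lockedPolls() {
        return lockedPolls.get();
    }
}
```

With three queued commits, only the third poll in this sketch goes through the synchronized path; the first two stay lock-free.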
---