Github user kfirlevari commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/411#discussion_r148119340
--- Diff:
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java ---
@@ -246,33 +246,51 @@ public void run() {
}
/*
- * Check if request is pending, if so, update it with
the
- * committed info
+ * Check if request is pending, if so, update it with
the committed info
*/
LinkedList<Request> sessionQueue = pendingRequests
.get(request.sessionId);
if (sessionQueue != null) {
// If session queue != null, then it is also not
empty.
Request topPending = sessionQueue.poll();
if (request.cxid != topPending.cxid) {
- LOG.error(
- "Got cxid 0x"
- +
Long.toHexString(request.cxid)
- + " expected 0x" +
Long.toHexString(
- topPending.cxid)
- + " for client session id "
- + Long.toHexString(request.sessionId));
- throw new IOException("Error: unexpected cxid
for"
- + "client session");
+ // TL;DR - we should not encounter this
scenario often under normal load.
+ // We pass the commit to the next processor
and put the pending back with a warning.
+
+ // Generally, we can get commit requests that
are not at the queue head after
+ // a session moved (see ZOOKEEPER-2684). Let's
denote the previous server of the session
+ // with A, and the server that the session
moved to with B (keep in mind that it is
+ // possible that the session already moved
from B to a new server C, and maybe C=A).
+ // 1. If request.cxid < topPending.cxid : this
means that the session requested this update
+ // from A, then moved to B (i.e., which is
us), and now B receives the commit
+ // for the update after the session already
performed several operations in B
+ // (and therefore its cxid is higher than that
old request).
+ // 2. If request.cxid > topPending.cxid : this
means that the session requested an updated
+ // from B with cxid that is bigger than the
one we know therefore in this case we
+ // are A, and we lost the connection to the
session. Given that we are waiting for a commit
+ // for that update, it means that we already
sent the request to the leader and it will
+ // be committed at some point (in this case
the order of cxid won't follow zxid, since zxid
+ // is an increasing order). It is not safe for
us to delete the session's queue at this
+ // point, since it is possible that the
session has newer requests in it after it moved
+ // back to us. We just leave the queue as it
is, and once the commit arrives (for the old
+ // request), the finalRequestProcessor will
see a closed cnxn handle, and just won't send a
+ // response.
+ // Also note that we don't have a local
session, therefore we treat the request
+ // like any other commit for a remote request,
i.e., we perform the update without sending
+ // a response.
+
+ LOG.warn("Got request " + request +
+ " but we are expecting request " +
topPending);
+ sessionQueue.addFirst(topPending);
+ } else {
+ // We want to send to the next processor our
version of the request,
+ // since it contains the session information
that is needed
+ // for post update processing (e.g., using
request.cnxn we send a response to the client).
+ topPending.setHdr(request.getHdr());
--- End diff --
Updated accordingly
---