Github user shralex commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/411#discussion_r147854385
--- Diff:
src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
---
@@ -376,4 +377,123 @@ public void noStarvationOfReadRequestsTest() throws Exception {
!processedRequests.contains(r));
}
}
+
+ /**
+ * In the following test, we verify that we can handle the case that we got a commit
+ * of a request we never seen since the session that we just established. This can happen
+ * when a session is just established and there is request waiting to be committed in the
+ * in the session queue but it sees a commit for a request that belongs to the previous connection.
+ */
+ @Test(timeout = 1000)
+ public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
+ final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
+ final int numberofReads = 10;
+ final int sessionid = 0x123456;
+ final int firstCXid = 0x100;
+ int readReqId = firstCXid;
+ processor.stoppedMainLoop = true;
+ HashSet<Request> localRequests = new HashSet<Request>();
+ // queue the blocking write request to queuedRequests
+ Request firstCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, readReqId++);
+ processor.queuedRequests.add(firstCommittedReq);
+ localRequests.add(firstCommittedReq);
+
+ // queue read requests to queuedRequests
+ for (; readReqId <= numberofReads+firstCXid; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, sessionid, readReqId);
+ processor.queuedRequests.add(readReq);
+ localRequests.add(readReq);
+ }
+
+ //run once
+ Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));
+ processor.initThreads(numberofReads* 2);
+ processor.run();
+
+ //We verify that the processor is waiting for the commit
+ Assert.assertTrue(processedRequests.isEmpty());
+
+ // We add a commit that belongs to the same session but with smaller cxid,
+ // i.e., commit of an update from previous connection of this session.
+ Request preSessionCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, firstCXid - 2);
+ processor.committedRequests.add(preSessionCommittedReq);
+ processor.committedRequests.add(firstCommittedReq);
+ processor.run();
+
+ //We verify that the commit processor processed the old commit prior to the newer messages
+ Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq);
+
+ processor.run();
+
+ //We verify that the commit processor handle all messages.
+ Assert.assertTrue(processedRequests.containsAll(localRequests));
+ }
+
+ /**
+ * In the following test, we verify if we handle the case in which we get a commit
+ * for a request that has higher Cxid than the one we are waiting. This can happen
+ * when a session connection is lost but there is a request waiting to be committed in the
+ * session queue. However, since the session has moved, new requests can get to
+ * the leader out of order. Hence, the commits can also arrive "out of order" w.r.t. cxid.
+ * We should commit the requests according to the order we receive from the leader, i.e., wait for the relevant commit.
--- End diff --
1) I think it's worthwhile to explain the two scenarios in the
CommitProcessor.java comments, i.e., how it's possible to receive a commit
from the leader with a higher or a lower cxid than the one you expect. That
should include the part where requests may reach the leader out of cxid order
if the session is moved.
2) I wonder whether in this case (receiving a higher cxid from the leader) we
should also empty the local pending requests. What's the value of keeping the
requests in the local queue?
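
To make the two scenarios concrete, here is a toy model of how they might play out. It is only a sketch, not the CommitProcessor code: the class `CxidOrderSketch` and its methods are hypothetical, and it assumes a commit is matched against the head of the session's local queue by cxid alone.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model; names do not mirror CommitProcessor's real fields.
class CxidOrderSketch {
    /** cxids of locally queued requests for one session, in arrival order. */
    private final Deque<Integer> pending = new ArrayDeque<>();
    /** What was passed on to the next processor, in order. */
    final List<String> processed = new ArrayList<>();

    void submitLocal(int cxid) {
        pending.add(cxid);
    }

    /** Applies one commit, in the order the leader sent it. */
    void onCommit(int cxid) {
        Integer head = pending.peek();
        if (head != null && head == cxid) {
            // The commit we were waiting for: release the local request.
            pending.poll();
            processed.add("local:" + cxid);
        } else {
            // Lower or higher cxid than expected (assumed: it was issued over
            // another connection of the same session). Apply the leader's copy
            // and keep waiting for the commit of our own head request.
            processed.add("remote:" + cxid);
        }
    }

    int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        CxidOrderSketch s = new CxidOrderSketch();
        s.submitLocal(0x100);  // local write waiting for its commit
        s.onCommit(0x0fe);     // scenario 1: commit with a lower cxid
        s.onCommit(0x105);     // scenario 2: commit with a higher cxid
        s.onCommit(0x100);     // the commit the local queue was waiting for
        System.out.println(s.processed);
    }
}
```

In this sketch the local queue is left untouched when a non-matching commit arrives, which is the behavior question 2 is about; clearing `pending` in the `else` branch would be the alternative.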
---