[ https://issues.apache.org/jira/browse/ZOOKEEPER-2807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274498#comment-16274498 ]
ASF GitHub Bot commented on ZOOKEEPER-2807:
-------------------------------------------

Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/300#discussion_r154367742

    --- Diff: src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java ---
    @@ -240,84 +240,14 @@ public void run() {
                         }

                         // Process committed head
    -                    if ((request = committedRequests.poll()) == null) {
    -                        throw new IOException("Error: committed head is null");
    -                    }
    -
    -                    /*
    -                     * Check if request is pending, if so, update it with the committed info
    -                     */
    -                    LinkedList<Request> sessionQueue = pendingRequests
    -                            .get(request.sessionId);
    -                    if (sessionQueue != null) {
    -                        // If session queue != null, then it is also not empty.
    -                        Request topPending = sessionQueue.poll();
    -                        if (request.cxid != topPending.cxid) {
    -                            /*
    -                             * TL;DR - we should not encounter this scenario often under normal load.
    -                             * We pass the commit to the next processor and put the pending back with a warning.
    -                             *
    -                             * Generally, we can get commit requests that are not at the queue head after
    -                             * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
    -                             * with A, and the server that the session moved to with B (keep in mind that it is
    -                             * possible that the session already moved from B to a new server C, and maybe C=A).
    -                             * 1. If request.cxid < topPending.cxid : this means that the session requested this update
    -                             * from A, then moved to B (i.e., which is us), and now B receives the commit
    -                             * for the update after the session already performed several operations in B
    -                             * (and therefore its cxid is higher than that old request).
    -                             * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
    -                             * from B with cxid that is bigger than the one we know therefore in this case we
    -                             * are A, and we lost the connection to the session. Given that we are waiting for a commit
    -                             * for that update, it means that we already sent the request to the leader and it will
    -                             * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
    -                             * is an increasing order). It is not safe for us to delete the session's queue at this
    -                             * point, since it is possible that the session has newer requests in it after it moved
    -                             * back to us. We just leave the queue as it is, and once the commit arrives (for the old
    -                             * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
    -                             * response.
    -                             * Also note that we don't have a local session, therefore we treat the request
    -                             * like any other commit for a remote request, i.e., we perform the update without sending
    -                             * a response.
    -                             */
    -                            LOG.warn("Got request " + request +
    -                                    " but we are expecting request " + topPending);
    -                            sessionQueue.addFirst(topPending);
    -                        } else {
    -                            /*
    -                             * Generally, we want to send to the next processor our version of the request,
    -                             * since it contains the session information that is needed for post update processing.
    -                             * In more details, when a request is in the local queue, there is (or could be) a client
    -                             * attached to this server waiting for a response, and there is other bookkeeping of
    -                             * requests that are outstanding and have originated from this server
    -                             * (e.g., for setting the max outstanding requests) - we need to update this info when an
    -                             * outstanding request completes. Note that in the other case (above), the operation
    -                             * originated from a different server and there is no local bookkeeping or a local client
    -                             * session that needs to be notified.
    -                             */
    -                            topPending.setHdr(request.getHdr());
    -                            topPending.setTxn(request.getTxn());
    -                            topPending.zxid = request.zxid;
    -                            request = topPending;
    -                        }
    -                    }
    -
    -                    sendToNextProcessor(request);
    -
    -                    waitForEmptyPool();
    -
    -                    /*
    -                     * Process following reads if any, remove session queue if
    -                     * empty.
    -                     */
    -                    if (sessionQueue != null) {
    -                        while (!stopped && !sessionQueue.isEmpty()
    -                                && !needCommit(sessionQueue.peek())) {
    -                            sendToNextProcessor(sessionQueue.poll());
    -                        }
    -                        // Remove empty queues
    -                        if (sessionQueue.isEmpty()) {
    -                            pendingRequests.remove(request.sessionId);
    +                    // We only need to perform synchronization if we are on the last request in the queue
    +                    if (committedRequests.size() == 1) {
    +                        synchronized (committedRequests) {
    --- End diff --

    Unfortunately I don't get why you need to synchronise here. Would you please elaborate a little bit?


> Flaky test: org.apache.zookeeper.test.WatchEventWhenAutoResetTest.testNodeDataChanged
> -------------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-2807
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-2807
>             Project: ZooKeeper
>          Issue Type: Bug
>            Reporter: Abraham Fine
>            Assignee: Abraham Fine
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)