kezhuw commented on code in PR #2152: URL: https://github.com/apache/zookeeper/pull/2152#discussion_r1731040660
##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
                 isPreZAB1_0 = false;
                 // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
-                sock.setSoTimeout(self.tickTime * self.syncLimit);
-                self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                zk.startupWithoutServing();
-                if (zk instanceof FollowerZooKeeperServer) {
+                if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
                     long startTime = Time.currentElapsedTime();
                     FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                    for (PacketInFlight p : packetsNotCommitted) {
-                        final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
-                        requestsToAck.add(request);
+
+                    /*
+                     * @see https://github.com/apache/zookeeper/pull/1848
+                     * Persist and process the committed txns in "packetsNotLogged"
+                     * according to "packetsCommitted", which have been committed by
+                     * the leader. For these committed proposals, there is no need to
+                     * reply ack.
+                     *
+                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+                     * Keep the outstanding proposals in "packetsNotLogged" to avoid
+                     * NullPointerException when the follower receives COMMIT packet(s)
+                     * right after replying NEWLEADER ack.
+                     */
+                    while (!packetsCommitted.isEmpty()) {
+                        long zxid = packetsCommitted.removeFirst();
+                        pif = packetsNotLogged.peekFirst();
+                        if (pif == null) {
+                            LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
+                            continue;
+                        } else if (pif.hdr.getZxid() != zxid) {
+                            LOG.warn("Committing 0x{}, but next proposal is 0x{}",
+                                Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
+                            continue;
+                        }
+                        packetsNotLogged.removeFirst();
+                        fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
+                        fzk.processTxn(pif.hdr, pif.rec);
                     }
-                    // persist the txns to disk
+                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
+                    // Make sure to persist the txns to disk before replying NEWLEADER ack.
                     fzk.getZKDatabase().commit();
-                    LOG.info("{} txns have been persisted and it took {}ms",
-                        packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
-                    packetsNotCommitted.clear();
+                    LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+                        + "{} outstanding txns left in packetsNotLogged",
+                        Time.currentElapsedTime() - startTime, packetsNotLogged.size());
                 }
-                // set the current epoch after all the tnxs are persisted
+                // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
+                // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
+                // Update current epoch after the committed txns are persisted
                 self.setCurrentEpoch(newEpoch);
                 LOG.info("Set the current epoch to {}", newEpoch);
-                // send NEWLEADER ack after all the tnxs are persisted
+                // Now we almost complete the synchronization phase. Start RequestProcessors
+                // to asynchronously process the pending txns in "packetsNotLogged" and
+                // "packetsCommitted" later.
+                sock.setSoTimeout(self.tickTime * self.syncLimit);
+                self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                zk.startupWithoutServing();

Review Comment:
I think we don't need this `startupWithoutServing` anymore. It was introduced in #1445 to support asynchronous log processing, and now we process the log synchronously.

This should solve the problem @changruill raised in https://github.com/apache/zookeeper/pull/2154#issuecomment-2167267712