kezhuw commented on code in PR #2152:
URL: https://github.com/apache/zookeeper/pull/2152#discussion_r1731040660


##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
                     isPreZAB1_0 = false;
 
                     // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
+                    if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
                         long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                        for (PacketInFlight p : packetsNotCommitted) {
-                            final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
-                            requestsToAck.add(request);
+
+                        /*
+                         * @see https://github.com/apache/zookeeper/pull/1848
+                         * Persist and process the committed txns in "packetsNotLogged"
+                         * according to "packetsCommitted", which have been committed by
+                         * the leader. For these committed proposals, there is no need to
+                         * reply ack.
+                         *
+                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+                         * Keep the outstanding proposals in "packetsNotLogged" to avoid
+                         * NullPointerException when the follower receives COMMIT packet(s)
+                         * right after replying NEWLEADER ack.
+                         */
+                        while (!packetsCommitted.isEmpty()) {
+                            long zxid = packetsCommitted.removeFirst();
+                            pif = packetsNotLogged.peekFirst();
+                            if (pif == null) {
+                                LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
+                                continue;
+                            } else if (pif.hdr.getZxid() != zxid) {
+                                LOG.warn("Committing 0x{}, but next proposal is 0x{}",
+                                        Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
+                                continue;
+                            }
+                            packetsNotLogged.removeFirst();
+                            fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
+                            fzk.processTxn(pif.hdr, pif.rec);
                         }
 
-                        // persist the txns to disk
+                        // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
+                        // Make sure to persist the txns to disk before replying NEWLEADER ack.
                         fzk.getZKDatabase().commit();
-                        LOG.info("{} txns have been persisted and it took {}ms",
-                        packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
-                        packetsNotCommitted.clear();
+                        LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+                                        + "{} outstanding txns left in packetsNotLogged",
+                                Time.currentElapsedTime() - startTime, packetsNotLogged.size());
                     }
 
-                    // set the current epoch after all the tnxs are persisted
+                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
+                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
+                    // Update current epoch after the committed txns are persisted
                     self.setCurrentEpoch(newEpoch);
                     LOG.info("Set the current epoch to {}", newEpoch);
 
-                    // send NEWLEADER ack after all the tnxs are persisted
+                    // Now we almost complete the synchronization phase. Start RequestProcessors
+                    // to asynchronously process the pending txns in  "packetsNotLogged"  and
+                    // "packetsCommitted" later.
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                    zk.startupWithoutServing();

Review Comment:
   I think we don't need this `startupWithoutServing` anymore. It was introduced in #1445 to support asynchronous log processing, and now we process the log synchronously.
   
   This should solve the problem @changruill raised in https://github.com/apache/zookeeper/pull/2154#issuecomment-2167267712



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.