kezhuw commented on code in PR #2152:
URL: https://github.com/apache/zookeeper/pull/2152#discussion_r1545947549


##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                     isPreZAB1_0 = false;
 
                     // ZOOKEEPER-3911: make sure sync the uncommitted logs 
before commit them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
+                    if (zk instanceof FollowerZooKeeperServer && 
!packetsCommitted.isEmpty()) {
                         long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = 
(FollowerZooKeeperServer) zk;
-                        for (PacketInFlight p : packetsNotCommitted) {
-                            final Request request = fzk.appendRequest(p.hdr, 
p.rec, p.digest);
-                            requestsToAck.add(request);
+
+                        /*
+                         * @see https://github.com/apache/zookeeper/pull/1848
+                         * Persist and process the committed txns in 
"packetsNotCommitted"
+                         * according to "packetsCommitted", which have been 
committed by
+                         * the leader. For these committed proposals, there is 
no need to
+                         * reply ack.
+                         *
+                         * @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+                         * Keep the outstanding proposals in 
"packetsNotCommitted" to avoid
+                         * NullPointerException when the follower receives 
COMMIT packet(s)

Review Comment:
   Given my comments above, we should apparently not clear `packetsNotCommitted`.
All txns not in `packetsCommitted` are proposals from the new election.



##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long 
zxid, long sessionId) throw
         }, testData);
     }
 
+    @Test
+    public void 
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File 
testData) throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive 
oa, Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir", testData);
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = 
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+                File snapDir = 
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
+                try {
+                    assertEquals(0, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new 
FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, 
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+                    Stat stat = new Stat();
+                    assertEquals("data1", new String(zkDb.getData("/foo", 
stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), 
learnInfo);
+                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    assertEquals(learnInfo.getServerid(), 0);
+
+                    // We are simulating an established leader, so the epoch 
is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte[] protoBytes = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACKEPOCH, qp.getType());
+                    assertEquals(0, qp.getZxid());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), 
ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    //No Snapshot taken yet, the SNAP was applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
+                    // Leader sends an outstanding proposal
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);

Review Comment:
   `qp.data` should be cleared before sending `NEWLEADER`, since `qp` is reused from the preceding proposal.



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                     isPreZAB1_0 = false;
 
                     // ZOOKEEPER-3911: make sure sync the uncommitted logs 
before commit them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
+                    if (zk instanceof FollowerZooKeeperServer && 
!packetsCommitted.isEmpty()) {
                         long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = 
(FollowerZooKeeperServer) zk;
-                        for (PacketInFlight p : packetsNotCommitted) {
-                            final Request request = fzk.appendRequest(p.hdr, 
p.rec, p.digest);
-                            requestsToAck.add(request);
+
+                        /*
+                         * @see https://github.com/apache/zookeeper/pull/1848
+                         * Persist and process the committed txns in 
"packetsNotCommitted"
+                         * according to "packetsCommitted", which have been 
committed by

Review Comment:
   The comment is somewhat misleading. The key is to log these committed txns;
according to the paper, they are considered committed before the election. The
only reason we touch `packetsNotCommitted` here is to make sure these txns are
not passed to `logRequest` again in the `broadcast` phase. I think it might be
better to rename `packetsNotCommitted` to `packetsNotLogged`, as @jeffrey-xiao
did in #1930. "Log" is a disk operation; "commit" is an agreement. What we want
here is to "log the committed txns agreed upon during the election".
   
   As for the implementation, new proposals could still be committed before
`NEWLEADER`, since `LearnerHandler` does not issue `NEWLEADER` right after these
committed txns. This does no harm here, because we potentially persist more txns,
never fewer, and the new leader expects no `ack` for committed ones.



##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -765,36 +765,22 @@ public void converseWithFollower(InputArchive ia, 
OutputArchive oa, Follower f)
                     qp.setZxid(0);
                     oa.writeRecord(qp, null);
 
-                    // Read the uptodate ack
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACK, qp.getType());
-                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
                     // Get the ack of the new leader
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACK, qp.getType());
                     assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(1, f.self.getCurrentEpoch());
-
-                    //Wait for the transactions to be written out. The thread 
that writes them out
-                    // does not send anything back when it is done.
-                    long start = System.currentTimeMillis();
-                    while (createSessionZxid != f.fzk.getLastProcessedZxid()
-                                   && (System.currentTimeMillis() - start) < 
50) {
-                        Thread.sleep(1);

Review Comment:
   > When specifying Learner.syncWithLeader(..) in version 3.9.2 (fixed by 
https://github.com/apache/zookeeper/pull/2111), the issue traces of 
[ZOOKEEPER-4394](https://issues.apache.org/jira/browse/ZOOKEEPER-4394) and 
[ZOOKEEPER-3023](https://issues.apache.org/jira/browse/ZOOKEEPER-3023) can be 
detected.
   
   I did not see ZOOKEEPER-3023 after #2111. But if you are verifying this
using TLA, it is doomed to fail. I am +1 on reverting this to the
pre-ZOOKEEPER-2678 behavior.



##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long 
zxid, long sessionId) throw
         }, testData);
     }
 
+    @Test
+    public void 
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File 
testData) throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive 
oa, Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir", testData);
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = 
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+                File snapDir = 
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
+                try {
+                    assertEquals(0, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new 
FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, 
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+                    Stat stat = new Stat();
+                    assertEquals("data1", new String(zkDb.getData("/foo", 
stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), 
learnInfo);
+                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    assertEquals(learnInfo.getServerid(), 0);
+
+                    // We are simulating an established leader, so the epoch 
is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte[] protoBytes = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACKEPOCH, qp.getType());
+                    assertEquals(0, qp.getZxid());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), 
ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    //No Snapshot taken yet, the SNAP was applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
+                    // Leader sends an outstanding proposal
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);

Review Comment:
   I think this is what ZOOKEEPER-4394 tried to report: new proposals are issued
before `NEWLEADER`. This is the gap between the paper and the implementation.



##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long 
zxid, long sessionId) throw
         }, testData);
     }
 
+    @Test
+    public void 
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File 
testData) throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive 
oa, Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir", testData);
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = 
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+                File snapDir = 
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
+                try {
+                    assertEquals(0, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new 
FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, 
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+                    Stat stat = new Stat();
+                    assertEquals("data1", new String(zkDb.getData("/foo", 
stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), 
learnInfo);
+                    assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    assertEquals(learnInfo.getServerid(), 0);
+
+                    // We are simulating an established leader, so the epoch 
is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte[] protoBytes = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACKEPOCH, qp.getType());
+                    assertEquals(0, qp.getZxid());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), 
ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    //No Snapshot taken yet, the SNAP was applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
+                    // Leader sends an outstanding proposal
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACK, qp.getType());
+                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(1, f.self.getCurrentEpoch());
+                    //Make sure that we did take the snapshot now
+                    verify(f.zk).takeSnapshot(true);
+                    assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+                    // The outstanding proposal has not been persisted yet
+                    ZKDatabase zkDb2 = new ZKDatabase(new 
FileTxnSnapLog(logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    assertEquals("data1", new String(zkDb2.getData("/foo", 
stat, null)));

Review Comment:
   This seems to enforce more than the paper requires. But given that we are
driving the test step by step and are testing the implementation, I am +1 on
this. Per the paper, we should not have this problem, and the assertions should
still hold anyway, as there are no new proposals from the new leader before `NEWLEADER`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to