kezhuw commented on code in PR #2152:
URL: https://github.com/apache/zookeeper/pull/2152#discussion_r1545947549
##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
isPreZAB1_0 = false;
// ZOOKEEPER-3911: make sure sync the uncommitted logs
before commit them (ACK NEWLEADER).
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
- if (zk instanceof FollowerZooKeeperServer) {
+ if (zk instanceof FollowerZooKeeperServer &&
!packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
- final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
- requestsToAck.add(request);
+
+ /*
+ * @see https://github.com/apache/zookeeper/pull/1848
+ * Persist and process the committed txns in
"packetsNotCommitted"
+ * according to "packetsCommitted", which have been
committed by
+ * the leader. For these committed proposals, there is
no need to
+ * reply ack.
+ *
+ * @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+ * Keep the outstanding proposals in
"packetsNotCommitted" to avoid
+ * NullPointerException when the follower receives
COMMIT packet(s)
Review Comment:
Given my comments above, we apparently should not clear `packetsNotCommitted`.
All txns not in `packetsCommitted` are proposals from the new election.
##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long
zxid, long sessionId) throw
}, testData);
}
+ @Test
+ public void
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File
testData) throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive
oa, Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir", testData);
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir =
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+ File snapDir =
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ //Spy on ZK so we can check if a snapshot happened or not.
+ f.zk = spy(f.zk);
+ try {
+ assertEquals(0, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new
FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33,
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+ Stat stat = new Stat();
+ assertEquals("data1", new String(zkDb.getData("/foo",
stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()),
learnInfo);
+ assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch
is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte[] protoBytes = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACKEPOCH, qp.getType());
+ assertEquals(0, qp.getZxid());
+ assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ Thread.sleep(10); //Give it some time to process the snap
+ //No Snapshot taken yet, the SNAP was applied in memory
+ verify(f.zk, never()).takeSnapshot();
+
+ // Leader sends an outstanding proposal
+ long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+ proposeSetData(qp, proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ oa.writeRecord(qp, null);
Review Comment:
`qp.data` should be cleared.
##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
isPreZAB1_0 = false;
// ZOOKEEPER-3911: make sure sync the uncommitted logs
before commit them (ACK NEWLEADER).
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
- if (zk instanceof FollowerZooKeeperServer) {
+ if (zk instanceof FollowerZooKeeperServer &&
!packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
- final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
- requestsToAck.add(request);
+
+ /*
+ * @see https://github.com/apache/zookeeper/pull/1848
+ * Persist and process the committed txns in
"packetsNotCommitted"
+ * according to "packetsCommitted", which have been
committed by
Review Comment:
The comment is somewhat misleading. The key is to log these committed
ones; they are considered committed before election by the paper. The only
reason we touch `packetsNotCommitted` here is to make sure these txns do not
go through `logRequest` again in the `broadcast` phase. I think it might be
better to rename `packetsNotCommitted` to `packetsNotLogged` as @jeffrey-xiao
did in #1930. "log" is a disk operation, "commit" is an agreement. What we
want here should be "log committed txns agreed in election".
Coming to the implementation, new proposals could still be committed
before `NEWLEADER` since `LearnerHandler` does not issue `NEWLEADER` right
after these committed txns. But this does not harm us here, as we would
potentially persist more but not less, and the new leader expects no `ack` for
committed ones.
##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -765,36 +765,22 @@ public void converseWithFollower(InputArchive ia,
OutputArchive oa, Follower f)
qp.setZxid(0);
oa.writeRecord(qp, null);
- // Read the uptodate ack
- readPacketSkippingPing(ia, qp);
- assertEquals(Leader.ACK, qp.getType());
- assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());
-
- //Wait for the transactions to be written out. The thread
that writes them out
- // does not send anything back when it is done.
- long start = System.currentTimeMillis();
- while (createSessionZxid != f.fzk.getLastProcessedZxid()
- && (System.currentTimeMillis() - start) <
50) {
- Thread.sleep(1);
Review Comment:
> When specifying Learner.syncWithLeader(..) in version 3.9.2 (fixed by
https://github.com/apache/zookeeper/pull/2111), the issue traces of
[ZOOKEEPER-4394](https://issues.apache.org/jira/browse/ZOOKEEPER-4394) and
[ZOOKEEPER-3023](https://issues.apache.org/jira/browse/ZOOKEEPER-3023) can be
detected.
I did not see ZOOKEEPER-3023 after #2111. But if you are verifying this
using TLA, this is doomed to failure. I am +1 on reverting this to its
pre-ZOOKEEPER-2678 state.
##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long
zxid, long sessionId) throw
}, testData);
}
+ @Test
+ public void
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File
testData) throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive
oa, Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir", testData);
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir =
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+ File snapDir =
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ //Spy on ZK so we can check if a snapshot happened or not.
+ f.zk = spy(f.zk);
+ try {
+ assertEquals(0, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new
FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33,
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+ Stat stat = new Stat();
+ assertEquals("data1", new String(zkDb.getData("/foo",
stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()),
learnInfo);
+ assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch
is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte[] protoBytes = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACKEPOCH, qp.getType());
+ assertEquals(0, qp.getZxid());
+ assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ Thread.sleep(10); //Give it some time to process the snap
+ //No Snapshot taken yet, the SNAP was applied in memory
+ verify(f.zk, never()).takeSnapshot();
+
+ // Leader sends an outstanding proposal
+ long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+ proposeSetData(qp, proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
Review Comment:
I think this is what ZOOKEEPER-4394 tried to report: new proposals are issued
before `NEWLEADER`. This is the gap between the paper and the implementation.
##########
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java:
##########
@@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long
zxid, long sessionId) throw
}, testData);
}
+ @Test
+ public void
testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File
testData) throws Exception {
+ testFollowerConversation(new FollowerConversation() {
+ @Override
+ public void converseWithFollower(InputArchive ia, OutputArchive
oa, Follower f) throws Exception {
+ File tmpDir = File.createTempFile("test", "dir", testData);
+ tmpDir.delete();
+ tmpDir.mkdir();
+ File logDir =
f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+ File snapDir =
f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+ //Spy on ZK so we can check if a snapshot happened or not.
+ f.zk = spy(f.zk);
+ try {
+ assertEquals(0, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Setup a database with a single /foo node
+ ZKDatabase zkDb = new ZKDatabase(new
FileTxnSnapLog(tmpDir, tmpDir));
+ final long firstZxid = ZxidUtils.makeZxid(1, 1);
+ zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33,
ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+ Stat stat = new Stat();
+ assertEquals("data1", new String(zkDb.getData("/foo",
stat, null)));
+
+ QuorumPacket qp = new QuorumPacket();
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.FOLLOWERINFO, qp.getType());
+ assertEquals(qp.getZxid(), 0);
+ LearnerInfo learnInfo = new LearnerInfo();
+
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()),
learnInfo);
+ assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+ assertEquals(learnInfo.getServerid(), 0);
+
+ // We are simulating an established leader, so the epoch
is 1
+ qp.setType(Leader.LEADERINFO);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ byte[] protoBytes = new byte[4];
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
+ qp.setData(protoBytes);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACKEPOCH, qp.getType());
+ assertEquals(0, qp.getZxid());
+ assertEquals(ZxidUtils.makeZxid(0, 0),
ByteBuffer.wrap(qp.getData()).getInt());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(0, f.self.getCurrentEpoch());
+
+ // Send the snapshot we created earlier
+ qp.setType(Leader.SNAP);
+ qp.setData(new byte[0]);
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+ oa.writeRecord(qp, null);
+ zkDb.serializeSnapshot(oa);
+ oa.writeString("BenWasHere", null);
+ Thread.sleep(10); //Give it some time to process the snap
+ //No Snapshot taken yet, the SNAP was applied in memory
+ verify(f.zk, never()).takeSnapshot();
+
+ // Leader sends an outstanding proposal
+ long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+ proposeSetData(qp, proposalZxid, "data2", 2);
+ oa.writeRecord(qp, null);
+
+ qp.setType(Leader.NEWLEADER);
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
+ oa.writeRecord(qp, null);
+
+ // Get the ack of the new leader
+ readPacketSkippingPing(ia, qp);
+ assertEquals(Leader.ACK, qp.getType());
+ assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ assertEquals(1, f.self.getAcceptedEpoch());
+ assertEquals(1, f.self.getCurrentEpoch());
+ //Make sure that we did take the snapshot now
+ verify(f.zk).takeSnapshot(true);
+ assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+ // The outstanding proposal has not been persisted yet
+ ZKDatabase zkDb2 = new ZKDatabase(new
FileTxnSnapLog(logDir, snapDir));
+ long lastZxid = zkDb2.loadDataBase();
+ assertEquals("data1", new String(zkDb2.getData("/foo",
stat, null)));
Review Comment:
This seems to be an extra enforcement beyond what the paper requires. But given
that we are driving the test step by step and we are testing the
implementation, I am +1 on this. By the paper, we should not have this problem,
and the assertions should still hold anyway, as there are no new proposals from
the new leader before `NEWLEADER`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]