This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch branch-3.9.4
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9.4 by this push:
new cf31c3ec1 ZOOKEEPER-4925: Fix data loss due to propagation of
discontinuous committedLog (#2254) (#2266)
cf31c3ec1 is described below
commit cf31c3ec12729f5fc6cc52a0464486859fc86b92
Author: Kezhu Wang <[email protected]>
AuthorDate: Thu Aug 7 22:42:10 2025 +0800
ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous
committedLog (#2254) (#2266)
There are two variants of `ZooKeeperServer::processTxn`. The two
variants have diverged significantly since ZOOKEEPER-3484.
`processTxn(Request request)` pops the outstanding change from
`outstandingChanges` and adds the txn to `committedLog` for followers to sync,
in addition to what `processTxn(TxnHeader hdr, Record txn)` does. Since
ZOOKEEPER-4394, the `Learner` uses `processTxn(TxnHeader hdr, Record txn)`
to commit txns to memory, which means it leaves `committedLog`
untouched in the `SYNCHRONIZATION` phase.
As a result, a stale follower will have a hole in its `committedLog` after
joining the cluster. After becoming leader, the stale follower will propagate
this in-memory hole to other stale nodes. This causes data loss.
The test case fails on master and 3.9.3, and passes on 3.9.2. So only
3.9.3 is affected.
This commit drops `processTxn(TxnHeader hdr, Record txn)`, as
`processTxn(Request request)` can be used in the `SYNCHRONIZATION` phase too.
Also, this commit rejects discontinuous proposals in `syncWithLeader`
and `committedLog`, so as to avoid possible data loss.
Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484
Reviewers: li4wang
Author: kezhuw
Closes #2254 from kezhuw/ZOOKEEPER-4925-fix-data-loss
(cherry picked from commit e5dd60bf0512ccc1e090d99410a8da48623219da)
Signed-off-by: Kezhu Wang <[email protected]>
---
.../java/org/apache/zookeeper/server/Request.java | 13 +++
.../org/apache/zookeeper/server/TxnLogEntry.java | 4 +
.../org/apache/zookeeper/server/ZKDatabase.java | 28 ++++--
.../apache/zookeeper/server/ZooKeeperServer.java | 21 ++---
.../apache/zookeeper/server/quorum/Follower.java | 4 +-
.../server/quorum/FollowerZooKeeperServer.java | 34 ++-----
.../apache/zookeeper/server/quorum/Learner.java | 58 +++++++-----
.../apache/zookeeper/server/quorum/Observer.java | 11 +--
.../apache/zookeeper/server/TxnLogDigestTest.java | 2 +
.../apache/zookeeper/server/ZxidRolloverTest.java | 2 +
.../zookeeper/server/quorum/QuorumSyncTest.java | 100 +++++++++++++++++++++
11 files changed, 196 insertions(+), 81 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index c8b404e79..8b6109171 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -78,6 +78,19 @@ public Request(long sessionId, int xid, int type, TxnHeader
hdr, Record txn, lon
this.authInfo = null;
}
+ public Request(TxnHeader hdr, Record txn, TxnDigest digest) {
+ this.sessionId = hdr.getClientId();
+ this.cxid = hdr.getCxid();
+ this.type = hdr.getType();
+ this.hdr = hdr;
+ this.txn = txn;
+ this.zxid = hdr.getZxid();
+ this.request = null;
+ this.cnxn = null;
+ this.authInfo = null;
+ this.txnDigest = digest;
+ }
+
public final long sessionId;
public final int cxid;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
index 352eb81da..409fd21fa 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
@@ -47,4 +47,8 @@ public TxnHeader getHeader() {
public TxnDigest getDigest() {
return digest;
}
+
+ public Request toRequest() {
+ return new Request(header, txn, digest);
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index 7258daa7c..7a26d8362 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -58,6 +58,7 @@
import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
@@ -82,6 +83,8 @@ public class ZKDatabase {
protected FileTxnSnapLog snapLog;
protected long minCommittedLog, maxCommittedLog;
+ private final boolean allowDiscontinuousProposals =
Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals");
+
/**
* Default value is to use snapshot if txnlog size exceeds 1/3 the size of
snapshot
*/
@@ -170,8 +173,6 @@ public boolean isInitialized() {
* data structures in zkdatabase.
*/
public void clear() {
- minCommittedLog = 0;
- maxCommittedLog = 0;
/* to be safe we just create a new
* datatree.
*/
@@ -182,6 +183,8 @@ public void clear() {
try {
lock.lock();
committedLog.clear();
+ minCommittedLog = 0;
+ maxCommittedLog = 0;
} finally {
lock.unlock();
}
@@ -320,17 +323,30 @@ public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
- if (committedLog.size() > commitLogCount) {
- committedLog.remove();
- minCommittedLog = committedLog.peek().getZxid();
- }
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
+ } else if (request.zxid <= maxCommittedLog) {
+ // This could happen if lastProcessedZxid is rewound and the
database is re-synced.
+ // Currently, it only happens in test code, but it should
also be safe for the production path.
+ return;
+ } else if (!allowDiscontinuousProposals
+ && request.zxid != maxCommittedLog + 1
+ && ZxidUtils.getEpochFromZxid(request.zxid) <=
ZxidUtils.getEpochFromZxid(maxCommittedLog)) {
+ String msg = String.format(
+ "Committed proposal cached out of order: 0x%s is not the
next proposal of 0x%s",
+ ZxidUtils.zxidToString(request.zxid),
+ ZxidUtils.zxidToString(maxCommittedLog));
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
}
PureRequestProposal p = new PureRequestProposal(request);
committedLog.add(p);
maxCommittedLog = p.getZxid();
+ if (committedLog.size() > commitLogCount) {
+ committedLog.remove();
+ minCommittedLog = committedLog.peek().getZxid();
+ }
} finally {
wl.unlock();
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index fc677b329..114e7afc2 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1846,13 +1846,6 @@ private void processSasl(RequestRecord request,
ServerCnxn cnxn, RequestHeader r
cnxn.sendResponse(replyHeader, record, "response");
}
- // entry point for quorum/Learner.java
- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
- processTxnForSessionEvents(null, hdr, txn);
- return processTxnInDB(hdr, txn, null);
- }
-
- // entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
@@ -1864,8 +1857,10 @@ public ProcessTxnResult processTxn(Request request) {
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
+
+ ProcessTxnResult rc;
synchronized (outstandingChanges) {
- ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(),
request.getTxnDigest());
+ rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
@@ -1886,13 +1881,13 @@ public ProcessTxnResult processTxn(Request request) {
}
}
}
+ }
- // do not add non quorum packets to the queue.
- if (quorumRequest) {
- getZKDatabase().addCommittedProposal(request);
- }
- return rc;
+ // do not add non quorum packets to the queue.
+ if (quorumRequest) {
+ getZKDatabase().addCommittedProposal(request);
}
+ return rc;
}
private void processTxnForSessionEvents(Request request, TxnHeader hdr,
Record txn) {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 0eff9d248..ca99974cb 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -35,7 +35,6 @@
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
/**
@@ -164,7 +163,6 @@ protected void processPacket(QuorumPacket qp) throws
Exception {
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
TxnHeader hdr = logEntry.getHeader();
Record txn = logEntry.getTxn();
- TxnDigest digest = logEntry.getDigest();
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn(
"Got zxid 0x{} expected 0x{}",
@@ -179,7 +177,7 @@ protected void processPacket(QuorumPacket qp) throws
Exception {
self.setLastSeenQuorumVerifier(qv, true);
}
- fzk.logRequest(hdr, txn, digest);
+ fzk.logRequest(logEntry.toRequest());
if (hdr != null) {
/*
* Request header is created only by the leader, so this is
only set
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index b67661999..1b0b5cd92 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -22,7 +22,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.JMException;
-import org.apache.jute.Record;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.server.ExitCode;
@@ -33,8 +32,6 @@
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,20 +76,17 @@ protected void setupRequestProcessors() {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
- public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
- final Request request = buildRequestToProcess(hdr, txn, digest);
+ public void logRequest(Request request) {
+ if ((request.zxid & 0xffffffffL) != 0) {
+ pendingTxns.add(request);
+ }
syncProcessor.processRequest(request);
}
/**
- * Build a request for the txn and append it to the transaction log
- * @param hdr the txn header
- * @param txn the txn
- * @param digest the digest of txn
+ * Append the txn request to the transaction log directly, without going through
the request processors.
*/
- public void appendRequest(final TxnHeader hdr, final Record txn, final
TxnDigest digest) throws IOException {
- final Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, hdr.getZxid());
- request.setTxnDigest(digest);
+ public void appendRequest(Request request) throws IOException {
getZKDatabase().append(request);
}
@@ -188,20 +182,4 @@ protected void unregisterMetrics() {
rootContext.unregisterGauge("synced_observers");
}
-
- /**
- * Build a request for the txn
- * @param hdr the txn header
- * @param txn the txn
- * @param digest the digest of txn
- * @return a request moving through a chain of RequestProcessors
- */
- private Request buildRequestToProcess(final TxnHeader hdr, final Record
txn, final TxnDigest digest) {
- final Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, hdr.getZxid());
- request.setTxnDigest(digest);
- if ((request.zxid & 0xffffffffL) != 0) {
- pendingTxns.add(request);
- }
- return request;
- }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 1ef99e50a..adf0ef6e5 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -82,6 +82,10 @@ static class PacketInFlight {
Record rec;
TxnDigest digest;
+ Request toRequest() {
+ return new Request(hdr, rec, digest);
+ }
+
}
QuorumPeer self;
@@ -535,6 +539,27 @@ protected long registerWithLeader(int pktType) throws
IOException {
}
}
+ long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws
Exception {
+ if (lastQueued == 0) {
+ LOG.info("DIFF sync got first proposal 0x{}",
Long.toHexString(pif.hdr.getZxid()));
+ } else if (pif.hdr.getZxid() != lastQueued + 1) {
+ if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <=
ZxidUtils.getEpochFromZxid(lastQueued)) {
+ String msg = String.format(
+ "DIFF sync got proposal 0x%s, last queued 0x%s, expected
0x%s",
+ Long.toHexString(pif.hdr.getZxid()),
Long.toHexString(lastQueued),
+ Long.toHexString(lastQueued + 1));
+ LOG.error(msg);
+ throw new Exception(msg);
+ }
+ // We can't tell whether this is data loss. Given that a new epoch
is rare,
+ // logging at warn level should not be too verbose.
+ LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{},
expected 0x{}",
+ Long.toHexString(pif.hdr.getZxid()),
Long.toHexString(lastQueued),
+ Long.toHexString(lastQueued + 1));
+ }
+ return pif.hdr.getZxid();
+ }
+
/**
* Finally, synchronize our history with the Leader (if Follower)
* or the LearnerMaster (if Observer).
@@ -609,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
+ // TODO: Ideally, this should be lastProcessedZxid (a.k.a.
QuorumPacket::zxid from above), but currently
+ // LearnerHandler does not guarantee this. So, let's be
conservative and keep it unchanged for now.
long lastQueued = 0;
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the
NEWLEADER message, but in pre V1.0
@@ -630,13 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
pif.hdr = logEntry.getHeader();
pif.rec = logEntry.getTxn();
pif.digest = logEntry.getDigest();
- if (pif.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(pif.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = pif.hdr.getZxid();
+ lastQueued = enforceContinuousProposal(lastQueued, pif);
if (pif.hdr.getType() == OpCode.reconfig) {
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
@@ -666,7 +687,7 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
- zk.processTxn(pif.hdr, pif.rec);
+ zk.processTxn(pif.toRequest());
packetsNotLogged.remove();
}
} else {
@@ -696,18 +717,11 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
packet.rec = logEntry.getTxn();
packet.hdr = logEntry.getHeader();
packet.digest = logEntry.getDigest();
- // Log warning message if txn comes out-of-order
- if (packet.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(packet.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = packet.hdr.getZxid();
+ lastQueued = enforceContinuousProposal(lastQueued,
packet);
}
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the
snapshot
- zk.processTxn(packet.hdr, packet.rec);
+ zk.processTxn(packet.toRequest());
} else {
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
@@ -780,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
continue;
}
packetsNotLogged.removeFirst();
- fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
- fzk.processTxn(pif.hdr, pif.rec);
+ Request request = pif.toRequest();
+ fzk.appendRequest(request);
+ fzk.processTxn(request);
}
// @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
@@ -823,7 +838,7 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotLogged) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
+ fzk.logRequest(p.toRequest());
}
LOG.info("{} txns have been logged asynchronously",
packetsNotLogged.size());
@@ -847,8 +862,7 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
continue;
}
packetsCommitted.remove();
- Request request = new Request(p.hdr.getClientId(),
p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
- request.setTxnDigest(p.digest);
+ Request request = p.toRequest();
ozk.commitRequest(request);
}
} else {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index d3aa41b5f..334fa54c1 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -202,12 +202,8 @@ protected void processPacket(QuorumPacket qp) throws
Exception {
case Leader.INFORM:
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
logEntry = SerializeUtils.deserializeTxn(qp.getData());
- hdr = logEntry.getHeader();
- txn = logEntry.getTxn();
- digest = logEntry.getDigest();
- Request request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, 0);
+ Request request = logEntry.toRequest();
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
- request.setTxnDigest(digest);
ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
obs.commitRequest(request);
break;
@@ -219,13 +215,10 @@ protected void processPacket(QuorumPacket qp) throws
Exception {
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
logEntry = SerializeUtils.deserializeTxn(remainingdata);
- hdr = logEntry.getHeader();
txn = logEntry.getTxn();
- digest = logEntry.getDigest();
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)
txn).getData(), UTF_8));
- request = new Request(hdr.getClientId(), hdr.getCxid(),
hdr.getType(), hdr, txn, 0);
- request.setTxnDigest(digest);
+ request = logEntry.toRequest();
obs = (ObserverZooKeeperServer) zk;
boolean majorChange = self.processReconfig(qv, suggestedLeaderId,
qp.getZxid(), true);
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
index 75d6fe680..b52ea3418 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
@@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase {
@BeforeEach
public void setUp() throws Exception {
+ System.setProperty("zookeeper.test.allowDiscontinuousProposals",
"true");
super.setUp();
server = serverFactory.getZooKeeperServer();
zk = createClient();
@@ -67,6 +68,7 @@ public void setUp() throws Exception {
@AfterEach
public void tearDown() throws Exception {
+ System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
// server will be closed in super.tearDown
super.tearDown();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
index 031ccc2f7..b23fd80a3 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
@@ -60,6 +60,7 @@ private ZooKeeper getClient(int idx) {
@BeforeEach
public void setUp() throws Exception {
System.setProperty("zookeeper.admin.enableServer", "false");
+ System.setProperty("zookeeper.test.allowDiscontinuousProposals",
"true");
// set the snap count to something low so that we force log rollover
// and verify that is working as part of the epoch rollover.
@@ -215,6 +216,7 @@ private void adjustEpochNearEnd() {
@AfterEach
public void tearDown() throws Exception {
+ System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
LOG.info("tearDown starting");
for (int i = 0; i < zkClients.length; i++) {
zkClients[i].close();
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
new file mode 100644
index 000000000..c4b7720cf
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.util.Comparator;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+public class QuorumSyncTest extends ZKTestCase {
+ private QuorumUtil qu;
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (qu != null) {
+ qu.shutdownAll();
+ }
+ }
+
+ @Test
+ public void testStaleDiffSync() throws Exception {
+ qu = new QuorumUtil(2);
+ qu.startAll();
+
+ int[] followerIds = qu.getFollowerQuorumPeers()
+ .stream()
+ .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed())
+ .mapToInt(peer -> (int) peer.getMyId()).toArray();
+
+ int follower1 = followerIds[0];
+ int follower2 = followerIds[1];
+
+ String leaderConnectString =
qu.getConnectString(qu.getLeaderQuorumPeer());
+ try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) {
+ qu.shutdown(follower2);
+
+ for (int i = 0; i < 10; i++) {
+ zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+
+ qu.shutdown(follower1);
+
+ for (int i = 0; i < 10; i++) {
+ zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ }
+
+ qu.restart(follower1);
+ }
+
+ try (ZooKeeper zk =
ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) {
+ for (int i = 0; i < 10; i++) {
+ String path = "/foo" + i;
+ assertNotNull(zk.exists(path, false), path + " not found");
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String path = "/bar" + i;
+ assertNotNull(zk.exists(path, false), path + " not found");
+ }
+ }
+
+ qu.shutdown(qu.getLeaderServer());
+
+ qu.restart(follower2);
+
+ try (ZooKeeper zk =
ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) {
+ for (int i = 0; i < 10; i++) {
+ String path = "/foo" + i;
+ assertNotNull(zk.exists(path, false), path + " not found");
+ }
+
+ for (int i = 0; i < 10; i++) {
+ String path = "/bar" + i;
+ assertNotNull(zk.exists(path, false), path + " not found");
+ }
+ }
+ }
+}