This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch branch-3.9.4
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.9.4 by this push:
     new cf31c3ec1 ZOOKEEPER-4925: Fix data loss due to propagation of 
discontinuous committedLog (#2254) (#2266)
cf31c3ec1 is described below

commit cf31c3ec12729f5fc6cc52a0464486859fc86b92
Author: Kezhu Wang <[email protected]>
AuthorDate: Thu Aug 7 22:42:10 2025 +0800

    ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous 
committedLog (#2254) (#2266)
    
    There are two variants of `ZooKeeperServer::processTxn`. Those two
    variants have diverged significantly since ZOOKEEPER-3484.
    `processTxn(Request request)` pops outstanding changes from
    `outstandingChanges` and adds the txn to `committedLog` for followers to
    sync, in addition to what `processTxn(TxnHeader hdr, Record txn)` does.
    Since ZOOKEEPER-4394, the `Learner` uses
    `processTxn(TxnHeader hdr, Record txn)` to commit txns to memory, which
    means it leaves `committedLog` untouched in the `SYNCHRONIZATION` phase.
    
    This way, a stale follower will have a hole in its `committedLog` after
    joining the cluster. After becoming leader, that follower will propagate
    the in-memory hole to other stale nodes. This causes data loss.
    
    The test case fails on master and 3.9.3, and passes on 3.9.2. So only
    3.9.3 is affected.
    
    This commit drops `processTxn(TxnHeader hdr, Record txn)`, as
    `processTxn(Request request)` can be used in the `SYNCHRONIZATION` phase too.
    
    Also, this commit rejects discontinuous proposals in `syncWithLeader`
    and `committedLog`, so as to avoid possible data loss.
    
    Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484
    
    Reviewers: li4wang
    Author: kezhuw
    Closes #2254 from kezhuw/ZOOKEEPER-4925-fix-data-loss
    
    (cherry picked from commit e5dd60bf0512ccc1e090d99410a8da48623219da)
    
    Signed-off-by: Kezhu Wang <[email protected]>
---
 .../java/org/apache/zookeeper/server/Request.java  |  13 +++
 .../org/apache/zookeeper/server/TxnLogEntry.java   |   4 +
 .../org/apache/zookeeper/server/ZKDatabase.java    |  28 ++++--
 .../apache/zookeeper/server/ZooKeeperServer.java   |  21 ++---
 .../apache/zookeeper/server/quorum/Follower.java   |   4 +-
 .../server/quorum/FollowerZooKeeperServer.java     |  34 ++-----
 .../apache/zookeeper/server/quorum/Learner.java    |  58 +++++++-----
 .../apache/zookeeper/server/quorum/Observer.java   |  11 +--
 .../apache/zookeeper/server/TxnLogDigestTest.java  |   2 +
 .../apache/zookeeper/server/ZxidRolloverTest.java  |   2 +
 .../zookeeper/server/quorum/QuorumSyncTest.java    | 100 +++++++++++++++++++++
 11 files changed, 196 insertions(+), 81 deletions(-)

diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index c8b404e79..8b6109171 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -78,6 +78,19 @@ public Request(long sessionId, int xid, int type, TxnHeader 
hdr, Record txn, lon
         this.authInfo = null;
     }
 
+    public Request(TxnHeader hdr, Record txn, TxnDigest digest) {
+        this.sessionId = hdr.getClientId();
+        this.cxid = hdr.getCxid();
+        this.type = hdr.getType();
+        this.hdr = hdr;
+        this.txn = txn;
+        this.zxid = hdr.getZxid();
+        this.request = null;
+        this.cnxn = null;
+        this.authInfo = null;
+        this.txnDigest = digest;
+    }
+
     public final long sessionId;
 
     public final int cxid;
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
index 352eb81da..409fd21fa 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java
@@ -47,4 +47,8 @@ public TxnHeader getHeader() {
     public TxnDigest getDigest() {
         return digest;
     }
+
+    public Request toRequest() {
+        return new Request(header, txn, digest);
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index 7258daa7c..7a26d8362 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -58,6 +58,7 @@
 import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
@@ -82,6 +83,8 @@ public class ZKDatabase {
     protected FileTxnSnapLog snapLog;
     protected long minCommittedLog, maxCommittedLog;
 
+    private final boolean allowDiscontinuousProposals = 
Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals");
+
     /**
      * Default value is to use snapshot if txnlog size exceeds 1/3 the size of 
snapshot
      */
@@ -170,8 +173,6 @@ public boolean isInitialized() {
      * data structures in zkdatabase.
      */
     public void clear() {
-        minCommittedLog = 0;
-        maxCommittedLog = 0;
         /* to be safe we just create a new
          * datatree.
          */
@@ -182,6 +183,8 @@ public void clear() {
         try {
             lock.lock();
             committedLog.clear();
+            minCommittedLog = 0;
+            maxCommittedLog = 0;
         } finally {
             lock.unlock();
         }
@@ -320,17 +323,30 @@ public void addCommittedProposal(Request request) {
         WriteLock wl = logLock.writeLock();
         try {
             wl.lock();
-            if (committedLog.size() > commitLogCount) {
-                committedLog.remove();
-                minCommittedLog = committedLog.peek().getZxid();
-            }
             if (committedLog.isEmpty()) {
                 minCommittedLog = request.zxid;
                 maxCommittedLog = request.zxid;
+            } else if (request.zxid <= maxCommittedLog) {
+                // This could happen if lastProcessedZxid is rewinded and 
database is re-synced.
+                // Currently, it only happens in test codes, but it should 
also be safe for production path.
+                return;
+            } else if (!allowDiscontinuousProposals
+                    && request.zxid != maxCommittedLog + 1
+                    && ZxidUtils.getEpochFromZxid(request.zxid) <= 
ZxidUtils.getEpochFromZxid(maxCommittedLog)) {
+                String msg = String.format(
+                    "Committed proposal cached out of order: 0x%s is not the 
next proposal of 0x%s",
+                    ZxidUtils.zxidToString(request.zxid),
+                    ZxidUtils.zxidToString(maxCommittedLog));
+                LOG.error(msg);
+                throw new IllegalStateException(msg);
             }
             PureRequestProposal p = new PureRequestProposal(request);
             committedLog.add(p);
             maxCommittedLog = p.getZxid();
+            if (committedLog.size() > commitLogCount) {
+                committedLog.remove();
+                minCommittedLog = committedLog.peek().getZxid();
+            }
         } finally {
             wl.unlock();
         }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index fc677b329..114e7afc2 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1846,13 +1846,6 @@ private void processSasl(RequestRecord request, 
ServerCnxn cnxn, RequestHeader r
         cnxn.sendResponse(replyHeader, record, "response");
     }
 
-    // entry point for quorum/Learner.java
-    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
-        processTxnForSessionEvents(null, hdr, txn);
-        return processTxnInDB(hdr, txn, null);
-    }
-
-    // entry point for FinalRequestProcessor.java
     public ProcessTxnResult processTxn(Request request) {
         TxnHeader hdr = request.getHdr();
         processTxnForSessionEvents(request, hdr, request.getTxn());
@@ -1864,8 +1857,10 @@ public ProcessTxnResult processTxn(Request request) {
         if (!writeRequest && !quorumRequest) {
             return new ProcessTxnResult();
         }
+
+        ProcessTxnResult rc;
         synchronized (outstandingChanges) {
-            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), 
request.getTxnDigest());
+            rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
 
             // request.hdr is set for write requests, which are the only ones
             // that add to outstandingChanges.
@@ -1886,13 +1881,13 @@ public ProcessTxnResult processTxn(Request request) {
                     }
                 }
             }
+        }
 
-            // do not add non quorum packets to the queue.
-            if (quorumRequest) {
-                getZKDatabase().addCommittedProposal(request);
-            }
-            return rc;
+        // do not add non quorum packets to the queue.
+        if (quorumRequest) {
+            getZKDatabase().addCommittedProposal(request);
         }
+        return rc;
     }
 
     private void processTxnForSessionEvents(Request request, TxnHeader hdr, 
Record txn) {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index 0eff9d248..ca99974cb 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -35,7 +35,6 @@
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -164,7 +163,6 @@ protected void processPacket(QuorumPacket qp) throws 
Exception {
             TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
             TxnHeader hdr = logEntry.getHeader();
             Record txn = logEntry.getTxn();
-            TxnDigest digest = logEntry.getDigest();
             if (hdr.getZxid() != lastQueued + 1) {
                 LOG.warn(
                     "Got zxid 0x{} expected 0x{}",
@@ -179,7 +177,7 @@ protected void processPacket(QuorumPacket qp) throws 
Exception {
                 self.setLastSeenQuorumVerifier(qv, true);
             }
 
-            fzk.logRequest(hdr, txn, digest);
+            fzk.logRequest(logEntry.toRequest());
             if (hdr != null) {
                 /*
                  * Request header is created only by the leader, so this is 
only set
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index b67661999..1b0b5cd92 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -22,7 +22,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.management.JMException;
-import org.apache.jute.Record;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.server.ExitCode;
@@ -33,8 +32,6 @@
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,20 +76,17 @@ protected void setupRequestProcessors() {
 
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
 
-    public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
-        final Request request = buildRequestToProcess(hdr, txn, digest);
+    public void logRequest(Request request) {
+        if ((request.zxid & 0xffffffffL) != 0) {
+            pendingTxns.add(request);
+        }
         syncProcessor.processRequest(request);
     }
 
     /**
-     * Build a request for the txn and append it to the transaction log
-     * @param hdr the txn header
-     * @param txn the txn
-     * @param digest the digest of txn
+     * Append txn request to the transaction log directly without go through 
request processors.
      */
-    public void appendRequest(final TxnHeader hdr, final Record txn, final 
TxnDigest digest) throws IOException {
-        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), 
hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
+    public void appendRequest(Request request) throws IOException {
         getZKDatabase().append(request);
     }
 
@@ -188,20 +182,4 @@ protected void unregisterMetrics() {
         rootContext.unregisterGauge("synced_observers");
 
     }
-
-    /**
-     * Build a request for the txn
-     * @param hdr the txn header
-     * @param txn the txn
-     * @param digest the digest of txn
-     * @return a request moving through a chain of RequestProcessors
-     */
-    private Request buildRequestToProcess(final TxnHeader hdr, final Record 
txn, final TxnDigest digest) {
-        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), 
hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
-        if ((request.zxid & 0xffffffffL) != 0) {
-            pendingTxns.add(request);
-        }
-        return request;
-    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 1ef99e50a..adf0ef6e5 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -82,6 +82,10 @@ static class PacketInFlight {
         Record rec;
         TxnDigest digest;
 
+        Request toRequest() {
+            return new Request(hdr, rec, digest);
+        }
+
     }
 
     QuorumPeer self;
@@ -535,6 +539,27 @@ protected long registerWithLeader(int pktType) throws 
IOException {
         }
     }
 
+    long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws 
Exception {
+        if (lastQueued == 0) {
+            LOG.info("DIFF sync got first proposal 0x{}", 
Long.toHexString(pif.hdr.getZxid()));
+        } else if (pif.hdr.getZxid() != lastQueued + 1) {
+            if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= 
ZxidUtils.getEpochFromZxid(lastQueued)) {
+                String msg = String.format(
+                    "DIFF sync got proposal 0x%s, last queued 0x%s, expected 
0x%s",
+                    Long.toHexString(pif.hdr.getZxid()), 
Long.toHexString(lastQueued),
+                    Long.toHexString(lastQueued + 1));
+                LOG.error(msg);
+                throw new Exception(msg);
+            }
+            // We can't tell whether it is a data loss. Given that new epoch 
is rare,
+            // log at warn should not be too verbose.
+            LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, 
expected 0x{}",
+                Long.toHexString(pif.hdr.getZxid()), 
Long.toHexString(lastQueued),
+                Long.toHexString(lastQueued + 1));
+        }
+        return pif.hdr.getZxid();
+    }
+
     /**
      * Finally, synchronize our history with the Leader (if Follower)
      * or the LearnerMaster (if Observer).
@@ -609,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
             
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
             zk.createSessionTracker();
 
+            // TODO: Ideally, this should be lastProcessZxid(a.k.a. 
QuorumPacket::zxid from above), but currently
+            // LearnerHandler does not guarantee this. So, let's be 
conservative and keep it unchange for now.
             long lastQueued = 0;
 
             // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the 
NEWLEADER message, but in pre V1.0
@@ -630,13 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                     pif.hdr = logEntry.getHeader();
                     pif.rec = logEntry.getTxn();
                     pif.digest = logEntry.getDigest();
-                    if (pif.hdr.getZxid() != lastQueued + 1) {
-                        LOG.warn(
-                            "Got zxid 0x{} expected 0x{}",
-                            Long.toHexString(pif.hdr.getZxid()),
-                            Long.toHexString(lastQueued + 1));
-                    }
-                    lastQueued = pif.hdr.getZxid();
+                    lastQueued = enforceContinuousProposal(lastQueued, pif);
 
                     if (pif.hdr.getType() == OpCode.reconfig) {
                         SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
@@ -666,7 +687,7 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                                 Long.toHexString(qp.getZxid()),
                                 Long.toHexString(pif.hdr.getZxid()));
                         } else {
-                            zk.processTxn(pif.hdr, pif.rec);
+                            zk.processTxn(pif.toRequest());
                             packetsNotLogged.remove();
                         }
                     } else {
@@ -696,18 +717,11 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                         packet.rec = logEntry.getTxn();
                         packet.hdr = logEntry.getHeader();
                         packet.digest = logEntry.getDigest();
-                        // Log warning message if txn comes out-of-order
-                        if (packet.hdr.getZxid() != lastQueued + 1) {
-                            LOG.warn(
-                                "Got zxid 0x{} expected 0x{}",
-                                Long.toHexString(packet.hdr.getZxid()),
-                                Long.toHexString(lastQueued + 1));
-                        }
-                        lastQueued = packet.hdr.getZxid();
+                        lastQueued = enforceContinuousProposal(lastQueued, 
packet);
                     }
                     if (!writeToTxnLog) {
                         // Apply to db directly if we haven't taken the 
snapshot
-                        zk.processTxn(packet.hdr, packet.rec);
+                        zk.processTxn(packet.toRequest());
                     } else {
                         packetsNotLogged.add(packet);
                         packetsCommitted.add(qp.getZxid());
@@ -780,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                                 continue;
                             }
                             packetsNotLogged.removeFirst();
-                            fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
-                            fzk.processTxn(pif.hdr, pif.rec);
+                            Request request = pif.toRequest();
+                            fzk.appendRequest(request);
+                            fzk.processTxn(request);
                         }
 
                         // @see 
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
@@ -823,7 +838,7 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
         if (zk instanceof FollowerZooKeeperServer) {
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
             for (PacketInFlight p : packetsNotLogged) {
-                fzk.logRequest(p.hdr, p.rec, p.digest);
+                fzk.logRequest(p.toRequest());
             }
             LOG.info("{} txns have been logged asynchronously", 
packetsNotLogged.size());
 
@@ -847,8 +862,7 @@ protected void syncWithLeader(long newLeaderZxid) throws 
Exception {
                     continue;
                 }
                 packetsCommitted.remove();
-                Request request = new Request(p.hdr.getClientId(), 
p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
-                request.setTxnDigest(p.digest);
+                Request request = p.toRequest();
                 ozk.commitRequest(request);
             }
         } else {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
index d3aa41b5f..334fa54c1 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java
@@ -202,12 +202,8 @@ protected void processPacket(QuorumPacket qp) throws 
Exception {
         case Leader.INFORM:
             ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
             logEntry = SerializeUtils.deserializeTxn(qp.getData());
-            hdr = logEntry.getHeader();
-            txn = logEntry.getTxn();
-            digest = logEntry.getDigest();
-            Request request = new Request(hdr.getClientId(), hdr.getCxid(), 
hdr.getType(), hdr, txn, 0);
+            Request request = logEntry.toRequest();
             
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
-            request.setTxnDigest(digest);
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
             obs.commitRequest(request);
             break;
@@ -219,13 +215,10 @@ protected void processPacket(QuorumPacket qp) throws 
Exception {
             byte[] remainingdata = new byte[buffer.remaining()];
             buffer.get(remainingdata);
             logEntry = SerializeUtils.deserializeTxn(remainingdata);
-            hdr = logEntry.getHeader();
             txn = logEntry.getTxn();
-            digest = logEntry.getDigest();
             QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) 
txn).getData(), UTF_8));
 
-            request = new Request(hdr.getClientId(), hdr.getCxid(), 
hdr.getType(), hdr, txn, 0);
-            request.setTxnDigest(digest);
+            request = logEntry.toRequest();
             obs = (ObserverZooKeeperServer) zk;
 
             boolean majorChange = self.processReconfig(qv, suggestedLeaderId, 
qp.getZxid(), true);
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
index 75d6fe680..b52ea3418 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java
@@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase {
 
     @BeforeEach
     public void setUp() throws Exception {
+        System.setProperty("zookeeper.test.allowDiscontinuousProposals", 
"true");
         super.setUp();
         server = serverFactory.getZooKeeperServer();
         zk = createClient();
@@ -67,6 +68,7 @@ public void setUp() throws Exception {
 
     @AfterEach
     public void tearDown() throws Exception {
+        System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
         // server will be closed in super.tearDown
         super.tearDown();
 
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
index 031ccc2f7..b23fd80a3 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java
@@ -60,6 +60,7 @@ private ZooKeeper getClient(int idx) {
     @BeforeEach
     public void setUp() throws Exception {
         System.setProperty("zookeeper.admin.enableServer", "false");
+        System.setProperty("zookeeper.test.allowDiscontinuousProposals", 
"true");
 
         // set the snap count to something low so that we force log rollover
         // and verify that is working as part of the epoch rollover.
@@ -215,6 +216,7 @@ private void adjustEpochNearEnd() {
 
     @AfterEach
     public void tearDown() throws Exception {
+        System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
         LOG.info("tearDown starting");
         for (int i = 0; i < zkClients.length; i++) {
             zkClients[i].close();
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
new file mode 100644
index 000000000..c4b7720cf
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.util.Comparator;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+public class QuorumSyncTest extends ZKTestCase {
+    private QuorumUtil qu;
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (qu != null) {
+            qu.shutdownAll();
+        }
+    }
+
+    @Test
+    public void testStaleDiffSync() throws Exception {
+        qu = new QuorumUtil(2);
+        qu.startAll();
+
+        int[] followerIds = qu.getFollowerQuorumPeers()
+            .stream()
+            .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed())
+            .mapToInt(peer -> (int) peer.getMyId()).toArray();
+
+        int follower1 = followerIds[0];
+        int follower2 = followerIds[1];
+
+        String leaderConnectString = 
qu.getConnectString(qu.getLeaderQuorumPeer());
+        try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) {
+            qu.shutdown(follower2);
+
+            for (int i = 0; i < 10; i++) {
+                zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+            }
+
+            qu.shutdown(follower1);
+
+            for (int i = 0; i < 10; i++) {
+                zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+            }
+
+            qu.restart(follower1);
+        }
+
+        try (ZooKeeper zk = 
ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) {
+            for (int i = 0; i < 10; i++) {
+                String path = "/foo" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                String path = "/bar" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+        }
+
+        qu.shutdown(qu.getLeaderServer());
+
+        qu.restart(follower2);
+
+        try (ZooKeeper zk = 
ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) {
+            for (int i = 0; i < 10; i++) {
+                String path = "/foo" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                String path = "/bar" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+        }
+    }
+}

Reply via email to