This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ee04684  ZOOKEEPER-3150: Add tree digest check and verify data 
integrity when loading from disk
ee04684 is described below

commit ee04684b259fe00fe3dd8d3b697ba011c71e120b
Author: Fangmin Lyu <fang...@apache.org>
AuthorDate: Wed Jul 31 10:06:04 2019 -0700

    ZOOKEEPER-3150: Add tree digest check and verify data integrity when 
loading from disk
    
    Jira ZOOKEEPER-3114 will be divided into two parts:
    
    1. data integrity check when loading snapshot/txns from disk
    2. real time data consistency check when syncing and following leader
    
    This is the first part, which is going to check the data integrity by 
calculating the hash value of data tree, and compare the value when reload the 
snapshot/txns from disk.
    
    Author: Fangmin Lyu <fang...@apache.org>
    Author: Fangmin Lyu <allen...@fb.com>
    
    Reviewers: Michael Han <h...@apache.org>, Enrico Olivelli 
<eolive...@gmail.com>, Edward Ribeiro
    
    Closes #632 from lvfangmin/ZOOKEEPER-3150
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  11 +
 .../java/org/apache/zookeeper/DigestWatcher.java   |  32 +++
 .../main/java/org/apache/zookeeper/ZooDefs.java    |   2 +
 .../java/org/apache/zookeeper/server/DataNode.java |  30 +-
 .../java/org/apache/zookeeper/server/DataTree.java | 310 ++++++++++++++++++---
 .../org/apache/zookeeper/server/NodeHashMap.java   | 104 +++++++
 .../apache/zookeeper/server/NodeHashMapImpl.java   | 118 ++++++++
 .../org/apache/zookeeper/server/RateLogger.java    |  38 ++-
 .../org/apache/zookeeper/server/ServerMetrics.java |   6 +
 .../apache/zookeeper/server/ZooKeeperServer.java   |   3 +-
 .../apache/zookeeper/server/admin/Commands.java    |  17 ++
 .../zookeeper/server/command/CommandExecutor.java  |   3 +
 .../zookeeper/server/command/DigestCommand.java    |  49 ++++
 .../server/command/FourLetterCommands.java         |   8 +
 .../zookeeper/server/persistence/FileSnap.java     |  39 +++
 .../server/persistence/FileTxnSnapLog.java         |  15 +-
 .../zookeeper/server/persistence/SnapStream.java   |   1 +
 .../org/apache/zookeeper/server/util/AdHash.java   |  84 ++++++
 .../zookeeper/server/util/DigestCalculator.java    | 145 ++++++++++
 .../org/apache/zookeeper/server/DataTreeTest.java  |  96 ++++++-
 .../zookeeper/server/NodeHashMapImplTest.java      |  98 +++++++
 .../zookeeper/server/SnapshotDigestTest.java       | 222 +++++++++++++++
 .../apache/zookeeper/server/util/AdHashTest.java   | 103 +++++++
 .../java/org/apache/zookeeper/test/ClientBase.java |  11 +
 .../zookeeper/test/LoadFromLogNoServerTest.java    |  76 +++--
 .../src/test/resources/findbugsExcludeFile.xml     |   2 +-
 26 files changed, 1535 insertions(+), 88 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 5c71832..eea7e52 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -903,6 +903,13 @@ property, when available, is noted below.
     The maximum number of diff syncs a leader or a follower can serve at the 
same
     time. The default is 100.
 
+* *digest.enabled* :
+    (Java system property only: **zookeeper.digest.enabled**)
+    **New in 3.6.0:**
+    The digest feature is added to self-verify the correctness inside
+    ZooKeeper when loading database from disk, and syncing with leader.
+    By default, this feature is disabled; set it to "true" to enable it.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
@@ -1801,6 +1808,10 @@ The output contains multiple lines with the following 
format:
     server is running in read-only mode.  The server will respond with
     "ro" if in read-only mode or "rw" if not in read-only mode.
 
+* *hash* :
+    **New in 3.6.0:**
+    Return the latest history of the tree digest associated with zxid.
+
 * *gtmk* :
     Gets the current trace mask as a 64-bit signed long value in
     decimal format.  See `stmk` for an explanation of
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/DigestWatcher.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/DigestWatcher.java
new file mode 100644
index 0000000..15035c8
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/DigestWatcher.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/**
+ * This interface is used to notify the digest mismatch event.
+ */
+public interface DigestWatcher {
+
+    /**
+     * Called when the digest mismatch is found on a given zxid.
+     *
+     * @param mismatchZxid the zxid when the digest mismatch happened. 
+     */
+    void process(long mismatchZxid);
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
index 20e0f28..1f44c1e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
@@ -31,6 +31,8 @@ public class ZooDefs {
    
    final public static String CONFIG_NODE = "/zookeeper/config";
 
+   final public static String ZOOKEEPER_NODE_SUBTREE = "/zookeeper/";
+
    @InterfaceAudience.Public
     public interface OpCode {
         public final int notification = 0;
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
index e4de2ca..a88cdde 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataNode.java
@@ -37,8 +37,16 @@ import org.apache.zookeeper.data.StatPersisted;
  * array of ACLs, a stat object, and a set of its children's paths.
  * 
  */
-@SuppressFBWarnings("EI_EXPOSE_REP2")
+@SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
 public class DataNode implements Record {
+
+    // the digest value of this node, calculated from path, data and stat
+    private volatile long digest;
+
+    // indicate if the digest of this node is up to date or not, used to 
+    // optimize the performance.
+    volatile boolean digestCached;
+
     /** the data for this datanode */
     byte data[];
 
@@ -182,4 +190,24 @@ public class DataNode implements Record {
         stat.serialize(archive, "statpersisted");
         archive.endRecord(this, "node");
     }
+
+    public boolean isDigestCached() {
+        return digestCached;
+    }
+
+    public void setDigestCached(boolean digestCached) {
+        this.digestCached = digestCached;
+    }
+
+    public long getDigest() {
+        return digest;
+    }
+
+    public void setDigest(long digest) {
+        this.digest = digest;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index ce27c42..0ffad0f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
+import org.apache.zookeeper.DigestWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -39,6 +40,7 @@ import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.util.DigestCalculator;
 import org.apache.zookeeper.server.watch.IWatchManager;
 import org.apache.zookeeper.server.watch.WatchManagerFactory;
 import org.apache.zookeeper.server.watch.WatcherOrBitSet;
@@ -59,6 +61,7 @@ import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
@@ -67,6 +70,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -86,12 +90,13 @@ import java.util.concurrent.atomic.AtomicLong;
 public class DataTree {
     private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);
 
+    private final RateLogger RATE_LOGGER = new RateLogger(LOG, 15 * 60 * 1000);
+
     /**
-     * This hashtable provides a fast lookup to the datanodes. The tree is the
+     * This map provides a fast lookup to the datanodes. The tree is the
      * source of truth and is where all the locking occurs
      */
-    private final ConcurrentHashMap<String, DataNode> nodes =
-        new ConcurrentHashMap<String, DataNode>();
+    private final NodeHashMap nodes = new NodeHashMapImpl();
 
     private IWatchManager dataWatches;
 
@@ -159,6 +164,26 @@ public class DataTree {
 
     private final ReferenceCountedACLCache aclCache = new 
ReferenceCountedACLCache();
 
+    // The maximum number of tree digests that we will keep in our history
+    public static final int DIGEST_LOG_LIMIT = 1024;
+
+    // Dump digest every 128 txns, in hex it's 80, which will make it easier 
+    // to align and compare between servers.
+    public static final int DIGEST_LOG_INTERVAL = 128;
+
+    // If this is not null, we are actively looking for a target zxid that we
+    // want to validate the digest for
+    private ZxidDigest digestFromLoadedSnapshot;
+
+    // The digest associated with the highest zxid in the data tree.
+    private volatile ZxidDigest lastProcessedZxidDigest;
+
+    // Will be notified when digest mismatch event triggered.
+    private final List<DigestWatcher> digestWatchers = new ArrayList<>();
+
+    // The historical digests list.
+    private LinkedList<ZxidDigest> digestLog = new LinkedList<>();
+
     @SuppressWarnings("unchecked")
     public Set<String> getEphemerals(long sessionId) {
         HashSet<String> retv = ephemerals.get(sessionId);
@@ -252,7 +277,7 @@ public class DataTree {
     public DataTree() {
         /* Rather than fight it, let root have an alias */
         nodes.put("", root);
-        nodes.put(rootZookeeper, root);
+        nodes.putWithoutDigest(rootZookeeper, root);
 
         /** add the proc node and quota node */
         root.addChild(procChildZookeeper);
@@ -468,6 +493,7 @@ public class DataTree {
                 throw new KeeperException.NodeExistsException();
             }
 
+            nodes.preChange(parentName, parent);
             if (parentCVersion == -1) {
                 parentCVersion = parent.stat.getCversion();
                 parentCVersion++;
@@ -483,6 +509,7 @@ public class DataTree {
             }
             DataNode child = new DataNode(data, longval, stat);
             parent.addChild(childName);
+            nodes.postChange(parentName, parent);
             nodeDataSize.addAndGet(getNodeSize(path, child.data));
             nodes.put(path, child);
             EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
@@ -553,6 +580,7 @@ public class DataTree {
             throw new KeeperException.NoNodeException();
         }
         synchronized (parent) {
+            nodes.preChange(parentName, parent);
             parent.removeChild(childName);
             // Only update pzxid when the zxid is larger than the current 
pzxid,
             // otherwise we might override some higher pzxid set by a create
@@ -560,6 +588,7 @@ public class DataTree {
             if (zxid > parent.stat.getPzxid()) {
                 parent.stat.setPzxid(zxid);
             }
+            nodes.postChange(parentName, parent);
         }
 
         DataNode node = nodes.get(path);
@@ -634,11 +663,13 @@ public class DataTree {
         byte lastdata[] = null;
         synchronized (n) {
             lastdata = n.data;
+            nodes.preChange(path, n);
             n.data = data;
             n.stat.setMtime(time);
             n.stat.setMzxid(zxid);
             n.stat.setVersion(version);
             n.copyStat(s);
+            nodes.postChange(path, n);
         }
         // now update if the path is in a quota subtree.
         String lastPrefix = getMaxPrefixWithQuota(path);
@@ -754,9 +785,11 @@ public class DataTree {
         }
         synchronized (n) {
             aclCache.removeUsage(n.acl);
+            nodes.preChange(path, n);
             n.stat.setAversion(version);
             n.acl = aclCache.convertAcls(acl);
             n.copyStat(stat);
+            nodes.postChange(path, n);
             return stat;
         }
     }
@@ -1008,6 +1041,39 @@ public class DataTree {
             }
         }
 
+        /*
+         * Snapshots are taken lazily. It can happen that the child
+         * znodes of a parent are created after the parent
+         * is serialized. Therefore, while replaying logs during restore, a
+         * create might fail because the node was already
+         * created.
+         *
+         * After seeing this failure, we should increment
+         * the cversion of the parent znode since the parent was serialized
+         * before its children.
+         *
+         * Note, such failures on DT should be seen only during
+         * restore.
+         */
+        if (header.getType() == OpCode.create &&
+                rc.err == Code.NODEEXISTS.intValue()) {
+            LOG.debug("Adjusting parent cversion for Txn: " + header.getType() 
+
+                    " path:" + rc.path + " err: " + rc.err);
+            int lastSlash = rc.path.lastIndexOf('/');
+            String parentName = rc.path.substring(0, lastSlash);
+            CreateTxn cTxn = (CreateTxn)txn;
+            try {
+                setCversionPzxid(parentName, cTxn.getParentCVersion(),
+                        header.getZxid());
+            } catch (KeeperException.NoNodeException e) {
+                LOG.error("Failed to set parent cversion for: " +
+                      parentName, e);
+                rc.err = e.code().intValue();
+            }
+        } else if (rc.err != Code.OK.intValue()) {
+            LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +
+                  " : error: " + rc.err);
+        }
 
         /*
          * Things we can only update after the whole txn is applied to data
@@ -1042,41 +1108,15 @@ public class DataTree {
             if (rc.zxid > lastProcessedZxid) {
                 lastProcessedZxid = rc.zxid;
             }
-        }
 
-        /*
-         * Snapshots are taken lazily. It can happen that the child
-         * znodes of a parent are created after the parent
-         * is serialized. Therefore, while replaying logs during restore, a
-         * create might fail because the node was already
-         * created.
-         *
-         * After seeing this failure, we should increment
-         * the cversion of the parent znode since the parent was serialized
-         * before its children.
-         *
-         * Note, such failures on DT should be seen only during
-         * restore.
-         */
-        if (header.getType() == OpCode.create &&
-                rc.err == Code.NODEEXISTS.intValue()) {
-            LOG.debug("Adjusting parent cversion for Txn: " + header.getType() 
+
-                    " path:" + rc.path + " err: " + rc.err);
-            int lastSlash = rc.path.lastIndexOf('/');
-            String parentName = rc.path.substring(0, lastSlash);
-            CreateTxn cTxn = (CreateTxn)txn;
-            try {
-                setCversionPzxid(parentName, cTxn.getParentCVersion(),
-                        header.getZxid());
-            } catch (KeeperException.NoNodeException e) {
-                LOG.error("Failed to set parent cversion for: " +
-                      parentName, e);
-                rc.err = e.code().intValue();
+            if (digestFromLoadedSnapshot != null) {
+                compareSnapshotDigests(rc.zxid);
+            } else {
+                // only start recording digest when we're not in fuzzy state
+                logZxidDigest(rc.zxid, getTreeDigest());
             }
-        } else if (rc.err != Code.OK.intValue()) {
-            LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +
-                  " : error: " + rc.err);
         }
+
         return rc;
     }
 
@@ -1163,7 +1203,9 @@ public class DataTree {
             return;
         }
         synchronized (node) {
+            nodes.preChange(statPath, node);
             node.data = strack.toString().getBytes();
+            nodes.postChange(statPath, node);
         }
     }
 
@@ -1317,7 +1359,9 @@ public class DataTree {
             }
             path = ia.readString("path");
         }
-        nodes.put("/", root);
+        // have counted digest for root node with "", ignore here to avoid
+        // counting twice for root node
+        nodes.putWithoutDigest("/", root);
 
         nodeDataSize.set(approximateDataSize());
 
@@ -1492,8 +1536,10 @@ public class DataTree {
                 newCversion = node.stat.getCversion() + 1;
             }
             if (newCversion > node.stat.getCversion()) {
+                nodes.preChange(path, node);
                 node.stat.setCversion(newCversion);
                 node.stat.setPzxid(zxid);
+                nodes.postChange(path, node);
             }
         }
     }
@@ -1566,4 +1612,194 @@ public class DataTree {
         }
         ServerMetrics.getMetrics().WRITE_PER_NAMESPACE.add(namespace, 
path.length() + bytes);
     }
+
+    /**
+     * Add the digest to the historical list, and update the latest zxid 
digest.
+     */
+    private void logZxidDigest(long zxid, long digest) {
+        ZxidDigest zxidDigest = new ZxidDigest(zxid, 
DigestCalculator.DIGEST_VERSION, digest);
+        lastProcessedZxidDigest = zxidDigest;
+        if (zxidDigest.zxid % DIGEST_LOG_INTERVAL == 0) {
+            synchronized (digestLog) {
+                digestLog.add(zxidDigest);
+                if (digestLog.size() > DIGEST_LOG_LIMIT) {
+                    digestLog.poll();
+                }
+            }
+        }
+    }
+
+    /**
+     * Serializing the digest to snapshot, this is done after the data tree 
+     * is being serialized, so when we replay the txns and it hits this zxid 
+     * we know we should be in a non-fuzzy state, and have the same digest. 
+     *
+     * @param oa the output stream to write to 
+     * @return true if the digest is serialized successfully
+     */
+    public boolean serializeZxidDigest(OutputArchive oa) throws IOException {
+        if (!DigestCalculator.digestEnabled()) {
+            return false;
+        }
+
+        ZxidDigest zxidDigest = lastProcessedZxidDigest;
+        if (zxidDigest == null) {
+            // write an empty digest
+            zxidDigest = new ZxidDigest();
+        }
+        zxidDigest.serialize(oa);
+        return true;
+    }
+
+    /**
+     * Deserializing the zxid digest from the input stream and update the 
+     * digestFromLoadedSnapshot.
+     *
+     * @param ia the input stream to read from
+     * @return true if the digest was deserialized successfully
+     */
+    public boolean deserializeZxidDigest(InputArchive ia) throws IOException {
+        if (!DigestCalculator.digestEnabled()) {
+            return false;
+        }
+
+        try  {
+            ZxidDigest zxidDigest = new ZxidDigest();
+            zxidDigest.deserialize(ia);
+            if (zxidDigest.zxid > 0) {
+                digestFromLoadedSnapshot = zxidDigest;
+            }
+            return true;
+        } catch (EOFException e) {
+            LOG.warn("Got EOF exception while reading the digest, " +
+                    "likely due to the reading an older snapshot.");
+            return false;
+        }
+    }
+
+    /**
+     * Compares the actual tree's digest with that in the snapshot. 
+     * Resets digestFromLoadedSnapshot after comparison.
+     *
+     * @param zxid zxid
+     */
+    public void compareSnapshotDigests(long zxid) {
+        if (zxid == digestFromLoadedSnapshot.zxid) {
+            if (DigestCalculator.DIGEST_VERSION != 
digestFromLoadedSnapshot.digestVersion) {
+                LOG.info("Digest version changed, local: {}, new: {}, " + 
+                        "skip comparing digest now.", 
+                        digestFromLoadedSnapshot.digestVersion, 
DigestCalculator.DIGEST_VERSION);
+                digestFromLoadedSnapshot = null;
+                return;
+            }
+            if (getTreeDigest() != digestFromLoadedSnapshot.getDigest()) {
+                reportDigestMismatch(zxid);
+            }
+            digestFromLoadedSnapshot = null;
+        } else if (digestFromLoadedSnapshot.zxid != 0 && zxid > 
digestFromLoadedSnapshot.zxid) {
+            LOG.error("Watching for zxid 0x{} during snapshot recovery, " +
+                    "but it wasn't found.", 
+                    Long.toHexString(digestFromLoadedSnapshot.zxid));
+        }
+    }
+
+    /**
+     * Reports any mismatch in the transaction digest.
+     * @param zxid zxid for which the error is being reported.
+     */
+    public void reportDigestMismatch(long zxid) {
+        ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT.add(1);
+        RATE_LOGGER.rateLimitLog("Digests are not matching. Value is Zxid.", 
+                String.valueOf(zxid));
+
+        for (DigestWatcher watcher: digestWatchers) {
+            watcher.process(zxid);
+        }
+    }
+
+    public long getTreeDigest() {
+        return nodes.getDigest();
+    }
+
+    public ZxidDigest getLastProcessedZxidDigest() {
+        return lastProcessedZxidDigest;
+    }
+
+    public ZxidDigest getDigestFromLoadedSnapshot() {
+        return digestFromLoadedSnapshot;
+    }
+
+    /**
+     * Add digest mismatch event handler.
+     *
+     * @param digestWatcher the handler to add
+     */
+    public void addDigestWatcher(DigestWatcher digestWatcher) {
+        digestWatchers.add(digestWatcher);
+    }
+
+    /**
+     * Return all the digests in the historical digest list.
+     */
+    public List<ZxidDigest> getDigestLog() {
+        synchronized (digestLog) {
+            // Return a copy of current digest log
+            return new LinkedList<ZxidDigest>(digestLog);
+        }
+    }
+
+    /**
+     * A helper class to maintain the digest meta associated with specific
+     * zxid.
+     */
+    public static class ZxidDigest {
+
+        long zxid;
+        // the digest value associated with this zxid
+        long digest;
+        // the version when the digest was calculated
+        int digestVersion;
+
+        ZxidDigest() {
+            this(0, DigestCalculator.DIGEST_VERSION, 0); 
+        }
+
+        ZxidDigest(long zxid, int digestVersion, long digest) {
+            this.zxid = zxid;
+            this.digestVersion = digestVersion;
+            this.digest = digest;
+        }
+
+        public void serialize(OutputArchive oa) throws IOException {
+            oa.writeLong(zxid, "zxid");
+            oa.writeInt(digestVersion, "digestVersion");
+            oa.writeLong(digest, "digest");
+        }
+
+        public void deserialize(InputArchive ia) throws IOException {
+            zxid = ia.readLong("zxid");
+            digestVersion = ia.readInt("digestVersion");
+            // the old version is using hex string as the digest
+            if (digestVersion < 2) {
+                String d = ia.readString("digest");
+                if (d != null) {
+                    digest = Long.parseLong(d);
+                }
+            } else {
+                digest = ia.readLong("digest");
+            }
+        }
+
+        public long getZxid() {
+            return zxid;
+        }
+
+        public int getDigestVersion() {
+            return digestVersion;
+        } 
+
+        public Long getDigest() {
+            return digest;
+        }
+    }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java
new file mode 100644
index 0000000..702fac3
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMap.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The interface defined to manage the hash based on the entries in the 
+ * nodes map.
+ */
+public interface NodeHashMap {
+
+    /**
+     * Add the node into the map and update the digest with the new node.
+     *
+     * @param path the path of the node
+     * @param node the actual node associated with this path
+     */
+    public DataNode put(String path, DataNode node);
+
+    /**
+     * Add the node into the map without updating the digest.
+     *
+     * @param path the path of the node
+     * @param node the actual node associated with this path
+     */
+    public DataNode putWithoutDigest(String path, DataNode node);
+
+    /**
+     * Return the data node associated with the path.
+     *
+     * @param path the path to read from 
+     */
+    public DataNode get(String path);
+
+    /**
+     * Remove the path from the internal nodes map.
+     *
+     * @param path the path to remove
+     * @return the node being removed
+     */
+    public DataNode remove(String path);
+
+    /**
+     * Return a view of all the keys inside this map.
+     */
+    public ConcurrentHashMap.KeySetView<String, DataNode> keySet();
+
+    /**
+     * Return all the entries inside this map.
+     */
+    public Set<Map.Entry<String, DataNode>> entrySet();
+
+    /**
+     * Clear all the items stored inside this map.
+     */
+    public void clear();
+
+    /**
+     * Return the size of the nodes stored in this map.
+     */
+    public int size();
+
+    /**
+     * Called before we made the change on the node, which will clear
+     * the digest associated with it.
+     *
+     * @param path the path being changed
+     * @param node the node associated with the path
+     */
+    public void preChange(String path, DataNode node);
+
+    /**
+     * Called after making the changes on the node, which will update
+     * the digest.
+     *
+     * @param path the path being changed
+     * @param node the node associated with the path
+     */
+    public void postChange(String path, DataNode node);
+
+    /**
+     * Return the digest value.
+     */
+    public long getDigest();
+}
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
new file mode 100644
index 0000000..48844b9
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NodeHashMapImpl.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.server.util.DigestCalculator;
+import org.apache.zookeeper.server.util.AdHash;
+
+/**
+ * a simple wrapper to ConcurrentHashMap that recalculates a digest after
+ * each mutation.
+ */
+public class NodeHashMapImpl implements NodeHashMap {
+
+    private final ConcurrentHashMap<String, DataNode> nodes = 
+            new ConcurrentHashMap<String, DataNode>();
+
+    private AdHash hash = new AdHash();
+
+    @Override
+    public DataNode put(String path, DataNode node) {
+        DataNode oldNode = nodes.put(path, node);
+        addDigest(path, node);
+        if (oldNode != null) {
+            removeDigest(path, oldNode);
+        }
+        return oldNode;
+    }
+
+    @Override
+    public DataNode putWithoutDigest(String path, DataNode node) {
+        return nodes.put(path, node);
+    }
+
+    @Override
+    public DataNode get(String path) {
+        return nodes.get(path);
+    }
+
+    @Override
+    public DataNode remove(String path) {
+        DataNode oldNode = nodes.remove(path);
+        if (oldNode != null) {
+            removeDigest(path, oldNode);
+        }
+        return oldNode;
+    }
+
+    @Override
+    public ConcurrentHashMap.KeySetView<String, DataNode> keySet() {
+        return nodes.keySet();
+    }
+
+    @Override
+    public Set<Map.Entry<String, DataNode>> entrySet() {
+        return nodes.entrySet();
+    }
+
+    @Override
+    public void clear() {
+        nodes.clear();
+        hash = new AdHash();
+    }
+
+    @Override
+    public int size() {
+        return nodes.size();
+    }
+
+    @Override
+    public void preChange(String path, DataNode node) {
+        removeDigest(path, node);
+    }
+
+    @Override
+    public void postChange(String path, DataNode node) {
+        // we just made a change, so make sure the digest is
+        // invalidated
+        node.digestCached = false;
+        addDigest(path, node);
+    }
+
+    private void addDigest(String path, DataNode node) {
+        if (DigestCalculator.digestEnabled()) {
+            hash.addDigest(DigestCalculator.calculateDigest(path, node));
+        }
+    }
+
+    private void removeDigest(String path, DataNode node) {
+        if (DigestCalculator.digestEnabled()) {
+            hash.removeDigest(DigestCalculator.calculateDigest(path, node));
+        }
+    }
+
+    @Override
+    public long getDigest() {
+        return hash.getHash();
+    }
+}
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RateLogger.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RateLogger.java
index acbd522..7f33902 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RateLogger.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RateLogger.java
@@ -21,42 +21,68 @@ package org.apache.zookeeper.server;
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 
+/**
+ * This logs the message once in the beginning and once every LOG_INTERVAL.
+ */
 public class RateLogger {
+    private final long LOG_INTERVAL; // Duration is in ms
+
     public RateLogger(Logger log) {
+        this(log, 100);
+    }
+
+    public RateLogger(Logger log, long interval) {
         LOG = log;
+        LOG_INTERVAL = interval;
     }
 
     private final Logger LOG;
     private String msg = null;
     private long timestamp;
     private int count = 0;
+    private String value = null;
 
     public void flush() {
-        if (msg != null) {
+        if (msg != null && count > 0) {
+            String log = "";
             if (count > 1) {
-                LOG.warn("[" + count + " times] " + msg);
-            } else if (count == 1) {
-                LOG.warn(msg);
+                log = "[" + count + " times] ";
+            }
+            log += "Message: " + msg;
+            if (value != null) {
+                log += " Last value:" + value;
             }
+            LOG.warn(log);
         }
         msg = null;
+        value = null;
         count = 0;
     }
 
     public void rateLimitLog(String newMsg) {
+        rateLimitLog(newMsg, null);
+    }
+
+    /**
+     * In addition to the message, it also takes a value.
+     */
+    public void rateLimitLog(String newMsg, String value) {
         long now = Time.currentElapsedTime();
         if (newMsg.equals(msg)) {
             ++count;
-            if (now - timestamp >= 100) {
+            this.value = value;
+            if (now - timestamp >= LOG_INTERVAL) {
                 flush();
                 msg = newMsg;
                 timestamp = now;
+                this.value = value;
             }
         } else {
             flush();
             msg = newMsg;
+            this.value = value;
             timestamp = now;
-            LOG.warn(msg);
+            LOG.warn("Message:{} Value:{}", msg, value);
         }
     }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 9573a4a..17a559f 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -227,6 +227,8 @@ public final class ServerMetrics {
         REQUEST_THROTTLE_WAIT_COUNT = 
metricsContext.getCounter("request_throttle_wait_count");
 
         NETTY_QUEUED_BUFFER = 
metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC);
+
+        DIGEST_MISMATCHES_COUNT = 
metricsContext.getCounter("digest_mismatches_count");
     }
 
     /**
@@ -428,6 +430,10 @@ public final class ServerMetrics {
 
     public final Summary NETTY_QUEUED_BUFFER;
 
+    // Total number of digest mismatches that are observed when applying 
+    // txns to data tree.
+    public final Counter DIGEST_MISMATCHES_COUNT;
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f818fa1..60ab655 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -194,6 +194,7 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
             justification = "Internally the throttler has a BlockingQueue so " 
+
                     "once the throttler is created and started, it is 
thread-safe")
     private RequestThrottler requestThrottler;
+    public static final String SNAP_COUNT = "zookeeper.snapCount";
 
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
@@ -1010,7 +1011,7 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
     }
 
     public static int getSnapCount() {
-        String sc = System.getProperty("zookeeper.snapCount");
+        String sc = System.getProperty(SNAP_COUNT);
         try {
             int snapCount = Integer.parseInt(sc);
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index dede57e..725a476 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -128,6 +128,7 @@ public class Commands {
         registerCommand(new EnvCommand());
         registerCommand(new GetTraceMaskCommand());
         registerCommand(new InitialConfigurationCommand());
+        registerCommand(new DigestCommand());
         registerCommand(new IsroCommand());
         registerCommand(new LastSnapshotCommand());
         registerCommand(new LeaderCommand());
@@ -269,6 +270,22 @@ public class Commands {
     }
 
     /**
+     * Digest histories for every specific number of txns.
+     */
+    public static class DigestCommand extends CommandBase {
+        public DigestCommand() {
+            super(Arrays.asList("hash"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, 
String> kwargs) {
+            CommandResponse response = initializeResponse();
+            response.put("digests", 
zkServer.getZKDatabase().getDataTree().getDigestLog());
+            return response;
+        }
+    }
+
+    /**
      * The current trace mask. Returned map contains:
      *   - "tracemask": Long
      */
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/CommandExecutor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/CommandExecutor.java
index 52eeda2..a78751b 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/CommandExecutor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/CommandExecutor.java
@@ -74,7 +74,10 @@ public class CommandExecutor {
             command = new MonitorCommand(pwriter, serverCnxn);
         } else if (commandCode == FourLetterCommands.isroCmd) {
             command = new IsroCommand(pwriter, serverCnxn);
+        } else if (commandCode == FourLetterCommands.hashCmd) {
+            command = new DigestCommand(pwriter, serverCnxn);
         }
+
         return command;
     }
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/DigestCommand.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/DigestCommand.java
new file mode 100644
index 0000000..fe16bc5
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/DigestCommand.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.command;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.zookeeper.server.DataTree.ZxidDigest;
+import org.apache.zookeeper.server.ServerCnxn;
+
+/**
+ * Command used to dump the latest digest histories.
+ */
+public class DigestCommand extends AbstractFourLetterCommand {
+
+    public DigestCommand(PrintWriter pw, ServerCnxn serverCnxn) {
+        super(pw, serverCnxn);
+    }
+
+    @Override
+    public void commandRun() {
+        if (!isZKServerRunning()) {
+            pw.print(ZK_NOT_SERVING);
+        } else {
+            List<ZxidDigest> digestLog = 
+                    zkServer.getZKDatabase().getDataTree().getDigestLog();
+            for (ZxidDigest zd : digestLog) {
+                pw.println(Long.toHexString(zd.getZxid()) + ": " + 
+                        zd.getDigest());
+            }
+        }
+    }
+}
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/FourLetterCommands.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/FourLetterCommands.java
index 45bd942..04cbb60 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/FourLetterCommands.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/command/FourLetterCommands.java
@@ -151,6 +151,13 @@ public class FourLetterCommands {
             .getInt();
 
     /*
+     * See <a 
href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+     * Zk Admin</a>. this link is for all the commands.
+     */
+    protected final static int hashCmd =
+        ByteBuffer.wrap("hash".getBytes()).getInt();
+
+    /*
      * The control sequence sent by the telnet program when it closes a
      * connection. Include simply to keep the logs cleaner (the server would
      * close the connection anyway because it would parse this as a negative
@@ -256,5 +263,6 @@ public class FourLetterCommands {
         cmd2String.put(mntrCmd, "mntr");
         cmd2String.put(isroCmd, "isro");
         cmd2String.put(telnetCloseCmd, "telnet close");
+        cmd2String.put(hashCmd, "hash");
     }
 }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index fc6af85..9736d25 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -90,6 +90,10 @@ public class FileSnap implements SnapShot {
                 InputArchive ia = BinaryInputArchive.getArchive(snapIS);
                 deserialize(dt, sessions, ia);
                 SnapStream.checkSealIntegrity(snapIS, ia);
+                if (dt.deserializeZxidDigest(ia)) {
+                    SnapStream.checkSealIntegrity(snapIS, ia);
+                }
+
                 foundValid = true;
                 break;
             } catch (IOException e) {
@@ -101,6 +105,12 @@ public class FileSnap implements SnapShot {
         }
         dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), 
SNAPSHOT_FILE_PREFIX);
         lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, 
snap.lastModified() / 1000);
+
+        // compare the digest if this is not a fuzzy snapshot, we want to
+        // compare and find inconsistencies asap.
+        if (dt.getDigestFromLoadedSnapshot() != null) {
+            dt.compareSnapshotDigests(dt.lastProcessedZxid);
+        }
         return dt.lastProcessedZxid;
     }
 
@@ -226,6 +236,17 @@ public class FileSnap implements SnapShot {
                 FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                 serialize(dt, sessions, oa, header);
                 SnapStream.sealStream(snapOS, oa);
+
+                // Digest feature was added after the CRC to make it backward
+                // compatible: the older code can still read snapshots which
+                // include the digest.
+                //
+                // To verify the digest is intact, a second CRC check is
+                // written after the digest.
+                if (dt.serializeZxidDigest(oa)) {
+                    SnapStream.sealStream(snapOS, oa);
+                }
+
                 lastSnapshotInfo = new SnapshotInfo(
                         Util.getZxidFromName(snapShot.getName(), 
SNAPSHOT_FILE_PREFIX),
                         snapShot.lastModified() / 1000);
@@ -233,6 +254,24 @@ public class FileSnap implements SnapShot {
         }
     }
 
+    private void writeChecksum(CheckedOutputStream crcOut, OutputArchive oa) 
+            throws IOException {
+        long val = crcOut.getChecksum().getValue();
+        oa.writeLong(val, "val");
+        oa.writeString("/", "path");
+    }
+
+    private void checkChecksum(CheckedInputStream crcIn, InputArchive ia)
+            throws IOException {
+        long checkSum = crcIn.getChecksum().getValue();
+        long val = ia.readLong("val");
+        // read and ignore "/" written by writeChecksum
+        ia.readString("path");
+        if (val != checkSum) {
+            throw new IOException("CRC corruption");
+        }
+    }
+
     /**
      * synchronized close just so that if serialize is in place
      * the close operation will block and will wait till serialize
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index 26fe4f2..6302ca9 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -257,7 +257,20 @@ public class FileTxnSnapLog {
                 return -1L;
             }
         }
-        return fastForwardFromEdits(dt, sessions, listener);
+
+        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
+        // The snapshotZxidDigest will be reset after replaying the txn of
+        // the zxid in the snapshotZxidDigest. If it's not reset to null after
+        // restoring, it means either there are not enough txns to cover that
+        // zxid or that txn is missing
+        DataTree.ZxidDigest snapshotZxidDigest = 
dt.getDigestFromLoadedSnapshot();
+        if (snapshotZxidDigest != null) {
+            LOG.warn("Highest txn zxid 0x{} is not covering the snapshot " +
+                    "digest zxid 0x{}, which might lead to inconsistent state",
+                    Long.toHexString(highestZxid),
+                    Long.toHexString(snapshotZxidDigest.getZxid()));
+        }
+        return highestZxid;
     }
 
     /**
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
index 14a1cf6..b07131e 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
@@ -164,6 +164,7 @@ public class SnapStream {
             throws IOException {
         long checkSum = is.getChecksum().getValue();
         long val = ia.readLong("val");
+        ia.readString("path");  // Read and ignore "/" written by SealStream.
         if (val != checkSum) {
             throw new IOException("CRC corruption");
         }
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java
new file mode 100644
index 0000000..66ee58a
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/AdHash.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+/**
+ * This incremental hash is used to keep track of the hash of
+ * the data tree so that we can quickly validate that things
+ * are in sync.
+ *
+ * See the excellent paper: A New Paradigm for collision-free hashing:
+ *   Incrementality at reduced cost,  M. Bellare and D. Micciancio
+ */
+public class AdHash {
+
+    /* we use 64 bits so that we can be fast and efficient */
+    private long hash;
+
+    /**
+     * Add new digest to the hash value maintained in this class.
+     *
+     * @param digest the value to add on
+     * @return the AdHash itself for chained operations
+     */
+    public AdHash addDigest(long digest) {
+        hash += digest;
+        return this;
+    }
+
+    /**
+     * Remove the digest from the hash value.
+     * 
+     * @param digest the value to remove
+     * @return the AdHash itself for chained operations
+     */
+    public AdHash removeDigest(long digest) {
+        hash -= digest;
+        return this;
+    }
+
+    /**
+     * Return hex string of the hash value.
+     */
+    public String toHexString() {
+        return Long.toHexString(hash);
+    }
+
+    /**
+     * Return the long value of the hash.
+     */
+    public long getHash() {
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof AdHash && ((AdHash) other).hash == this.hash;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(hash);
+    }
+
+    @Override
+    public String toString() {
+        return toHexString();
+    }
+}
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/DigestCalculator.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/DigestCalculator.java
new file mode 100644
index 0000000..81e6010
--- /dev/null
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/DigestCalculator.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.DataNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Defines how to calculate the digest for a given node.
+ */
+public class DigestCalculator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DigestCalculator.class);
+
+    // The hardcoded digest version, should bump up this version whenever
+    // we change the digest method or fields.
+    //
+    // Defined as Integer so that it can be changed in tests via reflection
+    public static final Integer DIGEST_VERSION = 2;
+
+    public static final String ZOOKEEPER_DIGEST_ENABLED = 
"zookeeper.digest.enabled";
+    private static boolean digestEnabled;
+
+    static {
+        digestEnabled = Boolean.parseBoolean(
+                System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
+        LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
+    }
+
+    /**
+     * Calculate the digest based on the given params.
+     *
+     * Besides the path and data, the following stat fields are included in
+     * the digest calculation:
+     *
+     * - long czxid    8 bytes
+     * - long mzxid    8 bytes
+     * - long pzxid    8 bytes
+     * - long ctime    8 bytes
+     * - long mtime    8 bytes
+     * - int version   4 bytes
+     * - int cversion  4 bytes
+     * - int aversion  4 bytes
+     * - long ephemeralOwner 8 bytes
+     *
+     * @param path the path of the node
+     * @param data the data of the node
+     * @param stat the stat associated with the node
+     * @return the digest calculated from the given params
+     */
+    public static long calculateDigest(String path, byte[] data, StatPersisted 
stat) {
+
+        if (!digestEnabled()) {
+            return 0;
+        }
+
+        // Quota nodes are updated locally, which caused inconsistency
+        // issues when we first tried to release the digest feature.
+        //
+        // Instead of taking time to fix that, we decided to disable the
+        // digest check for all the nodes under /zookeeper/ first.
+        //
+        // We can enable this after fixing that inconsistency problem. The
+        // digest version in the protocol enables us to change the digest
+        // calculation without disrupting the system.
+        if (path.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) {
+            return 0;
+        }
+
+        // "" and "/" are aliases of each other: in DataTree, adding a child
+        // under "/" uses "" as the path, but setting data or changing the
+        // ACL on "/" uses "/" as the path. Always map "/" to "" to avoid
+        // a mismatch.
+        if (path.equals("/")) {
+            path = "";
+        }
+
+        // total = 8 * 6 + 4 * 3 = 60 bytes
+        byte b[] = new byte[60];
+        ByteBuffer bb = ByteBuffer.wrap(b);
+        bb.putLong(stat.getCzxid());
+        bb.putLong(stat.getMzxid());
+        bb.putLong(stat.getPzxid());
+        bb.putLong(stat.getCtime());
+        bb.putLong(stat.getMtime());
+        bb.putInt(stat.getVersion());
+        bb.putInt(stat.getCversion());
+        bb.putInt(stat.getAversion());
+        bb.putLong(stat.getEphemeralOwner());
+
+        CRC32 crc = new CRC32();
+        crc.update(path.getBytes());
+        if (data != null) {
+            crc.update(data);
+        }
+        crc.update(b);
+        return crc.getValue();
+    }
+
+    /**
+     * Calculate the digest based on the given path and data node.
+     */
+    public static long calculateDigest(String path, DataNode node) {
+        if (!node.isDigestCached()) {
+            node.setDigest(calculateDigest(path, node.getData(), node.stat));
+            node.setDigestCached(true);
+        }
+        return node.getDigest();
+    }
+
+    /**
+     * Return true if the digest is enabled.
+     */
+    public static boolean digestEnabled() {
+        return digestEnabled;
+    }
+
+    // Visible for test purpose
+    public static void setDigestEnabled(boolean enabled) {
+        digestEnabled = enabled;
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
index 931fdac..d26db23 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java
@@ -28,6 +28,9 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.util.DigestCalculator;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -143,17 +146,25 @@ public class DataTreeTest extends ZKTestCase {
      */
     @Test(timeout = 60000)
     public void testIncrementCversion() throws Exception {
-        dt.createNode("/test", new byte[0], null, 0, 
dt.getNode("/").stat.getCversion()+1, 1, 1);
-        DataNode zk = dt.getNode("/test");
-        int prevCversion = zk.stat.getCversion();
-        long prevPzxid = zk.stat.getPzxid();
-        dt.setCversionPzxid("/test/",  prevCversion + 1, prevPzxid + 1);
-        int newCversion = zk.stat.getCversion();
-        long newPzxid = zk.stat.getPzxid();
-        Assert.assertTrue("<cversion, pzxid> verification failed. Expected: <" 
+
-                (prevCversion + 1) + ", " + (prevPzxid + 1) + ">, found: <" +
-                newCversion + ", " + newPzxid + ">",
-                (newCversion == prevCversion + 1 && newPzxid == prevPzxid + 
1));
+        try {
+            DigestCalculator.setDigestEnabled(true);
+            DataTree dt = new DataTree();
+            dt.createNode("/test", new byte[0], null, 0, 
dt.getNode("/").stat.getCversion()+1, 1, 1);
+            DataNode zk = dt.getNode("/test");
+            int prevCversion = zk.stat.getCversion();
+            long prevPzxid = zk.stat.getPzxid();
+            long digestBefore = dt.getTreeDigest();
+            dt.setCversionPzxid("/test/",  prevCversion + 1, prevPzxid + 1);
+            int newCversion = zk.stat.getCversion();
+            long newPzxid = zk.stat.getPzxid();
+            Assert.assertTrue("<cversion, pzxid> verification failed. 
Expected: <" +
+                    (prevCversion + 1) + ", " + (prevPzxid + 1) + ">, found: 
<" +
+                    newCversion + ", " + newPzxid + ">",
+                    (newCversion == prevCversion + 1 && newPzxid == prevPzxid 
+ 1));
+            Assert.assertNotEquals(digestBefore, dt.getTreeDigest());
+        } finally {
+            DigestCalculator.setDigestEnabled(false);
+        }
     }
 
     @Test
@@ -197,6 +208,26 @@ public class DataTreeTest extends ZKTestCase {
         Assert.assertEquals(currentPzxid, prevPzxid);
     }
 
+    @Test
+    public void testDigestUpdatedWhenReplayCreateTxnForExistNode() {
+        try {
+            DigestCalculator.setDigestEnabled(true);
+            dt.processTxn(new TxnHeader(13, 1000, 1, 30, 
ZooDefs.OpCode.create),
+                    new CreateTxn("/foo", "".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+
+            // create the same node with a higher cversion to simulate the
+            // scenario when replaying a create txn for an existing node due
+            // to fuzzy snapshot
+            dt.processTxn(new TxnHeader(13, 1000, 1, 30, 
ZooDefs.OpCode.create),
+                    new CreateTxn("/foo", "".getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 2));
+
+            // check the current digest value
+            Assert.assertEquals(dt.getTreeDigest(), 
dt.getLastProcessedZxidDigest().digest);
+        } finally {
+            DigestCalculator.setDigestEnabled(false);
+        }
+    }
+
     @Test(timeout = 60000)
     public void testPathTrieClearOnDeserialize() throws Exception {
 
@@ -419,4 +450,47 @@ public class DataTreeTest extends ZKTestCase {
         Assert.assertEquals(readBytes2, values.get("sum_" + TOP2+ 
"_read_per_namespace"));
         Assert.assertEquals(1L, values.get("cnt_" + TOP2 + 
"_read_per_namespace"));
     }
+
+    /**
+     * Test digest with general ops in DataTree, check that the digest is
+     * updated when calling different ops.
+     */
+    @Test
+    public void testDigest() throws Exception {
+        try {
+            // enable digest check
+            DigestCalculator.setDigestEnabled(true);
+
+            DataTree dt = new DataTree();
+
+            // create a node and check the digest is updated
+            long previousDigest = dt.getTreeDigest();
+            dt.createNode("/digesttest", new byte[0], null, -1, 1, 1, 1);
+            Assert.assertNotEquals(dt.getTreeDigest(), previousDigest);
+
+            // create a child and check the digest is updated
+            previousDigest = dt.getTreeDigest();
+            dt.createNode("/digesttest/1", "1".getBytes(), null, -1, 2, 2, 2);
+            Assert.assertNotEquals(dt.getTreeDigest(), previousDigest);
+            
+            // check the digest is not changed when creating the same node
+            previousDigest = dt.getTreeDigest();
+            try {
+                dt.createNode("/digesttest/1", "1".getBytes(), null, -1, 2, 2, 
2);
+            } catch (NodeExistsException e) { /* ignore */ }
+            Assert.assertEquals(dt.getTreeDigest(), previousDigest);
+
+            // check digest with updated data 
+            previousDigest = dt.getTreeDigest();
+            dt.setData("/digesttest/1", "2".getBytes(), 3, 3, 3);
+            Assert.assertNotEquals(dt.getTreeDigest(), previousDigest);
+
+            // check digest with deleted node
+            previousDigest = dt.getTreeDigest();
+            dt.deleteNode("/digesttest/1", 5);
+            Assert.assertNotEquals(dt.getTreeDigest(), previousDigest);
+        } finally {
+            DigestCalculator.setDigestEnabled(false);
+        }
+    }
 }
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NodeHashMapImplTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NodeHashMapImplTest.java
new file mode 100644
index 0000000..4761c00
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NodeHashMapImplTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.util.DigestCalculator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NodeHashMapImplTest extends ZKTestCase {
+
+    @Before
+    public void setUp() {
+        DigestCalculator.setDigestEnabled(true);
+    }
+
+    @After
+    public void tearDown() {
+        DigestCalculator.setDigestEnabled(false);
+    }
+
+    /**
+     * Test all the operations supported in NodeHashMapImpl.
+     */
+    @Test
+    public void testOperations() {
+        NodeHashMapImpl nodes = new NodeHashMapImpl();
+
+        Assert.assertEquals(0, nodes.size());
+        Assert.assertEquals(0L, nodes.getDigest());
+
+        // add a new node
+        String p1 = "p1";
+        DataNode n1 = new DataNode(p1.getBytes(), 0L, new StatPersisted());
+        nodes.put(p1, n1);
+
+        Assert.assertEquals(n1, nodes.get(p1));
+        Assert.assertNotEquals(0L, nodes.getDigest());
+        Assert.assertEquals(1, nodes.size());
+
+        // put another node
+        String p2 = "p2";
+        nodes.put(p2, new DataNode(p2.getBytes(), 0L, new StatPersisted()));
+
+        Set<Map.Entry<String, DataNode>> entries = nodes.entrySet();
+        Assert.assertEquals(2, entries.size());
+
+        // remove a node
+        nodes.remove(p1);
+        Assert.assertEquals(1, nodes.size());
+
+        nodes.remove(p2);
+        Assert.assertEquals(0, nodes.size());
+        Assert.assertEquals(0L, nodes.getDigest());
+
+        // test preChange and postChange
+        String p3 = "p3";
+        DataNode n3 = new DataNode(p3.getBytes(), 0L, new StatPersisted());
+        nodes.put(p3, n3);
+        long preChangeDigest = nodes.getDigest();
+        Assert.assertNotEquals(0L, preChangeDigest);
+
+        nodes.preChange(p3, n3);
+        Assert.assertEquals(0L, nodes.getDigest());
+
+        n3.stat.setMzxid(1);
+        n3.stat.setMtime(1);
+        n3.stat.setVersion(1);
+        nodes.postChange(p3, n3);
+
+        long postChangeDigest = nodes.getDigest();
+        Assert.assertNotEquals(0, postChangeDigest);
+        Assert.assertNotEquals(preChangeDigest, postChangeDigest);
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
new file mode 100644
index 0000000..f74eb03
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SnapshotDigestTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.metric.SimpleCounter;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerMainTest;
+import org.apache.zookeeper.server.util.DigestCalculator;
+import org.apache.zookeeper.test.ClientBase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Restarts the test server with digest calculation enabled and checks that
+ * reloading the snapshot and txn log from disk reports no digest mismatch.
+ */
+public class SnapshotDigestTest extends ClientBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+            SnapshotDigestTest.class);
+
+    private ZooKeeper zk;
+    private ZooKeeperServer server;
+
+    /**
+     * Starts the server through the base class, then remembers the server
+     * instance and opens a client against it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        server = serverFactory.getZooKeeperServer();
+        zk = createClient();
+    }
+
+    /**
+     * Runs the base class tear down and then closes the client.
+     */
+    @After
+    public void tearDown() throws Exception {
+        // server will be closed in super.tearDown
+        super.tearDown();
+
+        if (zk != null) {
+            zk.close();
+        }
+    }
+
+    /**
+     * Enables digest calculation and lowers the snap count to 100 for the
+     * tests in this class.
+     */
+    @Override
+    public void setupCustomizedEnv() {
+        DigestCalculator.setDigestEnabled(true);
+        System.setProperty(ZooKeeperServer.SNAP_COUNT, "100");
+    }
+
+    /**
+     * Reverts what {@link #setupCustomizedEnv()} changed.
+     */
+    @Override
+    public void cleanUpCustomizedEnv() {
+        DigestCalculator.setDigestEnabled(false);
+        System.clearProperty(ZooKeeperServer.SNAP_COUNT);
+    }
+
+    /**
+     * Check snapshot digests when loading a fuzzy or non-fuzzy snapshot.
+     */
+    @Test
+    public void testSnapshotDigest() throws Exception {
+        // take an empty snapshot without creating any txn and make sure
+        // there is no digest mismatch issue
+        server.takeSnapshot();
+        reloadSnapshotAndCheckDigest();
+
+        // trigger various write requests
+        String pathPrefix = "/testSnapshotDigest";
+        for (int i = 0; i < 1000; i++) {
+            String path = pathPrefix + i;
+            zk.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+
+        // update the data of first node
+        String firstNode = pathPrefix + 0;
+        zk.setData(firstNode, "new_setdata".getBytes(), -1);
+
+        // delete the first node
+        zk.delete(firstNode, -1);
+
+        // trigger multi op
+        List<Op> subTxns = new ArrayList<Op>();
+        for (int i = 0; i < 3; i++) {
+            String path = pathPrefix + "-m" + i;
+            subTxns.add(Op.create(path, path.getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+        }
+        zk.multi(subTxns);
+
+        reloadSnapshotAndCheckDigest();
+
+        // Take a snapshot and test the logic when loading a non-fuzzy snapshot
+        server = serverFactory.getZooKeeperServer();
+        server.takeSnapshot();
+
+        reloadSnapshotAndCheckDigest();
+    }
+
+    /**
+     * Make sure the code will skip digest check when it's comparing
+     * digest with different version.
+     *
+     * This enables us to smoothly add new fields into digest or using
+     * new digest calculation.
+     */
+    @Test
+    public void testDifferentDigestVersion() throws Exception {
+        // check the current digest version
+        int currentVersion = DigestCalculator.DIGEST_VERSION;
+
+        // create a node
+        String path = "/testDifferentDigestVersion";
+        zk.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // take a full snapshot
+        server.takeSnapshot();
+
+        // using reflection to change the final static DIGEST_VERSION
+        int newVersion = currentVersion + 1;
+        Field field = DigestCalculator.class.getDeclaredField("DIGEST_VERSION");
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        field.set(null, newVersion);
+
+        try {
+            Assert.assertEquals(newVersion, (int) DigestCalculator.DIGEST_VERSION);
+
+            // using mock to return different digest value when the way we
+            // calculate digest changed
+            FileTxnSnapLog txnSnapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+            DataTree dataTree = Mockito.spy(new DataTree());
+            Mockito.when(dataTree.getTreeDigest()).thenReturn(0L);
+            txnSnapLog.restore(dataTree, new ConcurrentHashMap<Long, Integer>(),
+                    Mockito.mock(FileTxnSnapLog.PlayBackListener.class));
+
+            // make sure the reportDigestMismatch function is never called
+            Mockito.verify(dataTree, Mockito.never())
+                   .reportDigestMismatch(Mockito.anyLong());
+        } finally {
+            // DIGEST_VERSION is static: put the original value back so the
+            // override cannot leak into tests that run later in this JVM
+            field.set(null, currentVersion);
+        }
+    }
+
+    /**
+     * Make sure it's backward compatible, and also we can rollback this
+     * feature without corrupting the database.
+     */
+    @Test
+    public void testBackwardCompatible() throws Exception {
+        testCompatibleHelper(false, true);
+
+        testCompatibleHelper(true, false);
+    }
+
+    /**
+     * Creates a node and takes a snapshot with the digest feature in one
+     * state, restarts with it in the other state, and checks the node data
+     * is still readable.
+     *
+     * @param enabledBefore whether digest is enabled while writing
+     * @param enabledAfter whether digest is enabled for the final reload
+     */
+    private void testCompatibleHelper(
+            boolean enabledBefore, boolean enabledAfter) throws Exception {
+
+        DigestCalculator.setDigestEnabled(enabledBefore);
+
+        // restart the server to cache the option change
+        reloadSnapshotAndCheckDigest();
+
+        // create a node
+        String path = "/testCompatible" + "-" + enabledBefore + "-" + enabledAfter;
+        zk.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // take a full snapshot
+        server.takeSnapshot();
+
+        DigestCalculator.setDigestEnabled(enabledAfter);
+
+        reloadSnapshotAndCheckDigest();
+
+        Assert.assertEquals(path, new String(zk.getData(path, false, null)));
+    }
+
+    /**
+     * Stops and restarts the server, waiting for the client to observe each
+     * transition, and asserts that no digest mismatch was counted while the
+     * data was loaded again.
+     */
+    private void reloadSnapshotAndCheckDigest() throws Exception {
+        stopServer();
+        QuorumPeerMainTest.waitForOne(zk, States.CONNECTING);
+
+        // start from a clean counter so only this restart is measured
+        ((SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT).reset();
+
+        startServer();
+        QuorumPeerMainTest.waitForOne(zk, States.CONNECTED);
+
+        // Snapshot digests always match
+        Assert.assertEquals(0L, (long) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT.get());
+
+        // reset the digestFromLoadedSnapshot after comparing
+        // NOTE(review): 'server' is only assigned in setUp() and in
+        // testSnapshotDigest(); after the restart above it may still point
+        // at the previous instance - confirm this checks the intended server.
+        Assert.assertNull(server.getZKDatabase().getDataTree()
+                .getDigestFromLoadedSnapshot());
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
new file mode 100644
index 0000000..1af5f29
--- /dev/null
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/AdHashTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.ZKTestCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Checks that an AdHash built from the same digests compares equal no matter
+ * in which order they were added, and that removing digests cancels out
+ * earlier additions.
+ */
+public class AdHashTest extends ZKTestCase {
+
+    private static final Random rand = new Random();
+
+    /**
+     * Returns {@code count} random long values to be used as digests.
+     */
+    private static List<Long> generateRandomHashes(int count) {
+        List<Long> list = new ArrayList<>(count);
+
+        for (int i = 0; i < count; i++) {
+            list.add(rand.nextLong());
+        }
+        return list;
+    }
+
+    /**
+     * Adds every digest in {@code digests} to {@code hash}.
+     */
+    private static void addListOfDigests(AdHash hash, List<Long> digests) {
+        for (long b: digests) {
+            hash.addDigest(b);
+        }
+    }
+
+    /**
+     * Removes every digest in {@code digests} from {@code hash}.
+     */
+    private static void removeListOfDigests(AdHash hash, List<Long> digests) {
+        for (long b: digests) {
+            hash.removeDigest(b);
+        }
+    }
+
+    /**
+     * Test that adding and removing digests from AdHash works as expected.
+     */
+    @Test
+    public void testAdHash() throws Exception {
+        List<Long> bucket1 = generateRandomHashes(50);
+        List<Long> bucket2 = generateRandomHashes(3);
+        List<Long> bucket3 = generateRandomHashes(30);
+        List<Long> bucket4 = generateRandomHashes(10);
+        List<Long> bucket5 = generateRandomHashes(5);
+
+        // adding out of order should result in the same hash
+        AdHash hash12 = new AdHash();
+        addListOfDigests(hash12, bucket1);
+        addListOfDigests(hash12, bucket2);
+
+        AdHash hash21 = new AdHash();
+        addListOfDigests(hash21, bucket2);
+        addListOfDigests(hash21, bucket1);
+        Assert.assertEquals(hash12, hash21);
+
+        AdHash hashall = new AdHash();
+        addListOfDigests(hashall, bucket1);
+        addListOfDigests(hashall, bucket2);
+        addListOfDigests(hashall, bucket3);
+        addListOfDigests(hashall, bucket4);
+        addListOfDigests(hashall, bucket5);
+        Assert.assertFalse("digest of different set not different", hashall.equals(hash21));
+        removeListOfDigests(hashall, bucket4);
+        removeListOfDigests(hashall, bucket5);
+        addListOfDigests(hash21, bucket3);
+        Assert.assertEquals("hashall with 4 & 5 removed should match hash21 with 3 added",
+                         hashall, hash21);
+
+        removeListOfDigests(hashall, bucket3);
+        removeListOfDigests(hashall, bucket2);
+        removeListOfDigests(hashall, bucket1);
+        // expected value goes first so a failure message reads correctly
+        Assert.assertEquals("empty hashall's digest should be 0", "0", hashall.toHexString());
+
+        AdHash hash45 = new AdHash();
+        addListOfDigests(hash45, bucket4);
+        addListOfDigests(hash45, bucket5);
+
+        addListOfDigests(hashall, bucket4);
+        addListOfDigests(hashall, bucket5);
+        Assert.assertEquals("empty hashall + 4&5 should equal hash45", hashall, hash45);
+    }
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
index cc223cb..2638053 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
@@ -522,6 +522,8 @@ public abstract class ClientBase extends ZKTestCase {
 
         setupTestEnv();
 
+        setupCustomizedEnv();
+
         JMXEnv.setUp();
 
         setUpAll();
@@ -537,6 +539,11 @@ public abstract class ClientBase extends ZKTestCase {
         startServer(1);
     }
 
+    /**
+     * Give it a chance to set up customized env before starting the server.
+     */
+    public void setupCustomizedEnv() { /* do nothing by default */ }
+
     private void startServer(int serverId) throws Exception {
         LOG.info("STARTING server");
         serverFactory = createNewServerInstance(serverFactory, hostPort,
@@ -640,8 +647,12 @@ public abstract class ClientBase extends ZKTestCase {
                 //assertTrue(message, fdCount <= initialFdCount);
             }
         }
+
+        cleanUpCustomizedEnv();
     }
 
+    public void cleanUpCustomizedEnv() { /* do nothing by default */ }
+
     public static MBeanServerConnection jmxConn() throws IOException {
         return JMXEnv.conn();
     }
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java
index 4d56f60..bf0efd7 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LoadFromLogNoServerTest.java
@@ -29,6 +29,7 @@ import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.persistence.FileHeader;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.util.DigestCalculator;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.MultiTxn;
@@ -55,37 +56,52 @@ public class LoadFromLogNoServerTest extends ZKTestCase {
      */
     @Test
     public void testTxnFailure() throws Exception {
-        long count = 1;
-        File tmpDir = ClientBase.createTmpDir();
-        FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir);
-        DataTree dt = new DataTree();
-        dt.createNode("/test", new byte[0], null, 0, -1, 1, 1);
-        for (count = 1; count <= 3; count++) {
-            dt.createNode("/test/" + count, new byte[0], null, 0, -1, count,
-                    Time.currentElapsedTime());
+        try { // same scenario as before, now also asserting that the tree digest changes after every doOp
+            DigestCalculator.setDigestEnabled(true); // switched off again in the finally block below
+
+            long count = 1;
+            File tmpDir = ClientBase.createTmpDir();
+            FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir);
+            DataTree dt = new DataTree();
+            dt.createNode("/test", new byte[0], null, 0, -1, 1, 1);
+            for (count = 1; count <= 3; count++) {
+                dt.createNode("/test/" + count, new byte[0], null, 0, -1, 
count,
+                        Time.currentElapsedTime());
+            }
+            long digestBefore = dt.getTreeDigest(); // baseline; refreshed before each later doOp
+
+            DataNode zk = dt.getNode("/test");
+
+            // Make create to fail, then verify cversion.
+            LOG.info("Attempting to create " + "/test/" + (count - 1));
+            doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, 
zk, -1);
+            Assert.assertNotEquals(digestBefore, dt.getTreeDigest());
+
+            LOG.info("Attempting to create " + "/test/" + (count - 1));
+            digestBefore = dt.getTreeDigest();
+            doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, 
zk,
+                    zk.stat.getCversion() + 1);
+            Assert.assertNotEquals(digestBefore, dt.getTreeDigest());
+
+            LOG.info("Attempting to create " + "/test/" + (count - 1));
+            digestBefore = dt.getTreeDigest();
+            doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk,
+                    zk.stat.getCversion() + 1);
+            Assert.assertNotEquals(digestBefore, dt.getTreeDigest());
+
+            LOG.info("Attempting to create " + "/test/" + (count - 1));
+            digestBefore = dt.getTreeDigest();
+            doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk,
+                    -1);
+            Assert.assertNotEquals(digestBefore, dt.getTreeDigest());
+
+            // Make delete to fail, then verify cversion.
+            // this doesn't happen anymore, we only set the cversion on create
+            // LOG.info("Attempting to delete " + "/test/" + (count + 1));
+            // doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk);
+        } finally {
+            DigestCalculator.setDigestEnabled(false);
         }
-        DataNode zk = dt.getNode("/test");
-
-        // Make create to fail, then verify cversion.
-        LOG.info("Attempting to create " + "/test/" + (count - 1));
-        doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, zk, 
-1);
-
-        LOG.info("Attempting to create " + "/test/" + (count - 1));
-        doOp(logFile, ZooDefs.OpCode.create, "/test/" + (count - 1), dt, zk,
-                zk.stat.getCversion() + 1);
-
-        LOG.info("Attempting to create " + "/test/" + (count - 1));
-        doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk,
-                zk.stat.getCversion() + 1);
-
-        LOG.info("Attempting to create " + "/test/" + (count - 1));
-        doOp(logFile, ZooDefs.OpCode.multi, "/test/" + (count - 1), dt, zk,
-                -1);
-
-        // Make delete fo fail, then verify cversion.
-        // this doesn't happen anymore, we only set the cversion on create
-        // LOG.info("Attempting to delete " + "/test/" + (count + 1));
-        // doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk);
     }
 
     /*
diff --git a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml 
b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
index bc69be1..28ac468 100644
--- a/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
+++ b/zookeeper-server/src/test/resources/findbugsExcludeFile.xml
@@ -71,7 +71,7 @@
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode" />
-      <Bug code="EI2"/>
+      <Bug code="EI2, EI"/>
   </Match>
 
   <Match>

Reply via email to