Repository: zookeeper
Updated Branches:
  refs/heads/master 4ebb847bc -> fdde8b006


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java 
b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
new file mode 100644
index 0000000..83b186f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+public class WatcherOrBitSet {
+
+    private Set<Watcher> watchers;
+    private BitHashSet watcherBits;
+
+    public WatcherOrBitSet(final Set<Watcher> watchers) {
+        this.watchers = watchers;
+    }
+
+    public WatcherOrBitSet(final BitHashSet watcherBits) {
+        this.watcherBits = watcherBits;
+    }
+
+    public boolean contains(Watcher watcher) {
+        if (watchers == null) {
+            return false;
+        }
+        return watchers.contains(watcher);
+    }
+
+    public boolean contains(int watcherBit) {
+        if (watcherBits == null) {
+            return false;
+        }
+        return watcherBits.contains(watcherBit);
+    }
+
+    public int size() {
+        if (watchers != null) {
+            return watchers.size();
+        }
+        if (watcherBits != null) {
+            return watcherBits.size();
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java 
b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
new file mode 100644
index 0000000..38f02de
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of path to session IDs of sessions 
that
+ * have set a watch on that path. This class is immutable.
+ */
+public class WatchesPathReport {
+
+    private final Map<String, Set<Long>> path2Ids;
+
+    /**
+     * Creates a new report.
+     *
+     * @param path2Ids map of paths to session IDs of sessions that have set a
+     * watch on that path
+     */
+    WatchesPathReport(Map<String, Set<Long>> path2Ids) {
+        this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids));
+    }
+
+    private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) {
+        Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>();
+        for (Map.Entry<String, Set<Long>> e : m.entrySet()) {
+            m2.put(e.getKey(), new HashSet<Long>(e.getValue()));
+        }
+        return m2;
+    }
+
+    /**
+     * Checks if the given path has watches set.
+     *
+     * @param path path
+     * @return true if path has watch set
+     */
+    public boolean hasSessions(String path) {
+        return path2Ids.containsKey(path);
+    }
+    /**
+     * Gets the session IDs of sessions that have set watches on the given 
path.
+     * The returned set is immutable.
+     *
+     * @param path session ID
+     * @return session IDs of sessions that have set watches on the path, or
+     * null if none
+     */
+    public Set<Long> getSessions(String path) {
+        Set<Long> s = path2Ids.get(path);
+        return s != null ? Collections.unmodifiableSet(s) : null;
+    }
+
+    /**
+     * Converts this report to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this report.
+     *
+     * @return map representation of report
+     */
+    public Map<String, Set<Long>> toMap() {
+        return deepCopy(path2Ids);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java 
b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
new file mode 100644
index 0000000..ac888d3
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A watch report, essentially a mapping of session ID to paths that the 
session
+ * has set a watch on. This class is immutable.
+ */
+public class WatchesReport {
+
+    private final Map<Long, Set<String>> id2paths;
+
+    /**
+     * Creates a new report.
+     *
+     * @param id2paths map of session IDs to paths that each session has set
+     * a watch on
+     */
+    WatchesReport(Map<Long, Set<String>> id2paths) {
+        this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths));
+    }
+
+    private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) {
+        Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>();
+        for (Map.Entry<Long, Set<String>> e : m.entrySet()) {
+            m2.put(e.getKey(), new HashSet<String>(e.getValue()));
+        }
+        return m2;
+    }
+
+    /**
+     * Checks if the given session has watches set.
+     *
+     * @param sessionId session ID
+     * @return true if session has paths with watches set
+     */
+    public boolean hasPaths(long sessionId) {
+        return id2paths.containsKey(sessionId);
+    }
+
+    /**
+     * Gets the paths that the given session has set watches on. The returned
+     * set is immutable.
+     *
+     * @param sessionId session ID
+     * @return paths that have watches set by the session, or null if none
+     */
+    public Set<String> getPaths(long sessionId) {
+        Set<String> s = id2paths.get(sessionId);
+        return s != null ? Collections.unmodifiableSet(s) : null;
+    }
+
+    /**
+     * Converts this report to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this report.
+     *
+     * @return map representation of report
+     */
+    public Map<Long, Set<String>> toMap() {
+        return deepCopy(id2paths);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java 
b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
new file mode 100644
index 0000000..b2449ba
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A summary of watch information. This class is immutable.
+ */
+public class WatchesSummary {
+
+    /**
+     * The key in the map returned by {@link #toMap()} for the number of
+     * connections.
+     */
+    public static final String KEY_NUM_CONNECTIONS = "num_connections";
+    /**
+     * The key in the map returned by {@link #toMap()} for the number of paths.
+     */
+    public static final String KEY_NUM_PATHS = "num_paths";
+    /**
+     * The key in the map returned by {@link #toMap()} for the total number of
+     * watches.
+     */
+    public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches";
+
+    private final int numConnections;
+    private final int numPaths;
+    private final int totalWatches;
+
+    /**
+     * Creates a new summary.
+     *
+     * @param numConnections the number of sessions that have set watches
+     * @param numPaths the number of paths that have watches set on them
+     * @param totalWatches the total number of watches set
+     */
+    WatchesSummary(int numConnections, int numPaths, int totalWatches) {
+        this.numConnections = numConnections;
+        this.numPaths = numPaths;
+        this.totalWatches = totalWatches;
+    }
+
+    /**
+     * Gets the number of connections (sessions) that have set watches.
+     *
+     * @return number of connections
+     */
+    public int getNumConnections() {
+        return numConnections;
+    }
+    /**
+     * Gets the number of paths that have watches set on them.
+     *
+     * @return number of paths
+     */
+    public int getNumPaths() {
+        return numPaths;
+    }
+    /**
+     * Gets the total number of watches set.
+     *
+     * @return total watches
+     */
+    public int getTotalWatches() {
+        return totalWatches;
+    }
+
+    /**
+     * Converts this summary to a map. The returned map is mutable, and changes
+     * to it do not reflect back into this summary.
+     *
+     * @return map representation of summary
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> summary = new LinkedHashMap<String, Object>();
+        summary.put(KEY_NUM_CONNECTIONS, numConnections);
+        summary.put(KEY_NUM_PATHS, numPaths);
+        summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches);
+        return summary;
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/config/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/src/java/test/config/findbugsExcludeFile.xml 
b/src/java/test/config/findbugsExcludeFile.xml
index 4ab5a5e..a3f45a6 100644
--- a/src/java/test/config/findbugsExcludeFile.xml
+++ b/src/java/test/config/findbugsExcludeFile.xml
@@ -53,7 +53,13 @@
     <Method name="run" />
     <Bug pattern="DM_EXIT" />
   </Match>
-  
+
+   <!-- Failed to create watch manager is a unrecoverable error -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.DataTree" />
+     <Bug pattern="DM_EXIT" />
+   </Match>
+
   <Match>
     <Package name="org.apache.jute.compiler.generated" />
   </Match>
@@ -85,7 +91,7 @@
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode"/>
-      <Field name="children"/> 
+      <Field name="children"/>
       <Bug code="IS"/>
   </Match>
  <Match>
@@ -98,6 +104,15 @@
     <Field name="serverStats"/>
     <Bug code="IS"/>
   </Match>
+
+  <!-- The iterate function is non-thread safe, the caller will synchronize
+       on the BitHHashSet object -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.util.BitHashSet" />
+    <Field name="elementCount" />
+    <Bug code="IS" />
+  </Match>
+
   <Match>
      <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
        <Bug code="UrF"/>
@@ -111,7 +126,7 @@
   <!-- these are old classes just for upgrading and should go away -->
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/>
-  </Match> 
+  </Match>
 
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/>
@@ -134,6 +149,23 @@
     </Or>
   </Match>
 
+  <!-- Synchronize on the AtomicInteger to do wait/notify, but not relying
+       on the synchronization to control the AtomicInteger value update,
+       so it's not a problem -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner" />
+    <Bug code="JLM" />
+    <Method name="addDeadWatcher" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner$1" />
+    <Bug code="JLM" />
+    <Method name="doWork" />
+  </Match>
+
+
+
   <Match>
     <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
     <Bug pattern="OS_OPEN_STREAM" />

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java 
b/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
deleted file mode 100644
index c0b107d..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesPathReportTest extends ZKTestCase {
-    private Map<String, Set<Long>> m;
-    private WatchesPathReport r;
-    @Before public void setUp() {
-        m = new HashMap<String, Set<Long>>();
-        Set<Long> s = new HashSet<Long>();
-        s.add(101L);
-        s.add(102L);
-        m.put("path1", s);
-        s = new HashSet<Long>();
-        s.add(201L);
-        m.put("path2", s);
-        r = new WatchesPathReport(m);
-    }
-    @Test public void testHasSessions() {
-        assertTrue(r.hasSessions("path1"));
-        assertTrue(r.hasSessions("path2"));
-        assertFalse(r.hasSessions("path3"));
-    }
-    @Test public void testGetSessions() {
-        Set<Long> s = r.getSessions("path1");
-        assertEquals(2, s.size());
-        assertTrue(s.contains(101L));
-        assertTrue(s.contains(102L));
-        s = r.getSessions("path2");
-        assertEquals(1, s.size());
-        assertTrue(s.contains(201L));
-        assertNull(r.getSessions("path3"));
-    }
-    @Test public void testToMap() {
-        assertEquals(m, r.toMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java 
b/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
deleted file mode 100644
index 7f0343b..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesReportTest extends ZKTestCase {
-    private Map<Long, Set<String>> m;
-    private WatchesReport r;
-    @Before public void setUp() {
-        m = new HashMap<Long, Set<String>>();
-        Set<String> s = new HashSet<String>();
-        s.add("path1a");
-        s.add("path1b");
-        m.put(1L, s);
-        s = new HashSet<String>();
-        s.add("path2a");
-        m.put(2L, s);
-        r = new WatchesReport(m);
-    }
-    @Test public void testHasPaths() {
-        assertTrue(r.hasPaths(1L));
-        assertTrue(r.hasPaths(2L));
-        assertFalse(r.hasPaths(3L));
-    }
-    @Test public void testGetPaths() {
-        Set<String> s = r.getPaths(1L);
-        assertEquals(2, s.size());
-        assertTrue(s.contains("path1a"));
-        assertTrue(s.contains("path1b"));
-        s = r.getPaths(2L);
-        assertEquals(1, s.size());
-        assertTrue(s.contains("path2a"));
-        assertNull(r.getPaths(3L));
-    }
-    @Test public void testToMap() {
-        assertEquals(m, r.toMap());
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java 
b/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
deleted file mode 100644
index d679065..0000000
--- a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.server;
-
-import java.util.Map;
-import org.apache.zookeeper.ZKTestCase;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class WatchesSummaryTest extends ZKTestCase {
-    private WatchesSummary s;
-    @Before public void setUp() {
-        s = new WatchesSummary(1, 2, 3);
-    }
-    @Test public void testGetters() {
-        assertEquals(1, s.getNumConnections());
-        assertEquals(2, s.getNumPaths());
-        assertEquals(3, s.getTotalWatches());
-    }
-    @Test public void testToMap() {
-        Map<String, Object> m = s.toMap();
-        assertEquals(3, m.size());
-        assertEquals(Integer.valueOf(1), 
m.get(WatchesSummary.KEY_NUM_CONNECTIONS));
-        assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS));
-        assertEquals(Integer.valueOf(3), 
m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES));
-    }
-}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java 
b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
new file mode 100644
index 0000000..a70eaa5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitHashSetTest extends ZKTestCase {
+
+    @Test
+    public void testAddWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        Assert.assertTrue(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        // add the same bit again
+        Assert.assertFalse(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        // add another bit, make sure there there is only 1 bit cached
+        Assert.assertTrue(ws.add(2));
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        Assert.assertTrue(ws.contains(1));
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1, 2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+    }
+
+    @Test
+    public void testRemoveWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        ws.add(1);
+        ws.add(2);
+
+        Assert.assertTrue(ws.contains(1));
+        Assert.assertTrue(ws.contains(2));
+
+        ws.remove(1);
+        Assert.assertFalse(ws.contains(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.add(3);
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2, 3},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.remove(2);
+        ws.remove(3);
+
+        Assert.assertEquals(0, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java 
b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
new file mode 100644
index 0000000..eca0f2d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitMapTest extends ZKTestCase {
+
+    @Test
+    public void testAddAndRemove() {
+        BitMap<String> bitMap = new BitMap<String>();
+        String v1 = new String("v1");
+        Integer bit = bitMap.add(v1);
+
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertTrue(bit >= 0);
+        Assert.assertEquals(v1, bitMap.get(bit));
+        Assert.assertEquals(bit, bitMap.getBit(v1));
+
+        // add the same value again
+        Integer newBit = bitMap.add(v1);
+        Assert.assertEquals(bit, newBit);
+        Assert.assertEquals(1, bitMap.size());
+
+        String v2 = new String("v2");
+        Integer v2Bit = bitMap.add(v2);
+        Assert.assertEquals(2, bitMap.size());
+        Assert.assertNotEquals(v2Bit, bit);
+
+        // remove by value
+        bitMap.remove(v1);
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertNull(bitMap.get(bit));
+        Assert.assertNull(bitMap.getBit(v1));
+
+        // remove by bit
+        bitMap.remove(v2Bit);
+        Assert.assertEquals(0, bitMap.size());
+        Assert.assertNull(bitMap.get(v2Bit));
+        Assert.assertNull(bitMap.getBit(v2));
+    }
+
+    @Test
+    public void testBitReuse() {
+        BitMap<String> bitMap = new BitMap<String>();
+        int v1Bit = bitMap.add("v1");
+        int v2Bit = bitMap.add("v2");
+        int v3Bit = bitMap.add("v3");
+        bitMap.remove(v2Bit);
+
+        int v4Bit = bitMap.add("v4");
+
+        Assert.assertEquals(v4Bit, v2Bit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
new file mode 100644
index 0000000..f6a229b
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class WatchManagerTest extends ZKTestCase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(WatchManagerTest.class);
+
+    private static final String PATH_PREFIX = "path";
+
+    private ConcurrentHashMap<Integer, DumbWatcher> watchers;
+    private Random r;
+    private String className;
+
+    public WatchManagerTest(String className) {
+        this.className = className;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object []> data() {
+        return Arrays.asList(new Object [][] {
+            {WatchManager.class.getName()},
+            {WatchManagerOptimized.class.getName()}
+        });
+    }
+
+    @Before
+    public void setUp() {
+        watchers = new ConcurrentHashMap<Integer, DumbWatcher>();
+        r = new Random(System.nanoTime());
+    }
+
+    public IWatchManager getWatchManager() throws IOException {
+        System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, 
className);
+        return WatchManagerFactory.createWatchManager();
+    }
+
+    public DumbWatcher createOrGetWatcher(int watcherId) {
+        if (!watchers.containsKey(watcherId)) {
+            DumbWatcher watcher = new DumbWatcher(watcherId);
+            watchers.putIfAbsent(watcherId, watcher);
+        }
+        return watchers.get(watcherId);
+    }
+
+    public class AddWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesAdded;
+        private volatile boolean stopped = false;
+
+        public AddWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesAdded) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesAdded = watchesAdded;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.addWatch(path, watcher)) {
+                    watchesAdded.addAndGet(1);
+                }
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    public class WatcherTriggerWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final AtomicInteger triggeredCount;
+        private volatile boolean stopped = false;
+
+        public WatcherTriggerWorker(IWatchManager manager,
+                int paths, AtomicInteger triggeredCount) {
+            this.manager = manager;
+            this.paths = paths;
+            this.triggeredCount = triggeredCount;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                WatcherOrBitSet s = manager.triggerWatch(
+                        path, EventType.NodeDeleted);
+                if (s != null) {
+                    triggeredCount.addAndGet(s.size());
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    public class RemoveWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesRemoved;
+        private volatile boolean stopped = false;
+
+        public RemoveWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesRemoved) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesRemoved = watchesRemoved;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.removeWatcher(path, watcher)) {
+                    watchesRemoved.addAndGet(1);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    public class CreateDeadWatchersWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int watchers;
+        private final Set<Watcher> removedWatchers;
+        private volatile boolean stopped = false;
+
+        public CreateDeadWatchersWorker(IWatchManager manager,
+                int watchers, Set<Watcher> removedWatchers) {
+            this.manager = manager;
+            this.watchers = watchers;
+            this.removedWatchers = removedWatchers;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                watcher.setStale();
+                manager.removeWatcher(watcher);
+                synchronized (removedWatchers) {
+                    removedWatchers.add(watcher);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    /**
+     * Concurrently add and trigger watch, make sure the watches triggered
+     * are the same as the number added.
+     */
+    @Test(timeout = 90000)
+    public void testAddAndTriggerWatcher() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 1;
+        int watchers = 10000;
+
+        // 1. start 5 workers to trigger watchers on that path
+        //    count all the watchers have been fired
+        AtomicInteger watchTriggered = new AtomicInteger();
+        List<WatcherTriggerWorker> triggerWorkers =
+                new ArrayList<WatcherTriggerWorker>();
+        for (int i = 0; i < 5; i++) {
+            WatcherTriggerWorker worker =
+                    new WatcherTriggerWorker(manager, paths, watchTriggered);
+            triggerWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on the same path
+        //    count all the watchers being added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all the addWorkers
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. running the trigger worker a bit longer to make sure
+        //    all watchers added are fired
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. stop all triggerWorkers
+        for (WatcherTriggerWorker worker: triggerWorkers) {
+            worker.shutdown();
+        }
+
+        // 6. make sure the total watch triggered is same as added
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertEquals(watchesAdded.get(), watchTriggered.get());
+    }
+
+    /**
+     * Concurrently add and remove watch, make sure the watches left +
+     * the watches removed are equal to the total added watches.
+     */
+    @Test(timeout = 90000)
+    public void testRemoveWatcherOnPath() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 10;
+        int watchers = 10000;
+
+        // 1. start 5 workers to remove watchers on those path
+        //    record the watchers have been removed
+        AtomicInteger watchesRemoved = new AtomicInteger();
+        List<RemoveWatcherWorker> removeWorkers =
+                new ArrayList<RemoveWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            RemoveWatcherWorker worker =
+                    new RemoveWatcherWorker(manager, paths, watchers, 
watchesRemoved);
+            removeWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on different path
+        //    record the watchers have been added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all workers
+        for (RemoveWatcherWorker worker: removeWorkers) {
+            worker.shutdown();
+        }
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. sleep for a while to make sure all the thread exited
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. make sure left watches + removed watches = added watches
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertTrue(watchesRemoved.get() > 0);
+        Assert.assertTrue(manager.size() > 0);
+        Assert.assertEquals(
+                watchesAdded.get(), watchesRemoved.get() + manager.size());
+    }
+
+    /**
+     * Concurrently add watch while close the watcher to simulate the
+     * client connections closed on prod.
+     */
+    @Test(timeout = 90000)
+    public void testDeadWatchers() throws IOException {
+        System.setProperty("zookeeper.watcherCleanThreshold", "10");
+        System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1");
+
+        IWatchManager manager = getWatchManager();
+        int paths = 1;
+        int watchers = 100000;
+
+        // 1. start 5 workers to randomly mark those watcher as dead
+        //    and remove them from watch manager
+        Set<Watcher> deadWatchers = new HashSet<Watcher>();
+        List<CreateDeadWatchersWorker> deadWorkers =
+                new ArrayList<CreateDeadWatchersWorker>();
+        for (int i = 0; i < 5; i++) {
+            CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker(
+                    manager, watchers, deadWatchers);
+            deadWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on the same path
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 50000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all workers
+        for (CreateDeadWatchersWorker worker: deadWorkers) {
+            worker.shutdown();
+        }
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. sleep for a while to make sure all the thread exited
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {}
+
+        // 5. make sure the dead watchers are not in the existing watchers
+        WatchesReport existingWatchers = manager.getWatches();
+        for (Watcher w: deadWatchers) {
+            Assert.assertFalse(
+                    existingWatchers.hasPaths(((ServerCnxn) 
w).getSessionId()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
new file mode 100644
index 0000000..d315232
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.Time;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherCleanerTest extends ZKTestCase {
+
+    public static class MyDeadWatcherListener implements IDeadWatcherListener {
+
+        private CountDownLatch latch;
+        private int delayMs;
+        private Set<Integer> deadWatchers = new HashSet<Integer>();
+
+        public void setCountDownLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public void setDelayMs(int delayMs) {
+            this.delayMs = delayMs;
+        }
+
+        @Override
+        public void processDeadWatchers(Set<Integer> deadWatchers) {
+            if (delayMs > 0) {
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {}
+            }
+            this.deadWatchers.clear();
+            this.deadWatchers.addAll(deadWatchers);
+            latch.countDown();
+        }
+
+        public Set<Integer> getDeadWatchers() {
+            return deadWatchers;
+        }
+
+        public boolean wait(int maxWaitMs) {
+            try {
+                return latch.await(maxWaitMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {}
+            return false;
+        }
+    }
+
+    @Test
+    public void testProcessDeadWatchersBasedOnThreshold() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int threshold = 3;
+        WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 
1, 10);
+        cleaner.start();
+
+        int i = 0;
+        while (i++ < threshold - 1) {
+            cleaner.addDeadWatcher(i);
+        }
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        // add another dead watcher to trigger the process
+        cleaner.addDeadWatcher(i);
+        Assert.assertTrue(listener.wait(1000));
+        Assert.assertEquals(threshold, listener.getDeadWatchers().size());
+    }
+
+    @Test
+    public void testProcessDeadWatchersBasedOnTime() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10);
+        cleaner.start();
+
+        cleaner.addDeadWatcher(1);
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertTrue(listener.wait(2000));
+        Assert.assertEquals(1, listener.getDeadWatchers().size());
+
+        // won't trigger event if there is no dead watchers
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertFalse(listener.wait(2000));
+    }
+
+    @Test
+    public void testMaxInProcessingDeadWatchers() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int delayMs = 1000;
+        listener.setDelayMs(delayMs);
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1);
+        cleaner.start();
+
+        listener.setCountDownLatch(new CountDownLatch(2));
+
+        long startTime = Time.currentElapsedTime();
+        cleaner.addDeadWatcher(1);
+        cleaner.addDeadWatcher(2);
+        long time = Time.currentElapsedTime() - startTime;
+        System.out.println("time used " + time);
+        Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
+        Assert.assertTrue(listener.wait(5000));
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
new file mode 100644
index 0000000..4b7fbd5
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherOrBitSetTest extends ZKTestCase {
+
+    @Test
+    public void testWatcherSet() {
+        Set<Watcher> wset = new HashSet<Watcher>();
+        WatcherOrBitSet hashSet = new WatcherOrBitSet(wset);
+        Assert.assertEquals(0, hashSet.size());
+
+        DumbWatcher w1 = new DumbWatcher();
+        Assert.assertFalse(hashSet.contains(w1));
+        wset.add(w1);
+        Assert.assertTrue(hashSet.contains(w1));
+        Assert.assertEquals(1, hashSet.size());
+        Assert.assertFalse(hashSet.contains(1));
+    }
+
+    @Test
+    public void testBitSet() {
+        BitHashSet bset = new BitHashSet(0);
+        WatcherOrBitSet bitSet = new WatcherOrBitSet(bset);
+        Assert.assertEquals(0, bitSet.size());
+
+        Integer bit = new Integer(1);
+        Assert.assertFalse(bitSet.contains(1));
+        Assert.assertFalse(bitSet.contains(bit));
+
+        bset.add(bit);
+        Assert.assertTrue(bitSet.contains(1));
+        Assert.assertTrue(bitSet.contains(bit));
+        Assert.assertEquals(1, bitSet.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
new file mode 100644
index 0000000..34e3789
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesPathReportTest extends ZKTestCase {
+    private Map<String, Set<Long>> m;
+    private WatchesPathReport r;
+    @Before public void setUp() {
+        m = new HashMap<String, Set<Long>>();
+        Set<Long> s = new HashSet<Long>();
+        s.add(101L);
+        s.add(102L);
+        m.put("path1", s);
+        s = new HashSet<Long>();
+        s.add(201L);
+        m.put("path2", s);
+        r = new WatchesPathReport(m);
+    }
+    @Test public void testHasSessions() {
+        assertTrue(r.hasSessions("path1"));
+        assertTrue(r.hasSessions("path2"));
+        assertFalse(r.hasSessions("path3"));
+    }
+    @Test public void testGetSessions() {
+        Set<Long> s = r.getSessions("path1");
+        assertEquals(2, s.size());
+        assertTrue(s.contains(101L));
+        assertTrue(s.contains(102L));
+        s = r.getSessions("path2");
+        assertEquals(1, s.size());
+        assertTrue(s.contains(201L));
+        assertNull(r.getSessions("path3"));
+    }
+    @Test public void testToMap() {
+        assertEquals(m, r.toMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
new file mode 100644
index 0000000..237583a
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesReportTest extends ZKTestCase {
+    private Map<Long, Set<String>> m;
+    private WatchesReport r;
+    @Before public void setUp() {
+        m = new HashMap<Long, Set<String>>();
+        Set<String> s = new HashSet<String>();
+        s.add("path1a");
+        s.add("path1b");
+        m.put(1L, s);
+        s = new HashSet<String>();
+        s.add("path2a");
+        m.put(2L, s);
+        r = new WatchesReport(m);
+    }
+    @Test public void testHasPaths() {
+        assertTrue(r.hasPaths(1L));
+        assertTrue(r.hasPaths(2L));
+        assertFalse(r.hasPaths(3L));
+    }
+    @Test public void testGetPaths() {
+        Set<String> s = r.getPaths(1L);
+        assertEquals(2, s.size());
+        assertTrue(s.contains("path1a"));
+        assertTrue(s.contains("path1b"));
+        s = r.getPaths(2L);
+        assertEquals(1, s.size());
+        assertTrue(s.contains("path2a"));
+        assertNull(r.getPaths(3L));
+    }
+    @Test public void testToMap() {
+        assertEquals(m, r.toMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java 
b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
new file mode 100644
index 0000000..35956f1
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Map;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class WatchesSummaryTest extends ZKTestCase {
+    private WatchesSummary s;
+    @Before public void setUp() {
+        s = new WatchesSummary(1, 2, 3);
+    }
+    @Test public void testGetters() {
+        assertEquals(1, s.getNumConnections());
+        assertEquals(2, s.getNumPaths());
+        assertEquals(3, s.getTotalWatches());
+    }
+    @Test public void testToMap() {
+        Map<String, Object> m = s.toMap();
+        assertEquals(3, m.size());
+        assertEquals(Integer.valueOf(1), 
m.get(WatchesSummary.KEY_NUM_CONNECTIONS));
+        assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS));
+        assertEquals(Integer.valueOf(3), 
m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES));
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/BenchMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/bench/org/apache/zookeeper/BenchMain.java 
b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
new file mode 100644
index 0000000..8e370c0
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/BenchMain.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class BenchMain {
+    public static void main(String args[]) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java 
b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
new file mode 100644
index 0000000..0510df7
--- /dev/null
+++ b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.DumbWatcher;
+
+import org.openjdk.jmh.annotations.*;
+
+import java.util.concurrent.TimeUnit;
+
+@Fork(3)
+public class WatchBench {
+
+    static final String pathPrefix = "/reasonably/long/path/";
+    static final EventType event = EventType.NodeDataChanged;
+
+    static IWatchManager createWatchManager(String className) throws Exception 
{
+        Class clazz = Class.forName(
+                "org.apache.zookeeper.server.watch." + className);
+        return (IWatchManager) clazz.newInstance();
+    }
+
+    static void forceGC() {
+        int gcTimes = 3;
+        for (int i = 0; i < gcTimes; i++) {
+            try {
+                System.gc();
+                Thread.currentThread().sleep(1000);
+
+                System.runFinalization();
+                Thread.currentThread().sleep(1000);
+            } catch (InterruptedException ex) { /* ignore */ }
+        }
+    }
+
+    static long getMemoryUse() {
+        forceGC();
+        long totalMem = Runtime.getRuntime().totalMemory();
+
+        forceGC();
+        long freeMem = Runtime.getRuntime().freeMemory();
+        return totalMem - freeMem;
+    }
+
+    @State(Scope.Benchmark)
+    public static class IterationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"10000"})
+        public int pathCount;
+
+        String[] paths;
+
+        long watchesAdded = 0;
+        IWatchManager watchManager;
+
+        long memWhenSetup = 0;
+
+        @Setup(Level.Iteration)
+        public void setup() throws Exception {
+            paths = new String[pathCount];
+            for (int i = 0; i < paths.length; i++) {
+                paths[i] = pathPrefix + i;
+            }
+
+            watchesAdded = 0;
+            watchManager = createWatchManager(watchManagerClass);
+
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Iteration)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+        }
+    }
+
+    /**
+     * Test concenrate watch case where the watcher watches all paths.
+     *
+     * The output of this test will be the average time used to add the
+     * watch to all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddConcentrateWatch(IterationState state) throws Exception 
{
+        Watcher watcher = new DumbWatcher();
+
+        // watch all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, watcher)) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class InvocationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"1", "1000"})
+        public int pathCount;
+
+        @Param({"1", "1000"})
+        public int watcherCount;
+
+        String[] paths;
+        Watcher[] watchers;
+
+        IWatchManager watchManager;
+
+        @Setup(Level.Invocation)
+        public void setup() throws Exception {
+            initialize();
+            prepare();
+        }
+
+        void initialize() throws Exception {
+            if (paths == null || paths.length != pathCount) {
+                paths = new String[pathCount];
+                for (int i = 0; i < pathCount; i++) {
+                    paths[i] = pathPrefix + i;
+                }
+            }
+
+            if (watchers == null || watchers.length != watcherCount) {
+                watchers = new Watcher[watcherCount];
+                for (int i = 0; i < watcherCount; i++) {
+                    watchers[i] = new DumbWatcher();
+                }
+            }
+            if (watchManager == null ||
+                    !watchManager.getClass().getSimpleName().contains(
+                            watchManagerClass)) {
+                watchManager = createWatchManager(watchManagerClass);
+            }
+        }
+
+        void prepare() {
+            for (String path : paths) {
+                for (Watcher watcher : watchers) {
+                    watchManager.addWatch(path, watcher);
+                }
+            }
+        }
+    }
+
+    /**
+     * Test trigger watches in concenrate case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerConcentrateWatch(InvocationState state) throws 
Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class AddSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        long watchesAdded = 0;
+        long memWhenSetup = 0;
+
+        @Override
+        public void prepare() {
+            watchesAdded = 0;
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Invocation)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million sparse watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+
+            // clear all the watches
+            for (String path : paths) {
+                watchManager.triggerWatch(path, event);
+            }
+        }
+    }
+
+    /**
+     * Test sparse watch case where only one watcher watches all paths, and
+     * only one path being watched by all watchers.
+     *
+     * The output of this test will be the average time used to add those
+     * sparse watches.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddSparseWatch(AddSparseWatchState state) throws Exception 
{
+        // All watchers are watching the 1st path
+        for (Watcher watcher : state.watchers) {
+            if (state.watchManager.addWatch(state.paths[0], watcher)) {
+                state.watchesAdded++;
+            }
+        }
+        // The 1st watcher is watching all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, state.watchers[0])) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class TriggerSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        @Override
+        public void prepare() {
+            // All watchers are watching the 1st path
+            for (Watcher watcher : watchers) {
+                watchManager.addWatch(paths[0], watcher);
+            }
+
+            // The 1st watcher is watching all paths
+            for (String path : paths) {
+                watchManager.addWatch(path, watchers[0]);
+            }
+        }
+    }
+
+
+    /**
+     * Test trigger watches in sparse case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerSparseWatch(TriggerSparseWatchState state) throws 
Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
----------------------------------------------------------------------
diff --git 
a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses 
b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
index 2b0fc83..ba29e89 100644
--- a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
+++ b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
@@ -8,3 +8,4 @@ quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A 
benchmark of just the
 abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A 
benchmark of just the atomic broadcast
 ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will 
instantiate classes as directed by an instance manager
 systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test
+jmh:org.apache.zookeeper.BenchMain:Run jmh micro benchmarks

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml 
b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
index 50143d4..b7cd21b 100644
--- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml
@@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
             </listitem>
           </varlistentry>
 
+
+          <varlistentry>
+            <term>watchManaggerName</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watchManagerName</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 New watcher
+                manager WatchManagerOptimized is added to optimize the memory 
overhead in heavy watch use cases. This
+                config is used to define which watcher manager to be used. 
Currently, we only support WatchManager and
+                WatchManagerOptimized.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreadsNum</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    
role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers 
lazily, this config is used to decide how
+                many thread is used in the WatcherCleaner. More thread usually 
means larger clean up throughput. The
+                default value is 2, which is good enough even for heavy and 
continuous session closing/recreating cases.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreshold</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    
role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers 
lazily, the clean up process is relatively
+                heavy, batch processing will reduce the cost and improve the 
performance. This setting is used to decide
+                the batch size. The default one is 1000, we don't need to 
change it if there is no memory or clean up
+                speed issue.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanIntervalInSeconds</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    
role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers 
lazily, the clean up process is relatively
+                heavy, batch processing will reduce the cost and improve the 
performance. Besides watcherCleanThreshold,
+                this setting is used to clean up the dead watchers after 
certain time even the dead watchers are not larger
+                than watcherCleanThreshold, so that we won't leave the dead 
watchers there for too long. The default setting
+                is 10 minutes, which usually don't need to be changed.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>maxInProcessingDeadWatchers</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    
role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 This is used
+                to control how many backlog can we have in the WatcherCleaner, 
when it reaches this number, it will
+                slow down adding the dead watcher to WatcherCleaner, which 
will in turn slow down adding and closing
+                watchers, so that we can avoid OOM issue. By default there is 
no limit, you can set it to values like
+                watcherCleanThreshold * 1000.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>bitHashCacheSize</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
+
+              <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
+                <ulink 
url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179</ulink>
 This is the
+                setting used to decide the HashSet cache size in the 
BitHashSet implementation. Without HashSet, we
+                need to use O(N) time to get the elements, N is the bit 
numbers in elementBits. But we need to
+                keep the size small to make sure it doesn't cost too much in 
memory, there is a trade off between memory
+                and time complexity. The default value is 10, which seems a 
relatively reasonable cache size.</para>
+            </listitem>
+          </varlistentry>
+
         </variablelist>
       </section>
 

Reply via email to