Repository: zookeeper Updated Branches: refs/heads/master 4ebb847bc -> fdde8b006
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java new file mode 100644 index 0000000..83b186f --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.util.BitHashSet; + +public class WatcherOrBitSet { + + private Set<Watcher> watchers; + private BitHashSet watcherBits; + + public WatcherOrBitSet(final Set<Watcher> watchers) { + this.watchers = watchers; + } + + public WatcherOrBitSet(final BitHashSet watcherBits) { + this.watcherBits = watcherBits; + } + + public boolean contains(Watcher watcher) { + if (watchers == null) { + return false; + } + return watchers.contains(watcher); + } + + public boolean contains(int watcherBit) { + if (watcherBits == null) { + return false; + } + return watcherBits.contains(watcherBit); + } + + public int size() { + if (watchers != null) { + return watchers.size(); + } + if (watcherBits != null) { + return watcherBits.size(); + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java new file mode 100644 index 0000000..38f02de --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A watch report, essentially a mapping of path to session IDs of sessions that + * have set a watch on that path. This class is immutable. + */ +public class WatchesPathReport { + + private final Map<String, Set<Long>> path2Ids; + + /** + * Creates a new report. + * + * @param path2Ids map of paths to session IDs of sessions that have set a + * watch on that path + */ + WatchesPathReport(Map<String, Set<Long>> path2Ids) { + this.path2Ids = Collections.unmodifiableMap(deepCopy(path2Ids)); + } + + private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> m) { + Map<String, Set<Long>> m2 = new HashMap<String, Set<Long>>(); + for (Map.Entry<String, Set<Long>> e : m.entrySet()) { + m2.put(e.getKey(), new HashSet<Long>(e.getValue())); + } + return m2; + } + + /** + * Checks if the given path has watches set. + * + * @param path path + * @return true if path has watch set + */ + public boolean hasSessions(String path) { + return path2Ids.containsKey(path); + } + /** + * Gets the session IDs of sessions that have set watches on the given path. + * The returned set is immutable. + * + * @param path session ID + * @return session IDs of sessions that have set watches on the path, or + * null if none + */ + public Set<Long> getSessions(String path) { + Set<Long> s = path2Ids.get(path); + return s != null ? Collections.unmodifiableSet(s) : null; + } + + /** + * Converts this report to a map. The returned map is mutable, and changes + * to it do not reflect back into this report. + * + * @return map representation of report + */ + public Map<String, Set<Long>> toMap() { + return deepCopy(path2Ids); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java new file mode 100644 index 0000000..ac888d3 --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A watch report, essentially a mapping of session ID to paths that the session + * has set a watch on. This class is immutable. + */ +public class WatchesReport { + + private final Map<Long, Set<String>> id2paths; + + /** + * Creates a new report. + * + * @param id2paths map of session IDs to paths that each session has set + * a watch on + */ + WatchesReport(Map<Long, Set<String>> id2paths) { + this.id2paths = Collections.unmodifiableMap(deepCopy(id2paths)); + } + + private static Map<Long, Set<String>> deepCopy(Map<Long, Set<String>> m) { + Map<Long, Set<String>> m2 = new HashMap<Long, Set<String>>(); + for (Map.Entry<Long, Set<String>> e : m.entrySet()) { + m2.put(e.getKey(), new HashSet<String>(e.getValue())); + } + return m2; + } + + /** + * Checks if the given session has watches set. + * + * @param sessionId session ID + * @return true if session has paths with watches set + */ + public boolean hasPaths(long sessionId) { + return id2paths.containsKey(sessionId); + } + + /** + * Gets the paths that the given session has set watches on. The returned + * set is immutable. + * + * @param sessionId session ID + * @return paths that have watches set by the session, or null if none + */ + public Set<String> getPaths(long sessionId) { + Set<String> s = id2paths.get(sessionId); + return s != null ? Collections.unmodifiableSet(s) : null; + } + + /** + * Converts this report to a map. The returned map is mutable, and changes + * to it do not reflect back into this report. + * + * @return map representation of report + */ + public Map<Long, Set<String>> toMap() { + return deepCopy(id2paths); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java new file mode 100644 index 0000000..b2449ba --- /dev/null +++ b/src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A summary of watch information. This class is immutable. + */ +public class WatchesSummary { + + /** + * The key in the map returned by {@link #toMap()} for the number of + * connections. + */ + public static final String KEY_NUM_CONNECTIONS = "num_connections"; + /** + * The key in the map returned by {@link #toMap()} for the number of paths. + */ + public static final String KEY_NUM_PATHS = "num_paths"; + /** + * The key in the map returned by {@link #toMap()} for the total number of + * watches. + */ + public static final String KEY_NUM_TOTAL_WATCHES = "num_total_watches"; + + private final int numConnections; + private final int numPaths; + private final int totalWatches; + + /** + * Creates a new summary. + * + * @param numConnections the number of sessions that have set watches + * @param numPaths the number of paths that have watches set on them + * @param totalWatches the total number of watches set + */ + WatchesSummary(int numConnections, int numPaths, int totalWatches) { + this.numConnections = numConnections; + this.numPaths = numPaths; + this.totalWatches = totalWatches; + } + + /** + * Gets the number of connections (sessions) that have set watches. + * + * @return number of connections + */ + public int getNumConnections() { + return numConnections; + } + /** + * Gets the number of paths that have watches set on them. + * + * @return number of paths + */ + public int getNumPaths() { + return numPaths; + } + /** + * Gets the total number of watches set. + * + * @return total watches + */ + public int getTotalWatches() { + return totalWatches; + } + + /** + * Converts this summary to a map. The returned map is mutable, and changes + * to it do not reflect back into this summary. + * + * @return map representation of summary + */ + public Map<String, Object> toMap() { + Map<String, Object> summary = new LinkedHashMap<String, Object>(); + summary.put(KEY_NUM_CONNECTIONS, numConnections); + summary.put(KEY_NUM_PATHS, numPaths); + summary.put(KEY_NUM_TOTAL_WATCHES, totalWatches); + return summary; + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/config/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/src/java/test/config/findbugsExcludeFile.xml b/src/java/test/config/findbugsExcludeFile.xml index 4ab5a5e..a3f45a6 100644 --- a/src/java/test/config/findbugsExcludeFile.xml +++ b/src/java/test/config/findbugsExcludeFile.xml @@ -53,7 +53,13 @@ <Method name="run" /> <Bug pattern="DM_EXIT" /> </Match> - + + <!-- Failed to create watch manager is a unrecoverable error --> + <Match> + <Class name="org.apache.zookeeper.server.DataTree" /> + <Bug pattern="DM_EXIT" /> + </Match> + <Match> <Package name="org.apache.jute.compiler.generated" /> </Match> @@ -85,7 +91,7 @@ <Match> <Class name="org.apache.zookeeper.server.DataNode"/> - <Field name="children"/> + <Field name="children"/> <Bug code="IS"/> </Match> <Match> @@ -98,6 +104,15 @@ <Field name="serverStats"/> <Bug code="IS"/> </Match> + + <!-- The iterate function is non-thread safe, the caller will synchronize + on the BitHHashSet object --> + <Match> + <Class name="org.apache.zookeeper.server.util.BitHashSet" /> + <Field name="elementCount" /> + <Bug code="IS" /> + </Match> + <Match> <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/> <Bug code="UrF"/> @@ -111,7 +126,7 @@ <!-- these are old classes just for upgrading and should go away --> <Match> <Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/> - </Match> + </Match> <Match> <Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/> @@ -134,6 +149,23 @@ </Or> </Match> + <!-- Synchronize on the AtomicInteger to do wait/notify, but not relying + on the synchronization to control the AtomicInteger value update, + so it's not a problem --> + <Match> + <Class name="org.apache.zookeeper.server.watch.WatcherCleaner" /> + <Bug code="JLM" /> + <Method name="addDeadWatcher" /> + </Match> + + <Match> + <Class name="org.apache.zookeeper.server.watch.WatcherCleaner$1" /> + <Bug code="JLM" /> + <Method name="doWork" /> + </Match> + + + <Match> <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/> <Bug pattern="OS_OPEN_STREAM" /> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java deleted file mode 100644 index c0b107d..0000000 --- a/src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.server; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.zookeeper.ZKTestCase; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -public class WatchesPathReportTest extends ZKTestCase { - private Map<String, Set<Long>> m; - private WatchesPathReport r; - @Before public void setUp() { - m = new HashMap<String, Set<Long>>(); - Set<Long> s = new HashSet<Long>(); - s.add(101L); - s.add(102L); - m.put("path1", s); - s = new HashSet<Long>(); - s.add(201L); - m.put("path2", s); - r = new WatchesPathReport(m); - } - @Test public void testHasSessions() { - assertTrue(r.hasSessions("path1")); - assertTrue(r.hasSessions("path2")); - assertFalse(r.hasSessions("path3")); - } - @Test public void testGetSessions() { - Set<Long> s = r.getSessions("path1"); - assertEquals(2, s.size()); - assertTrue(s.contains(101L)); - assertTrue(s.contains(102L)); - s = r.getSessions("path2"); - assertEquals(1, s.size()); - assertTrue(s.contains(201L)); - assertNull(r.getSessions("path3")); - } - @Test public void testToMap() { - assertEquals(m, r.toMap()); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java deleted file mode 100644 index 7f0343b..0000000 --- a/src/java/test/org/apache/zookeeper/server/WatchesReportTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.server; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.zookeeper.ZKTestCase; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -public class WatchesReportTest extends ZKTestCase { - private Map<Long, Set<String>> m; - private WatchesReport r; - @Before public void setUp() { - m = new HashMap<Long, Set<String>>(); - Set<String> s = new HashSet<String>(); - s.add("path1a"); - s.add("path1b"); - m.put(1L, s); - s = new HashSet<String>(); - s.add("path2a"); - m.put(2L, s); - r = new WatchesReport(m); - } - @Test public void testHasPaths() { - assertTrue(r.hasPaths(1L)); - assertTrue(r.hasPaths(2L)); - assertFalse(r.hasPaths(3L)); - } - @Test public void testGetPaths() { - Set<String> s = r.getPaths(1L); - assertEquals(2, s.size()); - assertTrue(s.contains("path1a")); - assertTrue(s.contains("path1b")); - s = r.getPaths(2L); - assertEquals(1, s.size()); - assertTrue(s.contains("path2a")); - assertNull(r.getPaths(3L)); - } - @Test public void testToMap() { - assertEquals(m, r.toMap()); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java deleted file mode 100644 index d679065..0000000 --- a/src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zookeeper.server; - -import java.util.Map; -import org.apache.zookeeper.ZKTestCase; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.*; - -public class WatchesSummaryTest extends ZKTestCase { - private WatchesSummary s; - @Before public void setUp() { - s = new WatchesSummary(1, 2, 3); - } - @Test public void testGetters() { - assertEquals(1, s.getNumConnections()); - assertEquals(2, s.getNumPaths()); - assertEquals(3, s.getTotalWatches()); - } - @Test public void testToMap() { - Map<String, Object> m = s.toMap(); - assertEquals(3, m.size()); - assertEquals(Integer.valueOf(1), m.get(WatchesSummary.KEY_NUM_CONNECTIONS)); - assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS)); - assertEquals(Integer.valueOf(3), m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES)); - } -} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java new file mode 100644 index 0000000..a70eaa5 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.util; + +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Test; +import org.junit.Assert; + +public class BitHashSetTest extends ZKTestCase { + + @Test + public void testAddWatchBit() { + int watcherCacheSize = 1; + BitHashSet ws = new BitHashSet(watcherCacheSize); + Assert.assertTrue(ws.add(1)); + Assert.assertEquals(1, ws.size()); + Assert.assertEquals(1, ws.cachedSize()); + + List<Integer> actualBits = new ArrayList<Integer>(); + + for (int bit: ws) { + actualBits.add(bit); + } + Assert.assertArrayEquals( + new Integer[] {1}, + actualBits.toArray(new Integer[actualBits.size()])); + + // add the same bit again + Assert.assertFalse(ws.add(1)); + Assert.assertEquals(1, ws.size()); + Assert.assertEquals(1, ws.cachedSize()); + + // add another bit, make sure there there is only 1 bit cached + Assert.assertTrue(ws.add(2)); + Assert.assertEquals(2, ws.size()); + Assert.assertEquals(1, ws.cachedSize()); + + Assert.assertTrue(ws.contains(1)); + + actualBits.clear(); + for (int bit: ws) { + actualBits.add(bit); + } + Assert.assertArrayEquals( + new Integer[] {1, 2}, + actualBits.toArray(new Integer[actualBits.size()])); + } + + @Test + public void testRemoveWatchBit() { + int watcherCacheSize = 1; + BitHashSet ws = new BitHashSet(watcherCacheSize); + ws.add(1); + ws.add(2); + + Assert.assertTrue(ws.contains(1)); + Assert.assertTrue(ws.contains(2)); + + ws.remove(1); + Assert.assertFalse(ws.contains(1)); + Assert.assertEquals(1, ws.size()); + Assert.assertEquals(0, ws.cachedSize()); + + List<Integer> actualBits = new ArrayList<Integer>(); + + for (int bit: ws) { + actualBits.add(bit); + } + Assert.assertArrayEquals( + new Integer[] {2}, + actualBits.toArray(new Integer[actualBits.size()])); + + ws.add(3); + Assert.assertEquals(2, ws.size()); + Assert.assertEquals(1, ws.cachedSize()); + + actualBits.clear(); + for (int bit: ws) { + actualBits.add(bit); + } + Assert.assertArrayEquals( + new Integer[] {2, 3}, + actualBits.toArray(new Integer[actualBits.size()])); + + ws.remove(2); + ws.remove(3); + + Assert.assertEquals(0, ws.size()); + Assert.assertEquals(0, ws.cachedSize()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java new file mode 100644 index 0000000..eca0f2d --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/util/BitMapTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.util; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Test; +import org.junit.Assert; + +public class BitMapTest extends ZKTestCase { + + @Test + public void testAddAndRemove() { + BitMap<String> bitMap = new BitMap<String>(); + String v1 = new String("v1"); + Integer bit = bitMap.add(v1); + + Assert.assertEquals(1, bitMap.size()); + Assert.assertTrue(bit >= 0); + Assert.assertEquals(v1, bitMap.get(bit)); + Assert.assertEquals(bit, bitMap.getBit(v1)); + + // add the same value again + Integer newBit = bitMap.add(v1); + Assert.assertEquals(bit, newBit); + Assert.assertEquals(1, bitMap.size()); + + String v2 = new String("v2"); + Integer v2Bit = bitMap.add(v2); + Assert.assertEquals(2, bitMap.size()); + Assert.assertNotEquals(v2Bit, bit); + + // remove by value + bitMap.remove(v1); + Assert.assertEquals(1, bitMap.size()); + Assert.assertNull(bitMap.get(bit)); + Assert.assertNull(bitMap.getBit(v1)); + + // remove by bit + bitMap.remove(v2Bit); + Assert.assertEquals(0, bitMap.size()); + Assert.assertNull(bitMap.get(v2Bit)); + Assert.assertNull(bitMap.getBit(v2)); + } + + @Test + public void testBitReuse() { + BitMap<String> bitMap = new BitMap<String>(); + int v1Bit = bitMap.add("v1"); + int v2Bit = bitMap.add("v2"); + int v3Bit = bitMap.add("v3"); + bitMap.remove(v2Bit); + + int v4Bit = bitMap.add("v4"); + + Assert.assertEquals(v4Bit, v2Bit); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java new file mode 100644 index 0000000..f6a229b --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.io.IOException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.DumbWatcher; +import org.apache.zookeeper.server.ServerCnxn; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class WatchManagerTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class); + + private static final String PATH_PREFIX = "path"; + + private ConcurrentHashMap<Integer, DumbWatcher> watchers; + private Random r; + private String className; + + public WatchManagerTest(String className) { + this.className = className; + } + + @Parameterized.Parameters + public static List<Object []> data() { + return Arrays.asList(new Object [][] { + {WatchManager.class.getName()}, + {WatchManagerOptimized.class.getName()} + }); + } + + @Before + public void setUp() { + watchers = new ConcurrentHashMap<Integer, DumbWatcher>(); + r = new Random(System.nanoTime()); + } + + public IWatchManager getWatchManager() throws IOException { + System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, className); + return WatchManagerFactory.createWatchManager(); + } + + public DumbWatcher createOrGetWatcher(int watcherId) { + if (!watchers.containsKey(watcherId)) { + DumbWatcher watcher = new DumbWatcher(watcherId); + watchers.putIfAbsent(watcherId, watcher); + } + return watchers.get(watcherId); + } + + public class AddWatcherWorker extends Thread { + + private final IWatchManager manager; + private final int paths; + private final int watchers; + private final AtomicInteger watchesAdded; + private volatile boolean stopped = false; + + public AddWatcherWorker(IWatchManager manager, + int paths, int watchers, AtomicInteger watchesAdded) { + this.manager = manager; + this.paths = paths; + this.watchers = watchers; + this.watchesAdded = watchesAdded; + } + + @Override + public void run() { + while (!stopped) { + String path = PATH_PREFIX + r.nextInt(paths); + Watcher watcher = createOrGetWatcher(r.nextInt(watchers)); + if (manager.addWatch(path, watcher)) { + watchesAdded.addAndGet(1); + } + } + } + + public void shutdown() { + stopped = true; + } + } + + public class WatcherTriggerWorker extends Thread { + + private final IWatchManager manager; + private final int paths; + private final AtomicInteger triggeredCount; + private volatile boolean stopped = false; + + public WatcherTriggerWorker(IWatchManager manager, + int paths, AtomicInteger triggeredCount) { + this.manager = manager; + this.paths = paths; + this.triggeredCount = triggeredCount; + } + + @Override + public void run() { + while (!stopped) { + String path = PATH_PREFIX + r.nextInt(paths); + WatcherOrBitSet s = manager.triggerWatch( + path, EventType.NodeDeleted); + if (s != null) { + triggeredCount.addAndGet(s.size()); + } + try { + Thread.sleep(r.nextInt(10)); + } catch (InterruptedException e) {} + } + } + + public void shutdown() { + stopped = true; + } + } + + public class RemoveWatcherWorker extends Thread { + + private final IWatchManager manager; + private final int paths; + private final int watchers; + private final AtomicInteger watchesRemoved; + private volatile boolean stopped = false; + + public RemoveWatcherWorker(IWatchManager manager, + int paths, int watchers, AtomicInteger watchesRemoved) { + this.manager = manager; + this.paths = paths; + this.watchers = watchers; + this.watchesRemoved = watchesRemoved; + } + + @Override + public void run() { + while (!stopped) { + String path = PATH_PREFIX + r.nextInt(paths); + Watcher watcher = createOrGetWatcher(r.nextInt(watchers)); + if (manager.removeWatcher(path, watcher)) { + watchesRemoved.addAndGet(1); + } + try { + Thread.sleep(r.nextInt(10)); + } catch (InterruptedException e) {} + } + } + + public void shutdown() { + stopped = true; + } + + } + + public class CreateDeadWatchersWorker extends Thread { + + private final IWatchManager manager; + private final int watchers; + private final Set<Watcher> removedWatchers; + private volatile boolean stopped = false; + + public CreateDeadWatchersWorker(IWatchManager manager, + int watchers, Set<Watcher> removedWatchers) { + this.manager = manager; + this.watchers = watchers; + this.removedWatchers = removedWatchers; + } + + @Override + public void run() { + while (!stopped) { + DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers)); + watcher.setStale(); + manager.removeWatcher(watcher); + synchronized (removedWatchers) { + removedWatchers.add(watcher); + } + try { + Thread.sleep(r.nextInt(10)); + } catch (InterruptedException e) {} + } + } + + public void shutdown() { + stopped = true; + } + + } + + /** + * Concurrently add and trigger watch, make sure the watches triggered + * are the same as the number added. + */ + @Test(timeout = 90000) + public void testAddAndTriggerWatcher() throws IOException { + IWatchManager manager = getWatchManager(); + int paths = 1; + int watchers = 10000; + + // 1. start 5 workers to trigger watchers on that path + // count all the watchers have been fired + AtomicInteger watchTriggered = new AtomicInteger(); + List<WatcherTriggerWorker> triggerWorkers = + new ArrayList<WatcherTriggerWorker>(); + for (int i = 0; i < 5; i++) { + WatcherTriggerWorker worker = + new WatcherTriggerWorker(manager, paths, watchTriggered); + triggerWorkers.add(worker); + worker.start(); + } + + // 2. start 5 workers to add different watchers on the same path + // count all the watchers being added + AtomicInteger watchesAdded = new AtomicInteger(); + List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>(); + for (int i = 0; i < 5; i++) { + AddWatcherWorker worker = new AddWatcherWorker( + manager, paths, watchers, watchesAdded); + addWorkers.add(worker); + worker.start(); + } + + while (watchesAdded.get() < 100000) { + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + + // 3. stop all the addWorkers + for (AddWatcherWorker worker: addWorkers) { + worker.shutdown(); + } + + // 4. running the trigger worker a bit longer to make sure + // all watchers added are fired + try { + Thread.sleep(500); + } catch (InterruptedException e) {} + + // 5. stop all triggerWorkers + for (WatcherTriggerWorker worker: triggerWorkers) { + worker.shutdown(); + } + + // 6. make sure the total watch triggered is same as added + Assert.assertTrue(watchesAdded.get() > 0); + Assert.assertEquals(watchesAdded.get(), watchTriggered.get()); + } + + /** + * Concurrently add and remove watch, make sure the watches left + + * the watches removed are equal to the total added watches. + */ + @Test(timeout = 90000) + public void testRemoveWatcherOnPath() throws IOException { + IWatchManager manager = getWatchManager(); + int paths = 10; + int watchers = 10000; + + // 1. start 5 workers to remove watchers on those path + // record the watchers have been removed + AtomicInteger watchesRemoved = new AtomicInteger(); + List<RemoveWatcherWorker> removeWorkers = + new ArrayList<RemoveWatcherWorker>(); + for (int i = 0; i < 5; i++) { + RemoveWatcherWorker worker = + new RemoveWatcherWorker(manager, paths, watchers, watchesRemoved); + removeWorkers.add(worker); + worker.start(); + } + + // 2. start 5 workers to add different watchers on different path + // record the watchers have been added + AtomicInteger watchesAdded = new AtomicInteger(); + List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>(); + for (int i = 0; i < 5; i++) { + AddWatcherWorker worker = new AddWatcherWorker( + manager, paths, watchers, watchesAdded); + addWorkers.add(worker); + worker.start(); + } + + while (watchesAdded.get() < 100000) { + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + + // 3. stop all workers + for (RemoveWatcherWorker worker: removeWorkers) { + worker.shutdown(); + } + for (AddWatcherWorker worker: addWorkers) { + worker.shutdown(); + } + + // 4. sleep for a while to make sure all the thread exited + try { + Thread.sleep(500); + } catch (InterruptedException e) {} + + // 5. make sure left watches + removed watches = added watches + Assert.assertTrue(watchesAdded.get() > 0); + Assert.assertTrue(watchesRemoved.get() > 0); + Assert.assertTrue(manager.size() > 0); + Assert.assertEquals( + watchesAdded.get(), watchesRemoved.get() + manager.size()); + } + + /** + * Concurrently add watch while close the watcher to simulate the + * client connections closed on prod. + */ + @Test(timeout = 90000) + public void testDeadWatchers() throws IOException { + System.setProperty("zookeeper.watcherCleanThreshold", "10"); + System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1"); + + IWatchManager manager = getWatchManager(); + int paths = 1; + int watchers = 100000; + + // 1. start 5 workers to randomly mark those watcher as dead + // and remove them from watch manager + Set<Watcher> deadWatchers = new HashSet<Watcher>(); + List<CreateDeadWatchersWorker> deadWorkers = + new ArrayList<CreateDeadWatchersWorker>(); + for (int i = 0; i < 5; i++) { + CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker( + manager, watchers, deadWatchers); + deadWorkers.add(worker); + worker.start(); + } + + // 2. start 5 workers to add different watchers on the same path + AtomicInteger watchesAdded = new AtomicInteger(); + List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>(); + for (int i = 0; i < 5; i++) { + AddWatcherWorker worker = new AddWatcherWorker( + manager, paths, watchers, watchesAdded); + addWorkers.add(worker); + worker.start(); + } + + while (watchesAdded.get() < 50000) { + try { + Thread.sleep(100); + } catch (InterruptedException e) {} + } + + // 3. stop all workers + for (CreateDeadWatchersWorker worker: deadWorkers) { + worker.shutdown(); + } + for (AddWatcherWorker worker: addWorkers) { + worker.shutdown(); + } + + // 4. sleep for a while to make sure all the thread exited + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} + + // 5. make sure the dead watchers are not in the existing watchers + WatchesReport existingWatchers = manager.getWatches(); + for (Watcher w: deadWatchers) { + Assert.assertFalse( + existingWatchers.hasPaths(((ServerCnxn) w).getSessionId())); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java new file mode 100644 index 0000000..d315232 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.common.Time; +import org.junit.Test; +import org.junit.Assert; + +public class WatcherCleanerTest extends ZKTestCase { + + public static class MyDeadWatcherListener implements IDeadWatcherListener { + + private CountDownLatch latch; + private int delayMs; + private Set<Integer> deadWatchers = new HashSet<Integer>(); + + public void setCountDownLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void setDelayMs(int delayMs) { + this.delayMs = delayMs; + } + + @Override + public void processDeadWatchers(Set<Integer> deadWatchers) { + if (delayMs > 0) { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) {} + } + this.deadWatchers.clear(); + this.deadWatchers.addAll(deadWatchers); + latch.countDown(); + } + + public Set<Integer> getDeadWatchers() { + return deadWatchers; + } + + public boolean wait(int maxWaitMs) { + try { + return latch.await(maxWaitMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) {} + return false; + } + } + + @Test + public void testProcessDeadWatchersBasedOnThreshold() { + MyDeadWatcherListener listener = new MyDeadWatcherListener(); + int threshold = 3; + WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 1, 10); + cleaner.start(); + + int i = 0; + while (i++ < threshold - 1) { + cleaner.addDeadWatcher(i); + } + // not trigger processDeadWatchers yet + Assert.assertEquals(0, listener.getDeadWatchers().size()); + + listener.setCountDownLatch(new CountDownLatch(1)); + // add another dead watcher to trigger the process + cleaner.addDeadWatcher(i); + Assert.assertTrue(listener.wait(1000)); + Assert.assertEquals(threshold, listener.getDeadWatchers().size()); + } + + @Test + public void testProcessDeadWatchersBasedOnTime() { + MyDeadWatcherListener listener = new MyDeadWatcherListener(); + WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10); + cleaner.start(); + + cleaner.addDeadWatcher(1); + // not trigger processDeadWatchers yet + Assert.assertEquals(0, listener.getDeadWatchers().size()); + + listener.setCountDownLatch(new CountDownLatch(1)); + Assert.assertTrue(listener.wait(2000)); + Assert.assertEquals(1, listener.getDeadWatchers().size()); + + // won't trigger event if there is no dead watchers + listener.setCountDownLatch(new CountDownLatch(1)); + Assert.assertFalse(listener.wait(2000)); + } + + @Test + public void testMaxInProcessingDeadWatchers() { + MyDeadWatcherListener listener = new MyDeadWatcherListener(); + int delayMs = 1000; + listener.setDelayMs(delayMs); + WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1); + cleaner.start(); + + listener.setCountDownLatch(new CountDownLatch(2)); + + long startTime = Time.currentElapsedTime(); + cleaner.addDeadWatcher(1); + cleaner.addDeadWatcher(2); + long time = Time.currentElapsedTime() - startTime; + System.out.println("time used " + time); + Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs); + Assert.assertTrue(listener.wait(5000)); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java new file mode 100644 index 0000000..4b7fbd5 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.DumbWatcher; +import org.apache.zookeeper.server.util.BitHashSet; + +import org.apache.zookeeper.ZKTestCase; +import org.junit.Test; +import org.junit.Assert; + +public class WatcherOrBitSetTest extends ZKTestCase { + + @Test + public void testWatcherSet() { + Set<Watcher> wset = new HashSet<Watcher>(); + WatcherOrBitSet hashSet = new WatcherOrBitSet(wset); + Assert.assertEquals(0, hashSet.size()); + + DumbWatcher w1 = new DumbWatcher(); + Assert.assertFalse(hashSet.contains(w1)); + wset.add(w1); + Assert.assertTrue(hashSet.contains(w1)); + Assert.assertEquals(1, hashSet.size()); + Assert.assertFalse(hashSet.contains(1)); + } + + @Test + public void testBitSet() { + BitHashSet bset = new BitHashSet(0); + WatcherOrBitSet bitSet = new WatcherOrBitSet(bset); + Assert.assertEquals(0, bitSet.size()); + + Integer bit = new Integer(1); + Assert.assertFalse(bitSet.contains(1)); + Assert.assertFalse(bitSet.contains(bit)); + + bset.add(bit); + Assert.assertTrue(bitSet.contains(1)); + Assert.assertTrue(bitSet.contains(bit)); + Assert.assertEquals(1, bitSet.size()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java new file mode 100644 index 0000000..34e3789 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.zookeeper.ZKTestCase; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class WatchesPathReportTest extends ZKTestCase { + private Map<String, Set<Long>> m; + private WatchesPathReport r; + @Before public void setUp() { + m = new HashMap<String, Set<Long>>(); + Set<Long> s = new HashSet<Long>(); + s.add(101L); + s.add(102L); + m.put("path1", s); + s = new HashSet<Long>(); + s.add(201L); + m.put("path2", s); + r = new WatchesPathReport(m); + } + @Test public void testHasSessions() { + assertTrue(r.hasSessions("path1")); + assertTrue(r.hasSessions("path2")); + assertFalse(r.hasSessions("path3")); + } + @Test public void testGetSessions() { + Set<Long> s = r.getSessions("path1"); + assertEquals(2, s.size()); + assertTrue(s.contains(101L)); + assertTrue(s.contains(102L)); + s = r.getSessions("path2"); + assertEquals(1, s.size()); + assertTrue(s.contains(201L)); + assertNull(r.getSessions("path3")); + } + @Test public void testToMap() { + assertEquals(m, r.toMap()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java new file mode 100644 index 0000000..237583a --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.zookeeper.ZKTestCase; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class WatchesReportTest extends ZKTestCase { + private Map<Long, Set<String>> m; + private WatchesReport r; + @Before public void setUp() { + m = new HashMap<Long, Set<String>>(); + Set<String> s = new HashSet<String>(); + s.add("path1a"); + s.add("path1b"); + m.put(1L, s); + s = new HashSet<String>(); + s.add("path2a"); + m.put(2L, s); + r = new WatchesReport(m); + } + @Test public void testHasPaths() { + assertTrue(r.hasPaths(1L)); + assertTrue(r.hasPaths(2L)); + assertFalse(r.hasPaths(3L)); + } + @Test public void testGetPaths() { + Set<String> s = r.getPaths(1L); + assertEquals(2, s.size()); + assertTrue(s.contains("path1a")); + assertTrue(s.contains("path1b")); + s = r.getPaths(2L); + assertEquals(1, s.size()); + assertTrue(s.contains("path2a")); + assertNull(r.getPaths(3L)); + } + @Test public void testToMap() { + assertEquals(m, r.toMap()); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java new file mode 100644 index 0000000..35956f1 --- /dev/null +++ b/src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.server.watch; + +import java.util.Map; +import org.apache.zookeeper.ZKTestCase; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class WatchesSummaryTest extends ZKTestCase { + private WatchesSummary s; + @Before public void setUp() { + s = new WatchesSummary(1, 2, 3); + } + @Test public void testGetters() { + assertEquals(1, s.getNumConnections()); + assertEquals(2, s.getNumPaths()); + assertEquals(3, s.getTotalWatches()); + } + @Test public void testToMap() { + Map<String, Object> m = s.toMap(); + assertEquals(3, m.size()); + assertEquals(Integer.valueOf(1), m.get(WatchesSummary.KEY_NUM_CONNECTIONS)); + assertEquals(Integer.valueOf(2), m.get(WatchesSummary.KEY_NUM_PATHS)); + assertEquals(Integer.valueOf(3), m.get(WatchesSummary.KEY_NUM_TOTAL_WATCHES)); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/BenchMain.java ---------------------------------------------------------------------- diff --git a/src/test/java/bench/org/apache/zookeeper/BenchMain.java b/src/test/java/bench/org/apache/zookeeper/BenchMain.java new file mode 100644 index 0000000..8e370c0 --- /dev/null +++ b/src/test/java/bench/org/apache/zookeeper/BenchMain.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +public class BenchMain { + public static void main(String args[]) throws Exception { + org.openjdk.jmh.Main.main(args); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java ---------------------------------------------------------------------- diff --git a/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java new file mode 100644 index 0000000..0510df7 --- /dev/null +++ b/src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.watch.IWatchManager; +import org.apache.zookeeper.server.DumbWatcher; + +import org.openjdk.jmh.annotations.*; + +import java.util.concurrent.TimeUnit; + +@Fork(3) +public class WatchBench { + + static final String pathPrefix = "/reasonably/long/path/"; + static final EventType event = EventType.NodeDataChanged; + + static IWatchManager createWatchManager(String className) throws Exception { + Class clazz = Class.forName( + "org.apache.zookeeper.server.watch." + className); + return (IWatchManager) clazz.newInstance(); + } + + static void forceGC() { + int gcTimes = 3; + for (int i = 0; i < gcTimes; i++) { + try { + System.gc(); + Thread.currentThread().sleep(1000); + + System.runFinalization(); + Thread.currentThread().sleep(1000); + } catch (InterruptedException ex) { /* ignore */ } + } + } + + static long getMemoryUse() { + forceGC(); + long totalMem = Runtime.getRuntime().totalMemory(); + + forceGC(); + long freeMem = Runtime.getRuntime().freeMemory(); + return totalMem - freeMem; + } + + @State(Scope.Benchmark) + public static class IterationState { + + @Param({"WatchManager", "WatchManagerOptimized"}) + public String watchManagerClass; + + @Param({"10000"}) + public int pathCount; + + String[] paths; + + long watchesAdded = 0; + IWatchManager watchManager; + + long memWhenSetup = 0; + + @Setup(Level.Iteration) + public void setup() throws Exception { + paths = new String[pathCount]; + for (int i = 0; i < paths.length; i++) { + paths[i] = pathPrefix + i; + } + + watchesAdded = 0; + watchManager = createWatchManager(watchManagerClass); + + memWhenSetup = getMemoryUse(); + } + + @TearDown(Level.Iteration) + public void tearDown() { + long memUsed = getMemoryUse() - memWhenSetup; + System.out.println("Memory used: " + watchesAdded + " " + memUsed); + + double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ; + System.out.println( + "Memory used per million watches " + + String.format("%.2f", memPerMillionWatchesMB) + "MB"); + } + } + + /** + * Test concenrate watch case where the watcher watches all paths. + * + * The output of this test will be the average time used to add the + * watch to all paths. + */ + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) + public void testAddConcentrateWatch(IterationState state) throws Exception { + Watcher watcher = new DumbWatcher(); + + // watch all paths + for (String path : state.paths) { + if (state.watchManager.addWatch(path, watcher)) { + state.watchesAdded++; + } + } + } + + @State(Scope.Benchmark) + public static class InvocationState { + + @Param({"WatchManager", "WatchManagerOptimized"}) + public String watchManagerClass; + + @Param({"1", "1000"}) + public int pathCount; + + @Param({"1", "1000"}) + public int watcherCount; + + String[] paths; + Watcher[] watchers; + + IWatchManager watchManager; + + @Setup(Level.Invocation) + public void setup() throws Exception { + initialize(); + prepare(); + } + + void initialize() throws Exception { + if (paths == null || paths.length != pathCount) { + paths = new String[pathCount]; + for (int i = 0; i < pathCount; i++) { + paths[i] = pathPrefix + i; + } + } + + if (watchers == null || watchers.length != watcherCount) { + watchers = new Watcher[watcherCount]; + for (int i = 0; i < watcherCount; i++) { + watchers[i] = new DumbWatcher(); + } + } + if (watchManager == null || + !watchManager.getClass().getSimpleName().contains( + watchManagerClass)) { + watchManager = createWatchManager(watchManagerClass); + } + } + + void prepare() { + for (String path : paths) { + for (Watcher watcher : watchers) { + watchManager.addWatch(path, watcher); + } + } + } + } + + /** + * Test trigger watches in concenrate case. + * + * The output of this test is the time used to trigger those watches on + * all paths. + */ + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) + public void testTriggerConcentrateWatch(InvocationState state) throws Exception { + for (String path : state.paths) { + state.watchManager.triggerWatch(path, event); + } + } + + @State(Scope.Benchmark) + public static class AddSparseWatchState extends InvocationState { + + @Param({"10000"}) + public int pathCount; + + @Param({"10000"}) + public int watcherCount; + + long watchesAdded = 0; + long memWhenSetup = 0; + + @Override + public void prepare() { + watchesAdded = 0; + memWhenSetup = getMemoryUse(); + } + + @TearDown(Level.Invocation) + public void tearDown() { + long memUsed = getMemoryUse() - memWhenSetup; + System.out.println("Memory used: " + watchesAdded + " " + memUsed); + + double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ; + System.out.println( + "Memory used per million sparse watches " + + String.format("%.2f", memPerMillionWatchesMB) + "MB"); + + // clear all the watches + for (String path : paths) { + watchManager.triggerWatch(path, event); + } + } + } + + /** + * Test sparse watch case where only one watcher watches all paths, and + * only one path being watched by all watchers. + * + * The output of this test will be the average time used to add those + * sparse watches. + */ + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) + public void testAddSparseWatch(AddSparseWatchState state) throws Exception { + // All watchers are watching the 1st path + for (Watcher watcher : state.watchers) { + if (state.watchManager.addWatch(state.paths[0], watcher)) { + state.watchesAdded++; + } + } + // The 1st watcher is watching all paths + for (String path : state.paths) { + if (state.watchManager.addWatch(path, state.watchers[0])) { + state.watchesAdded++; + } + } + } + + @State(Scope.Benchmark) + public static class TriggerSparseWatchState extends InvocationState { + + @Param({"10000"}) + public int pathCount; + + @Param({"10000"}) + public int watcherCount; + + @Override + public void prepare() { + // All watchers are watching the 1st path + for (Watcher watcher : watchers) { + watchManager.addWatch(paths[0], watcher); + } + + // The 1st watcher is watching all paths + for (String path : paths) { + watchManager.addWatch(path, watchers[0]); + } + } + } + + + /** + * Test trigger watches in sparse case. + * + * The output of this test is the time used to trigger those watches on + * all paths. + */ + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) + @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) + public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception { + for (String path : state.paths) { + state.watchManager.triggerWatch(path, event); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses index 2b0fc83..ba29e89 100644 --- a/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses +++ b/zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses @@ -8,3 +8,4 @@ quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test +jmh:org.apache.zookeeper.BenchMain:Run jmh micro benchmarks http://git-wip-us.apache.org/repos/asf/zookeeper/blob/fdde8b00/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml ---------------------------------------------------------------------- diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml index 50143d4..b7cd21b 100644 --- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml +++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting> </listitem> </varlistentry> + + <varlistentry> + <term>watchManaggerName</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.watchManagerName</emphasis>)</para> + + <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher + manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This + config is used to define which watcher manager to be used. Currently, we only support WatchManager and + WatchManagerOptimized.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>watcherCleanThreadsNum</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para> + + <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher + manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how + many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The + default value is 2, which is good enough even for heavy and continuous session closing/recreating cases.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>watcherCleanThreshold</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para> + + <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher + manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively + heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide + the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up + speed issue.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>watcherCleanIntervalInSeconds</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para> + + <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher + manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively + heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold, + this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger + than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting + is 10 minutes, which usually don't need to be changed.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>maxInProcessingDeadWatchers</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para> + + <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used + to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will + slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing + watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like + watcherCleanThreshold * 1000.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>bitHashCacheSize</term> + + <listitem> + <para>(Java system property only: <emphasis + role="bold">zookeeper.bitHashCacheSize</emphasis>)</para> + + <para><emphasis role="bold">New 3.6.0:</emphasis> Added in + <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the + setting used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we + need to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to + keep the size small to make sure it doesn't cost too much in memory, there is a trade off between memory + and time complexity. The default value is 10, which seems a relatively reasonable cache size.</para> + </listitem> + </varlistentry> + </variablelist> </section>