[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user asfgit closed the pull request at: https://github.com/apache/zookeeper/pull/590 ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220055378 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +}
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220055291 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait if th
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220050492 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { --- End diff -- That's correct: read requests are processed concurrently in the CommitProcessor worker service, so multiple threads might add to pathWatches while we're holding the read lock. That's why we need this check here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220050127 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); --- End diff -- Yes, it's used to improve read throughput. Creating a new watcher bit and adding it to the BitHashSet each have their own lock, which minimizes the lock scope. I'll add some comments here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220049442 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r220049264 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219958886 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { --- End diff -- Is this check necessary because we are using a read lock here, so it's possible for another thread to modify `pathWatches` while we are here? ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219912160 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; --- End diff -- There are a couple of unused imports here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219931441 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +}
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219911008 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait if th
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219957465 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); --- End diff -- Is the purpose of using a read lock here to optimize for `addWatch`-heavy workloads? It would be good to add a comment here explaining why a read lock is used instead of a write lock. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219912709 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. --- End diff -- nit: use case ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219934138 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +}
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219959421 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); --- End diff -- Would be good to add a comment here regarding why no synchronization is required here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219896267 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; + +/** + * Change to SparseBitSet if we we want to optimize more, the number of + * elements on a single server is usually limited, so BitSet should be + * fine. + */ +private final BitSet elementBits = new BitSet(); +private final Set cache = new HashSet(); + +private final int cacheSize; + +// To record how many elements in this set. +private int elementCount = 0; + +public BitHashSet() { +this(Integer.getInteger("zookeeper.bitHashCacheSize", 10)); +} + +public BitHashSet(int cacheSize) { +this.cacheSize = cacheSize; +} + +public synchronized boolean add(Integer elementBit) { +if (elementBit == null || elementBits.get(elementBit)) { +return false; +} +if (cache.size() < cacheSize) { +cache.add(elementBit); +} +elementBits.set(elementBit); +elementCount++; +return true; +} + +/** + * Remove the watches, and return the number of watches being removed. + */ +public synchronized int remove(Set bitSet, BitSet bits) { +cache.removeAll(bitSet); +elementBits.andNot(bits); +int elementCountBefore = elementCount; +elementCount = elementBits.cardinality(); +return elementCountBefore - elementCount; +} + +public synchronized boolean remove(Integer elementBit) { +if (elementBit == null || !elementBits.get(elementBit)) { +return false; +} + +cache.remove(elementBit); +elementBits.clear(elementBit); +elementCount--; +return true; +} + +public synchronized boolean contains(Integer elementBit) { +if (elementBit == null) { +return false; +} +return elementBits.get(elementBit); --- End diff -- It would be good to add a comment at the declaration of the `cache` variable stating that its purpose is to optimize iteration. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688507 --- Diff: src/test/java/bench/org/apache/zookeeper/BenchMain.java --- @@ -0,0 +1,12 @@ +package org.apache.zookeeper; --- End diff -- Will add it. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219687878 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; + +/** + * Change to SparseBitSet if we we want to optimize more, the number of + * elements on a single server is usually limited, so BitSet should be + * fine. + */ +private final BitSet elementBits = new BitSet(); +private final Set cache = new HashSet(); + +private final int cacheSize; + +// To record how many elements in this set. +private int elementCount = 0; + +public BitHashSet() { +this(Integer.getInteger("zookeeper.bitHashCacheSize", 10)); +} + +public BitHashSet(int cacheSize) { +this.cacheSize = cacheSize; +} + +public synchronized boolean add(Integer elementBit) { +if (elementBit == null || elementBits.get(elementBit)) { +return false; +} +if (cache.size() < cacheSize) { +cache.add(elementBit); +} +elementBits.set(elementBit); +elementCount++; +return true; +} + +/** + * Remove the watches, and return the number of watches being removed. + */ +public synchronized int remove(Set bitSet, BitSet bits) { +cache.removeAll(bitSet); +elementBits.andNot(bits); +int elementCountBefore = elementCount; +elementCount = elementBits.cardinality(); +return elementCountBefore - elementCount; +} + +public synchronized boolean remove(Integer elementBit) { +if (elementBit == null || !elementBits.get(elementBit)) { +return false; +} + +cache.remove(elementBit); +elementBits.clear(elementBit); +elementCount--; +return true; +} + +public synchronized boolean contains(Integer elementBit) { +if (elementBit == null) { +return false; +} +return elementBits.get(elementBit); --- End diff -- BitSet.get is O(1), so checking the cache first wouldn't help and may actually be more expensive.
The HashSet is used to optimize iteration. For example, if there is a single element in this BitHashSet but its bit index is very large, then without the HashSet we would need to scan through all the words before returning that element, which is not efficient. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688028 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688196 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; --- End diff -- At the beginning when we added this class, it was bound with Watcher, but not anymore after refactoring, we can move this to server.util, I'll do that. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688243 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; --- End diff -- Previously, this class used inheritance instead of composition with HashSet, and we added this serialVersionUID at that time. We didn't remove it after switching to composition; I will remove it now. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219687885 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; + +/** + * Change to SparseBitSet if we we want to optimize more, the number of + * elements on a single server is usually limited, so BitSet should be + * fine. + */ +private final BitSet elementBits = new BitSet(); +private final Set cache = new HashSet(); + +private final int cacheSize; + +// To record how many elements in this set. +private int elementCount = 0; + +public BitHashSet() { +this(Integer.getInteger("zookeeper.bitHashCacheSize", 10)); +} + +public BitHashSet(int cacheSize) { +this.cacheSize = cacheSize; +} + +public synchronized boolean add(Integer elementBit) { +if (elementBit == null || elementBits.get(elementBit)) { +return false; +} +if (cache.size() < cacheSize) { +cache.add(elementBit); +} +elementBits.set(elementBit); +elementCount++; +return true; +} + +/** + * Remove the watches, and return the number of watches being removed. + */ +public synchronized int remove(Set bitSet, BitSet bits) { +cache.removeAll(bitSet); +elementBits.andNot(bits); +int elementCountBefore = elementCount; +elementCount = elementBits.cardinality(); +return elementCountBefore - elementCount; +} + +public synchronized boolean remove(Integer elementBit) { +if (elementBit == null || !elementBits.get(elementBit)) { +return false; +} + +cache.remove(elementBit); +elementBits.clear(elementBit); +elementCount--; +return true; +} + +public synchronized boolean contains(Integer elementBit) { +if (elementBit == null) { +return false; +} +return elementBits.get(elementBit); +} + +public synchronized int size() { +return elementCount; +} + +/** + * This function is not thread-safe, need to synchronized when + * iterate through this set. + */ +@Override +public Iterator iterator() { --- End diff -- It's used in the triggerWatcher with for iterator. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688430 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219688164 --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java --- @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.Map; +import java.util.HashMap; +import java.util.BitSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This is a helper class to maintain the bit to specific value and the + * reversed value to bit mapping. + */ +public class BitMap { + +private final Map value2Bit = new HashMap(); +private final Map bit2Value = new HashMap(); + +private final BitSet freedBitSet = new BitSet(); +private Integer nextBit = Integer.valueOf(0); + +private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + +public Integer add(T value) { +Integer bit = getBit(value); +if (bit != null) { +return bit; +} --- End diff -- This BitMap is used by WatchManagerOptimized.watcherBitIdMap, which is used to store watcher to bit mapping. 
`add` may be called very frequently if the same client connection is watching thousands or even millions of nodes, while `remove` is only called once, when the session is closed. That's why we optimized `add` to check under the read lock first, but take the write lock directly in `remove`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219643998 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java --- @@ -83,18 +97,21 @@ synchronized void removeWatcher(Watcher watcher) { Set list = watchTable.get(p); if (list != null) { list.remove(watcher); -if (list.size() == 0) { +if (list.isEmpty()) { watchTable.remove(p); } } } } -Set triggerWatch(String path, EventType type) { +@Override +public WatcherOrBitSet triggerWatch(String path, EventType type) { return triggerWatch(path, type, null); } -Set triggerWatch(String path, EventType type, Set supress) { +@Override +public WatcherOrBitSet triggerWatch( +String path, EventType type, WatcherOrBitSet supress) { --- End diff -- `suppress ` instead of `supress` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219640916 --- Diff: src/java/main/org/apache/zookeeper/server/DataTree.java --- @@ -253,6 +259,14 @@ public DataTree() { addConfigNode(); nodeDataSize.set(approximateDataSize()); +try { +dataWatches = WatchManagerFactory.createWatchManager(); +childWatches = WatchManagerFactory.createWatchManager(); +} catch (Exception e) { +LOG.error("Unexpected exception when creating WatchManager, " + +"exiting abnormally", e); --- End diff -- nit: use parameterized logging here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219661278 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait if th
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659808 --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java --- @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.Map; +import java.util.HashMap; +import java.util.BitSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This is a helper class to maintain the bit to specific value and the + * reversed value to bit mapping. + */ +public class BitMap { + +private final Map value2Bit = new HashMap(); +private final Map bit2Value = new HashMap(); + +private final BitSet freedBitSet = new BitSet(); +private Integer nextBit = Integer.valueOf(0); + +private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + +public Integer add(T value) { +Integer bit = getBit(value); +if (bit != null) { +return bit; +} --- End diff -- is it usual that we usually call this `add` method with same value over and over? 
If that's the case, then this optimization is good. If not, it adds the unnecessary cost of acquiring and releasing the reader lock plus the writer lock on every `add`, as opposed to acquiring and releasing the writer lock just once per `add`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659171 --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java --- @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.cert.Certificate; + +import org.apache.jute.Record; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerStats; + +/** + * A empthy watcher implementation used in bench and unit test. --- End diff -- spelling: `empty` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660065 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 New watcher +manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This +config is used define which watch manager to be used. Currently, we only support WatchManager and +WatchManagerOptimized. + + + + +watcherCleanThreadsNum + + + (Java system property only: zookeeper.watcherCleanThreadsNum) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how +many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The +default value is 2, which is good enough even for heavy and continuous session closing/receating cases. + + + + +watcherCleanThreshold + + + (Java system property only: zookeeper.watcherCleanThreshold) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide +the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up +speed issue. 
+ + + + +watcherCleanIntervalInSeconds + + + (Java system property only: zookeeper.watcherCleanIntervalInSeconds) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold, +this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger +than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting +is 10 minutes, which usually don't need to be changed. + + + + +maxInProcessingDeadWatchers + + + (Java system property only: zookeeper.maxInProcessingDeadWatchers) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is used +to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will +slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing +watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like +watcherCleanThreshold * 1000. + + + + +bitHashCacheSize + + + (Java system property only: zookeeper.bitHashCacheSize) + + New 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is the +settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we +need to to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to +keep the size small to make sure it doesn't cost too much in memory, there is a tradeoff between memory --- End diff -- space between `trade` and `off`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219645409 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) --- End diff -- this should be `zookeeper.watchManagerName`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659161 --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java --- @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.cert.Certificate; + +import org.apache.jute.Record; +import org.apache.zookeeper.Watcher; --- End diff -- unused import. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219641593 --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.ServerCnxn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; --- End diff -- Remove all imports here except these three since rest of those were not used (my guess is this file was copied pasted?) ` import java.io.PrintWriter; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; ` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659865 --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java --- @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.Map; +import java.util.HashMap; +import java.util.BitSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This is a helper class to maintain the bit to specific value and the + * reversed value to bit mapping. + */ +public class BitMap { + +private final Map value2Bit = new HashMap(); +private final Map bit2Value = new HashMap(); + +private final BitSet freedBitSet = new BitSet(); +private Integer nextBit = Integer.valueOf(0); + +private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + +public Integer add(T value) { +Integer bit = getBit(value); +if (bit != null) { +return bit; +} --- End diff -- I am also wondering, if this optimization is indeed useful, why not do the same for the `remove` methods, that is, check and return early with a read lock before trying to acquire a write lock. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659934 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between --- End diff -- nit `trade off` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219661330 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait if th
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219661253 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java --- @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; +import java.util.HashSet; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.RateLogger; +import org.apache.zookeeper.server.WorkerService; +import org.apache.zookeeper.server.WorkerService.WorkRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread used to lazily clean up the closed watcher, it will trigger the + * clean up when the dead watchers get certain number or some number of + * seconds has elapsed since last clean up. 
+ * + * Cost of running it: + * + * - need to go through all the paths even if the watcher may only + * watching a single path + * - block in the path BitHashSet when we try to check the dead watcher + * which won't block other stuff + */ +public class WatcherCleaner extends Thread { + +private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class); +private final RateLogger RATE_LOGGER = new RateLogger(LOG); + +private volatile boolean stopped = false; +private final Object cleanEvent = new Object(); +private final Random r = new Random(System.nanoTime()); +private final WorkerService cleaners; + +private final Set deadWatchers; +private final DeadWatcherListener listener; +private final int watcherCleanThreshold; +private final int watcherCleanIntervalInSeconds; +private final int maxInProcessingDeadWatchers; +private final AtomicInteger totalDeadWatchers = new AtomicInteger(); + +public WatcherCleaner(DeadWatcherListener listener) { +this(listener, +Integer.getInteger("zookeeper.watcherCleanThreshold", 1000), +Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600), +Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2), +Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1)); +} + +public WatcherCleaner(DeadWatcherListener listener, +int watcherCleanThreshold, int watcherCleanIntervalInSeconds, +int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) { +this.listener = listener; +this.watcherCleanThreshold = watcherCleanThreshold; +this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds; +int suggestedMaxInProcessingThreshold = +watcherCleanThreshold * watcherCleanThreadsNum; +if (maxInProcessingDeadWatchers > 0 && +maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) { +maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold; +LOG.info("The maxInProcessingDeadWatchers config is smaller " + +"than the suggested one, change it to use {}", +maxInProcessingDeadWatchers); +} 
+this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers; +this.deadWatchers = new HashSet(); +this.cleaners = new WorkerService("DeadWatcherCleanner", +watcherCleanThreadsNum, false); + +LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" + +", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}", +watcherCleanThreshold, watcherCleanIntervalInSeconds, +watcherCleanThreadsNum, maxInProcessingDeadWatchers); +} + +public void addDeadWatcher(int watcherBit) { +// Wait if th
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659984 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; --- End diff -- why do we need this? `BitHashSet` is not implementing `Serializable` here... ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660054 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 New watcher +manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This +config is used define which watch manager to be used. Currently, we only support WatchManager and +WatchManagerOptimized. + + + + +watcherCleanThreadsNum + + + (Java system property only: zookeeper.watcherCleanThreadsNum) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how +many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The +default value is 2, which is good enough even for heavy and continuous session closing/receating cases. + + + + +watcherCleanThreshold + + + (Java system property only: zookeeper.watcherCleanThreshold) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide +the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up +speed issue. 
+ + + + +watcherCleanIntervalInSeconds + + + (Java system property only: zookeeper.watcherCleanIntervalInSeconds) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold, +this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger +than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting +is 10 minutes, which usually don't need to be changed. + + + + +maxInProcessingDeadWatchers + + + (Java system property only: zookeeper.maxInProcessingDeadWatchers) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is used +to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will +slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing +watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like +watcherCleanThreshold * 1000. + + + + +bitHashCacheSize + + + (Java system property only: zookeeper.bitHashCacheSize) + + New 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is the +settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we --- End diff -- spell check: `setting used to` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219642525 --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.ServerCnxn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface IWatchManager { + +/** + * Add watch to specific path. 
+ * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher added is not already present + */ +public boolean addWatch(String path, Watcher watcher); + +/** + * Checks the specified watcher exists for the given path + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher exists, false otherwise + */ +public boolean containsWatcher(String path, Watcher watcher); + +/** + * Removes the specified watcher for the given path + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher successfully removed, false otherwise + */ +public boolean removeWatcher(String path, Watcher watcher); + +/** + * The entry to remove the watcher when the cnxn is closed. + * + * @param watcher watcher object reference + */ +public void removeWatcher(Watcher watcher); + +/** + * Distribute the watch event for the given path. + * + * @param path znode path + * @param type the watch event type + * + * @return the watchers have been notified + */ +public WatcherOrBitSet triggerWatch(String path, EventType type); + +/** + * Distribute the watch event for the given path, but ignore those + * supressed ones. --- End diff -- spell check: `suppressed` instead of`supressed` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219642002 --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.ServerCnxn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface IWatchManager { + +/** + * Add watch to specific path. 
+ * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher added is not already present + */ +public boolean addWatch(String path, Watcher watcher); + +/** + * Checks the specified watcher exists for the given path + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher exists, false otherwise + */ +public boolean containsWatcher(String path, Watcher watcher); + +/** + * Removes the specified watcher for the given path --- End diff -- nit: missing full stop at end of sentence. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219641957 --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.ServerCnxn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface IWatchManager { + +/** + * Add watch to specific path. + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher added is not already present + */ +public boolean addWatch(String path, Watcher watcher); + +/** + * Checks the specified watcher exists for the given path --- End diff -- nit: missing full stop at end of sentence. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660689 --- Diff: src/java/main/org/apache/zookeeper/server/watch/DeadWatcherListener.java --- @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; + +/** + * Interface used to process the dead watchers related to closed cnxns. + */ +public interface DeadWatcherListener { --- End diff -- would be good to rename this to `IDeadWatchListner`, which makes it obvious this is an interface. We already do this for `IWatchManager`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660630 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; + +/** + * Change to SparseBitSet if we we want to optimize more, the number of + * elements on a single server is usually limited, so BitSet should be + * fine. + */ +private final BitSet elementBits = new BitSet(); +private final Set cache = new HashSet(); + +private final int cacheSize; + +// To record how many elements in this set. +private int elementCount = 0; + +public BitHashSet() { +this(Integer.getInteger("zookeeper.bitHashCacheSize", 10)); +} + +public BitHashSet(int cacheSize) { +this.cacheSize = cacheSize; +} + +public synchronized boolean add(Integer elementBit) { +if (elementBit == null || elementBits.get(elementBit)) { +return false; +} +if (cache.size() < cacheSize) { +cache.add(elementBit); +} +elementBits.set(elementBit); +elementCount++; +return true; +} + +/** + * Remove the watches, and return the number of watches being removed. + */ +public synchronized int remove(Set bitSet, BitSet bits) { +cache.removeAll(bitSet); +elementBits.andNot(bits); +int elementCountBefore = elementCount; +elementCount = elementBits.cardinality(); +return elementCountBefore - elementCount; +} + +public synchronized boolean remove(Integer elementBit) { +if (elementBit == null || !elementBits.get(elementBit)) { +return false; +} + +cache.remove(elementBit); +elementBits.clear(elementBit); +elementCount--; +return true; +} + +public synchronized boolean contains(Integer elementBit) { +if (elementBit == null) { +return false; +} +return elementBits.get(elementBit); +} + +public synchronized int size() { +return elementCount; +} + +/** + * This function is not thread-safe, need to synchronized when + * iterate through this set. + */ +@Override +public Iterator iterator() { --- End diff -- curious - what's this `iterator` is used for? 
I did not spot any usage of it just by going over the diff. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219644285 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java --- @@ -180,7 +192,8 @@ synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) { *watcher object reference * @return true if the watcher exists, false otherwise */ --- End diff -- we should remove this comment, which is already present in the `IWatchManager`, also for consistency with other overrides in `WatchManager`. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660120 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is --- End diff -- nit `need to to` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219645514 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 New watcher +manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This +config is used define which watch manager to be used. Currently, we only support WatchManager and --- End diff -- is used to define which watcher ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660592 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; + +/** + * Using BitSet to store all the elements, and use HashSet to cache limited + * number of elements to find a balance between memory and time complexity. + * + * Without HashSet, we need to to use O(N) time to get the elements, N is + * the bit numbers in elementBits. But we need to keep the size small to make + * sure it doesn't cost too much in memory, there is a tradeoff between + * memory and time complexity. + * + * Previously, was deciding to dynamically switch between SparseBitSet and + * HashSet based on the memory consumption, but it will take time to copy + * data over and may have some herd effect of keep copying data from one + * data structure to anther. 
The current solution can do a very good job + * given most of the paths have limited number of elements. + */ +public class BitHashSet implements Iterable { + +static final long serialVersionUID = 6382565447128283568L; + +/** + * Change to SparseBitSet if we we want to optimize more, the number of + * elements on a single server is usually limited, so BitSet should be + * fine. + */ +private final BitSet elementBits = new BitSet(); +private final Set cache = new HashSet(); + +private final int cacheSize; + +// To record how many elements in this set. +private int elementCount = 0; + +public BitHashSet() { +this(Integer.getInteger("zookeeper.bitHashCacheSize", 10)); +} + +public BitHashSet(int cacheSize) { +this.cacheSize = cacheSize; +} + +public synchronized boolean add(Integer elementBit) { +if (elementBit == null || elementBits.get(elementBit)) { +return false; +} +if (cache.size() < cacheSize) { +cache.add(elementBit); +} +elementBits.set(elementBit); +elementCount++; +return true; +} + +/** + * Remove the watches, and return the number of watches being removed. + */ +public synchronized int remove(Set bitSet, BitSet bits) { +cache.removeAll(bitSet); +elementBits.andNot(bits); +int elementCountBefore = elementCount; +elementCount = elementBits.cardinality(); +return elementCountBefore - elementCount; +} + +public synchronized boolean remove(Integer elementBit) { +if (elementBit == null || !elementBits.get(elementBit)) { +return false; +} + +cache.remove(elementBit); +elementBits.clear(elementBit); +elementCount--; +return true; +} + +public synchronized boolean contains(Integer elementBit) { +if (elementBit == null) { +return false; +} +return elementBits.get(elementBit); --- End diff -- should we look up `cache` first here? If not, what's the purpose of adding `cache`? ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660094 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 New watcher +manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This +config is used define which watch manager to be used. Currently, we only support WatchManager and +WatchManagerOptimized. + + + + +watcherCleanThreadsNum + + + (Java system property only: zookeeper.watcherCleanThreadsNum) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how +many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The +default value is 2, which is good enough even for heavy and continuous session closing/receating cases. + + + + +watcherCleanThreshold + + + (Java system property only: zookeeper.watcherCleanThreshold) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide +the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up +speed issue. 
+ + + + +watcherCleanIntervalInSeconds + + + (Java system property only: zookeeper.watcherCleanIntervalInSeconds) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively +heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold, +this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger +than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting +is 10 minutes, which usually don't need to be changed. + + + + +maxInProcessingDeadWatchers + + + (Java system property only: zookeeper.maxInProcessingDeadWatchers) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is used +to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will +slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing +watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like +watcherCleanThreshold * 1000. + + + + +bitHashCacheSize + + + (Java system property only: zookeeper.bitHashCacheSize) + + New 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 This is the +settin used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we +need to to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to --- End diff -- `need to` instead of `need to to` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219660892 --- Diff: zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml --- @@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888 + + +watchManaggerName + + + (Java system property only: zookeeper.watchManaggerName) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 New watcher +manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This +config is used define which watch manager to be used. Currently, we only support WatchManager and +WatchManagerOptimized. + + + + +watcherCleanThreadsNum + + + (Java system property only: zookeeper.watcherCleanThreadsNum) + + New in 3.6.0: Added in +https://issues.apache.org/jira/browse/ZOOKEEPER-1179";>ZOOKEEPER-1179 The new watcher +manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how +many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The +default value is 2, which is good enough even for heavy and continuous session closing/receating cases. --- End diff -- `closing/recreating` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659918 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; --- End diff -- Is there a reason this `BitHashSet` is part of the `server.watch` package rather than the `server.util` package, where the similar helper class `BitMap` lives? ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219642730 --- Diff: src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.server.ServerCnxn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface IWatchManager { + +/** + * Add watch to specific path. 
+ * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher added is not already present + */ +public boolean addWatch(String path, Watcher watcher); + +/** + * Checks the specified watcher exists for the given path + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher exists, false otherwise + */ +public boolean containsWatcher(String path, Watcher watcher); + +/** + * Removes the specified watcher for the given path + * + * @param path znode path + * @param watcher watcher object reference + * + * @return true if the watcher successfully removed, false otherwise + */ +public boolean removeWatcher(String path, Watcher watcher); + +/** + * The entry to remove the watcher when the cnxn is closed. + * + * @param watcher watcher object reference + */ +public void removeWatcher(Watcher watcher); + +/** + * Distribute the watch event for the given path. + * + * @param path znode path + * @param type the watch event type + * + * @return the watchers have been notified + */ +public WatcherOrBitSet triggerWatch(String path, EventType type); + +/** + * Distribute the watch event for the given path, but ignore those + * supressed ones. + * + * @param path znode path + * @param type the watch event type + * @param supress the supressed watcher set + * + * @return the watchers have been notified + */ +public WatcherOrBitSet triggerWatch( +String path, EventType type, WatcherOrBitSet supress); --- End diff -- similar spelling issue for `supress` ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659886 --- Diff: src/java/main/org/apache/zookeeper/server/watch/BitHashSet.java --- @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.BitSet; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.lang.Iterable; + +import org.apache.zookeeper.server.util.BitMap; --- End diff -- I don't think this is used anywhere in this file. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219659165 --- Diff: src/java/main/org/apache/zookeeper/server/DumbWatcher.java --- @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.cert.Certificate; + +import org.apache.jute.Record; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerStats; --- End diff -- unused imports. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user hanm commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r219562472 --- Diff: src/test/java/bench/org/apache/zookeeper/BenchMain.java --- @@ -0,0 +1,12 @@ +package org.apache.zookeeper; --- End diff -- This file is missing the Apache license header, which triggered a -1 in the last Jenkins build. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r217112114 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r216973199 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r216912996 --- Diff: src/java/test/org/apache/zookeeper/server/DumbWatcher.java --- @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.cert.Certificate; + +import org.apache.jute.Record; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.proto.ReplyHeader; + +public class DumbWatcher extends ServerCnxn { --- End diff -- I agree that for unit tests a mock object is easier to maintain than a stub, but I also need this DumbWatcher in the micro benchmark, so I'll move this class into the main code where both the micro benchmark and the unit tests can share it. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r216910219 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r215183702 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r215180602 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java --- @@ -46,15 +48,26 @@ private final Map> watch2Paths = new HashMap>(); -synchronized int size(){ +@Override +public synchronized int size(){ int result = 0; for(Set watches : watchTable.values()) { result += watches.size(); } return result; } -synchronized void addWatch(String path, Watcher watcher) { +boolean isDeadWatcher(Watcher watcher) { --- End diff -- Taking that and the JIRA fix version into account, this patch will definitely go into 3.5 as well. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214966630 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214987142 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214962201 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java --- @@ -46,15 +48,26 @@ private final Map> watch2Paths = new HashMap>(); -synchronized int size(){ +@Override +public synchronized int size(){ int result = 0; for(Set watches : watchTable.values()) { result += watches.size(); } return result; } -synchronized void addWatch(String path, Watcher watcher) { +boolean isDeadWatcher(Watcher watcher) { --- End diff -- Yes, this also fixed the dead-watcher leak in WatchManager, which I found while building the new optimized watch manager. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214968090 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214965941 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user lvfangmin commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214961550 --- Diff: build.xml --- @@ -119,6 +119,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant"> + --- End diff -- Will do. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214916758 --- Diff: src/java/main/org/apache/zookeeper/server/watch/DeadWatcherListener.java --- @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.util.Set; + +public interface DeadWatcherListener { --- End diff -- Please add a few words of javadoc here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214912044 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214901441 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214915065 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214866254 --- Diff: build.xml --- @@ -119,6 +119,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant"> + --- End diff -- I think this new dir should be added to the classpath of the `eclipse` task too. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214901032 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214918425 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214917025 --- Diff: src/java/main/org/apache/zookeeper/server/util/BitMap.java --- @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.util; + +import java.util.Map; +import java.util.HashMap; +import java.util.BitSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class BitMap { --- End diff -- I think a short javadoc similar to BitHashSet's would be useful here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214915822 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java --- @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.watch; + +import java.io.PrintWriter; +import java.util.HashMap; +import java.util.BitSet; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.Iterator; +import java.lang.Iterable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.util.BitMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Optimized in memory and time complexity, compared to WatchManager, both the + * memory consumption and time complexity improved a lot, but it cannot + * efficiently remove the watcher when the session or socket is closed, for + * majority usecase this is not a problem. 
+ * + * Changed made compared to WatchManager: + * + * - Use HashSet and BitSet to store the watchers to find a balance between + * memory usage and time complexity + * - Use ReadWriteLock instead of synchronized to reduce lock retention + * - Lazily clean up the closed watchers + */ +public class WatchManagerOptimized +implements IWatchManager, DeadWatcherListener { + +private static final Logger LOG = +LoggerFactory.getLogger(WatchManagerOptimized.class); + +private final ConcurrentHashMap pathWatches = +new ConcurrentHashMap(); + +// watcher to bit id mapping +private final BitMap watcherBitIdMap = new BitMap(); + +// used to lazily remove the dead watchers +private final WatcherCleaner watcherCleaner; + +private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock(); + +public WatchManagerOptimized() { +watcherCleaner = new WatcherCleaner(this); +watcherCleaner.start(); +} + +@Override +public boolean addWatch(String path, Watcher watcher) { +boolean result = false; +addRemovePathRWLock.readLock().lock(); +try { +// avoid race condition of adding a on flying dead watcher +if (isDeadWatcher(watcher)) { +LOG.debug("Ignoring addWatch with closed cnxn"); +} else { +Integer bit = watcherBitIdMap.add(watcher); +BitHashSet watchers = pathWatches.get(path); +if (watchers == null) { +watchers = new BitHashSet(); +BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers); +if (existingWatchers != null) { +watchers = existingWatchers; +} +} +result = watchers.add(bit); +} +} finally { +addRemovePathRWLock.readLock().unlock(); +} +return result; +} + +@Override +public boolean containsWatcher(String path, Watcher watcher) { +BitHashSet watchers = pathWatches.get(path); +if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) { +return false; +} +return true; +
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214916521 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java --- @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.watch; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WatchManagerFactory { --- End diff -- A quick javadoc would be awesome here. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214890207 --- Diff: src/java/main/org/apache/zookeeper/server/watch/WatchManager.java --- @@ -46,15 +48,26 @@ private final Map> watch2Paths = new HashMap>(); -synchronized int size(){ +@Override +public synchronized int size(){ int result = 0; for(Set watches : watchTable.values()) { result += watches.size(); } return result; } -synchronized void addWatch(String path, Watcher watcher) { +boolean isDeadWatcher(Watcher watcher) { --- End diff -- It looks like this patch is not just an improvement; it also fixes the edge case of adding dead watchers. Previously, stale client connections weren't checked when registering watchers. Is that correct? ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/590#discussion_r214922678 --- Diff: src/java/test/org/apache/zookeeper/server/DumbWatcher.java --- @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.security.cert.Certificate; + +import org.apache.jute.Record; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.proto.ReplyHeader; + +public class DumbWatcher extends ServerCnxn { --- End diff -- Please consider using mockito. ---
[GitHub] zookeeper pull request #590: [ZOOKEEPER-1177] Add the memory optimized watch...
GitHub user lvfangmin opened a pull request: https://github.com/apache/zookeeper/pull/590 [ZOOKEEPER-1177] Add the memory optimized watch manager for concentrate watches scenario The current HashSet-based WatchManager consumes more than 40GB of memory when creating 300M watches. This patch optimizes memory usage and time complexity for the concentrated-watches scenario; compared to WatchManager, both memory consumption and time complexity improve significantly. I'll post more data later with micro-benchmark results. Changes made compared to WatchManager: * Only keep the path-to-watches map * Use BitSet to reduce the memory used to store watches * Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock contention * Lazily clean up the closed watchers You can merge this pull request into a Git repository by running: $ git pull https://github.com/lvfangmin/zookeeper ZOOKEEPER-1177 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/zookeeper/pull/590.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #590 commit d4f996fdd760417c90ffb28fd63cc37dc87416c1 Author: Fangmin Lyu Date: 2018-08-06T21:43:22Z add the memory optimized watch manager for concentrate watches ---