Github user lvfangmin commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/590#discussion_r220050492
  
    --- Diff: 
src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java ---
    @@ -0,0 +1,355 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.watch;
    +
    +import java.io.PrintWriter;
    +import java.util.HashMap;
    +import java.util.BitSet;
    +import java.util.HashSet;
    +import java.util.ArrayList;
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Random;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.lang.Iterable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.apache.zookeeper.Watcher.Event.KeeperState;
    +import org.apache.zookeeper.server.ServerCnxn;
    +import org.apache.zookeeper.server.util.BitMap;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Optimized in memory and time complexity, compared to WatchManager, both 
the
    + * memory consumption and time complexity improved a lot, but it cannot
    + * efficiently remove the watcher when the session or socket is closed, for
    + * majority usecase this is not a problem.
    + *
    + * Changed made compared to WatchManager:
    + *
    + * - Use HashSet and BitSet to store the watchers to find a balance between
    + *   memory usage and time complexity
    + * - Use ReadWriteLock instead of synchronized to reduce lock retention
    + * - Lazily clean up the closed watchers
    + */
    +public class WatchManagerOptimized
    +        implements IWatchManager, DeadWatcherListener {
    +
    +    private static final Logger LOG =
    +            LoggerFactory.getLogger(WatchManagerOptimized.class);
    +
    +    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
    +            new ConcurrentHashMap<String, BitHashSet>();
    +
    +    // watcher to bit id mapping
    +    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
    +
    +    // used to lazily remove the dead watchers
    +    private final WatcherCleaner watcherCleaner;
    +
    +    private final ReentrantReadWriteLock addRemovePathRWLock = new 
ReentrantReadWriteLock();
    +
    +    public WatchManagerOptimized() {
    +        watcherCleaner = new WatcherCleaner(this);
    +        watcherCleaner.start();
    +    }
    +
    +    @Override
    +    public boolean addWatch(String path, Watcher watcher) {
    +        boolean result = false;
    +        addRemovePathRWLock.readLock().lock();
    +        try {
    +            // avoid race condition of adding a on flying dead watcher
    +            if (isDeadWatcher(watcher)) {
    +                LOG.debug("Ignoring addWatch with closed cnxn");
    +            } else {
    +                Integer bit = watcherBitIdMap.add(watcher);
    +                BitHashSet watchers = pathWatches.get(path);
    +                if (watchers == null) {
    +                    watchers = new BitHashSet();
    +                    BitHashSet existingWatchers = 
pathWatches.putIfAbsent(path, watchers);
    +                    if (existingWatchers != null) {
    --- End diff --
    
    That's correct, reading requests are processed concurrently in 
CommitProcessor worker service, so it's possible multiple thread might add to 
pathWatches while we're holding read lock, that's why we need this check here. 


---

Reply via email to