[ 
https://issues.apache.org/jira/browse/ZOOKEEPER-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lujie updated ZOOKEEPER-4711:
-----------------------------
    Summary: There is a data race between run() and addDeadWatcher in the 
org.apache.zookeeper.server.watch.WatcherCleaner class when running the 
org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers JUnit test. 
 (was: There is a data race between run() and "public void addDeadWatcher(int 
watcherBit)" in org.apache.zookeeper.server.watch.WatcherCleaner class when run 
org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers junit test.)

> There is a data race between run() and addDeadWatcher in the 
> org.apache.zookeeper.server.watch.WatcherCleaner class when running the 
> org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers JUnit 
> test.
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ZOOKEEPER-4711
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: server
>    Affects Versions: 3.9.0
>         Environment: Download ZooKeeper 3.9.0-SNAPSHOT from the GitHub repository 
> ([https://github.com/apache/zookeeper]).
> Then run: mvn test -Dmaven.test.failure.ignore=true 
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
>            Reporter: lujie
>            Priority: Critical
>
> When we run:
> mvn test -Dmaven.test.failure.ignore=true 
> -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers 
> -DfailIfNoTests=false -DredirectTestOutputToFile=false
> the two methods below run concurrently. The addDeadWatcher method is shown first.
> (For my own debugging I also added the following statement, which is omitted 
> from the listing:
>             System.out.println("2s::" +Thread.currentThread().getName()+ "  
> "+System.identityHashCode(deadWatchers)+"  " + System.currentTimeMillis());
> )
> {code:java}
> public void addDeadWatcher(int watcherBit) {
>         // Wait if there are too many watchers waiting to be closed,
>         // this is will slow down the socket packet processing and
>         // the adding watches in the ZK pipeline.
>         while (maxInProcessingDeadWatchers > 0 && !stopped && 
> totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
>             try {
>                 RATE_LOGGER.rateLimitLog("Waiting for dead watchers 
> cleaning");
>                 long startTime = Time.currentElapsedTime();
>                 synchronized (processingCompletedEvent) {
>                     processingCompletedEvent.wait(100);
>                 }
>                 long latency = Time.currentElapsedTime() - startTime;
>                 
> ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
>             } catch (InterruptedException e) {
>                 LOG.info("Got interrupted while waiting for dead watches 
> queue size");
>                 break;
>             }
>         }
>         synchronized (this) {
>             
>             if (deadWatchers.add(watcherBit)) {
>                 totalDeadWatchers.incrementAndGet();
>                 ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
>                 if (deadWatchers.size() >= watcherCleanThreshold) {
>                     synchronized (cleanEvent) {
>                         cleanEvent.notifyAll();
>                     }
>                 }
>             }
>         }
>     }{code}
>  
> {code:java}
> @Override
>     public void run() {
>         while (!stopped) {
>             synchronized (cleanEvent) {
>                 try {
>                     // add some jitter to avoid cleaning dead watchers at the
>                     // same time in the quorum
>                     if (!stopped && deadWatchers.size() < 
> watcherCleanThreshold) {
>                         
>                         int maxWaitMs = (watcherCleanIntervalInSeconds
>                                          + 
> ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 
> 1000;
>                         cleanEvent.wait(maxWaitMs);
>                     }
>                 } catch (InterruptedException e) {
>                     LOG.info("Received InterruptedException while waiting for 
> cleanEvent");
>                     break;
>                 }
>             }            if (deadWatchers.isEmpty()) {
>                 continue;
>             }            synchronized (this) {
>                 // Clean the dead watchers need to go through all the current
>                 // watches, which is pretty heavy and may take a second if
>                 // there are millions of watches, that's why we're doing 
> lazily
>                 // batch clean up in a separate thread with a snapshot of the
>                 // current dead watchers.
>                 final Set<Integer> snapshot = new HashSet<>(deadWatchers);
>                 deadWatchers.clear();
>                 int total = snapshot.size();
>                 LOG.info("Processing {} dead watchers", total);
>                 cleaners.schedule(new WorkRequest() {
>                     @Override
>                     public void doWork() throws Exception {
>                         long startTime = Time.currentElapsedTime();
>                         listener.processDeadWatchers(snapshot);
>                         long latency = Time.currentElapsedTime() - startTime;
>                         LOG.info("Takes {} to process {} watches", latency, 
> total);
>                         
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
>                         
> ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
>                         totalDeadWatchers.addAndGet(-total);
>                         synchronized (processingCompletedEvent) {
>                             processingCompletedEvent.notifyAll();
>                         }
>                     }
>                 });
>             }
>         }
>         LOG.info("WatcherCleaner thread exited");
>     }{code}
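> As we can see, the two methods access the deadWatchers object from different 
> threads. The thread executing run() *reads* deadWatchers: it calls 
> deadWatchers.size() while holding only the cleanEvent monitor, and 
> deadWatchers.isEmpty() while holding no lock at all. The thread executing 
> addDeadWatcher *writes* deadWatchers (deadWatchers.add()) while holding the 
> lock on this. Because the reads and the write are not guarded by the same 
> lock, there is no happens-before ordering between them, which is a data race.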



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to