lujie created ZOOKEEPER-4711:
--------------------------------
Summary: Data race between run() and addDeadWatcher(int watcherBit) in the
org.apache.zookeeper.server.watch.WatcherCleaner class when running the
org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers JUnit test
Key: ZOOKEEPER-4711
URL: https://issues.apache.org/jira/browse/ZOOKEEPER-4711
Project: ZooKeeper
Issue Type: Bug
Components: server
Affects Versions: 3.9.0
Environment: Download ZooKeeper 3.9.0-SNAPSHOT from the GitHub repository
([https://github.com/apache/zookeeper]), then run:
mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false
Reporter: lujie
When we run:
mvn test -Dmaven.test.failure.ignore=true
-Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers
-DfailIfNoTests=false -DredirectTestOutputToFile=false
the two methods below access the deadWatchers set from different threads. (To
trace the accesses, we added this debug line to addDeadWatcher:
System.out.println("2s::" + Thread.currentThread().getName() + " " +
System.identityHashCode(deadWatchers) + " " + System.currentTimeMillis());)
{code:java}
public void addDeadWatcher(int watcherBit) {
    // Wait if there are too many watchers waiting to be closed,
    // this will slow down the socket packet processing and
    // the adding of watches in the ZK pipeline.
    while (maxInProcessingDeadWatchers > 0 && !stopped
            && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
        try {
            RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
            long startTime = Time.currentElapsedTime();
            synchronized (processingCompletedEvent) {
                processingCompletedEvent.wait(100);
            }
            long latency = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
        } catch (InterruptedException e) {
            LOG.info("Got interrupted while waiting for dead watches queue size");
            break;
        }
    }
    synchronized (this) {
        if (deadWatchers.add(watcherBit)) {
            totalDeadWatchers.incrementAndGet();
            ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
            if (deadWatchers.size() >= watcherCleanThreshold) {
                synchronized (cleanEvent) {
                    cleanEvent.notifyAll();
                }
            }
        }
    }
}{code}
{code:java}
@Override
public void run() {
    while (!stopped) {
        synchronized (cleanEvent) {
            try {
                // add some jitter to avoid cleaning dead watchers at the
                // same time in the quorum
                if (!stopped && deadWatchers.size() < watcherCleanThreshold) {
                    int maxWaitMs = (watcherCleanIntervalInSeconds
                        + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                    cleanEvent.wait(maxWaitMs);
                }
            } catch (InterruptedException e) {
                LOG.info("Received InterruptedException while waiting for cleanEvent");
                break;
            }
        }

        if (deadWatchers.isEmpty()) {
            continue;
        }

        synchronized (this) {
            // Cleaning the dead watchers needs to go through all the current
            // watches, which is pretty heavy and may take a second if
            // there are millions of watches; that's why we do the batch
            // clean-up lazily in a separate thread with a snapshot of the
            // current dead watchers.
            final Set<Integer> snapshot = new HashSet<>(deadWatchers);
            deadWatchers.clear();
            int total = snapshot.size();
            LOG.info("Processing {} dead watchers", total);
            cleaners.schedule(new WorkRequest() {
                @Override
                public void doWork() throws Exception {
                    long startTime = Time.currentElapsedTime();
                    listener.processDeadWatchers(snapshot);
                    long latency = Time.currentElapsedTime() - startTime;
                    LOG.info("Takes {} to process {} watches", latency, total);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                    ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                    totalDeadWatchers.addAndGet(-total);
                    synchronized (processingCompletedEvent) {
                        processingCompletedEvent.notifyAll();
                    }
                }
            });
        }
    }
    LOG.info("WatcherCleaner thread exited");
}{code}
As we can see, the two methods access the deadWatchers object from different
threads. The thread in run() performs *read* operations on deadWatchers (the
size() check inside the cleanEvent block and the unguarded isEmpty() check)
without holding the "this" monitor, while the thread in addDeadWatcher
performs *write* operations on deadWatchers under synchronized (this). Because
these reads and writes are not protected by a common lock, this is a data
race.
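One possible direction for a fix, shown here only as a minimal sketch (not a
committed patch): perform the deadWatchers reads in run() under the same
"this" monitor that addDeadWatcher uses for its writes. The size is read
before taking cleanEvent so the lock order stays consistent with
addDeadWatcher, which holds "this" while notifying on cleanEvent:
{code:java}
@Override
public void run() {
    while (!stopped) {
        // Read deadWatchers.size() under the same monitor that
        // addDeadWatcher uses for its writes. Reading before taking
        // cleanEvent keeps the lock order consistent with addDeadWatcher
        // (which acquires "this" first, then cleanEvent).
        int size;
        synchronized (this) {
            size = deadWatchers.size();
        }
        synchronized (cleanEvent) {
            try {
                if (!stopped && size < watcherCleanThreshold) {
                    int maxWaitMs = (watcherCleanIntervalInSeconds
                        + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                    // The wait is bounded, so a notification that raced with
                    // the size read above only delays cleaning; it is not lost.
                    cleanEvent.wait(maxWaitMs);
                }
            } catch (InterruptedException e) {
                LOG.info("Received InterruptedException while waiting for cleanEvent");
                break;
            }
        }
        synchronized (this) {
            // Guard the isEmpty() read as well; previously it ran unlocked.
            if (deadWatchers.isEmpty()) {
                continue;
            }
            // ... take the snapshot and schedule the clean-up as before ...
        }
    }
    LOG.info("WatcherCleaner thread exited");
}{code}
An alternative would be to replace the plain HashSet with a concurrent set
(for example, ConcurrentHashMap.newKeySet()), which would make the unlocked
size() and isEmpty() reads safe, at the cost of no longer making the
add-then-threshold-check sequence in addDeadWatcher atomic.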