BELUGABEHR commented on a change in pull request #876: ZOOKEEPER-3020: Review
of SyncRequestProcessor
URL: https://github.com/apache/zookeeper/pull/876#discussion_r273282443
##########
File path:
zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
##########
@@ -102,103 +108,96 @@ public void run() {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
- int randRoll = r.nextInt(snapCount/2);
+ int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2,
snapCount);
while (true) {
- Request si = null;
- if (toFlush.isEmpty()) {
+ Request si = queuedRequests.poll();
+ if (si == null) {
+ flush();
si = queuedRequests.take();
- } else {
- si = queuedRequests.poll();
- if (si == null) {
- flush(toFlush);
- continue;
- }
}
- if (si == requestOfDeath) {
+
+ if (si == REQUEST_OF_DEATH) {
break;
}
- if (si != null) {
- // track the number of records written to the log
- if (zks.getZKDatabase().append(si)) {
- logCount++;
- if (logCount > (snapCount / 2 + randRoll)) {
- randRoll = r.nextInt(snapCount/2);
- // roll the log
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (snapInProcess != null &&
snapInProcess.isAlive()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- snapInProcess = new ZooKeeperThread("Snapshot
Thread") {
- public void run() {
- try {
- zks.takeSnapshot();
- } catch(Exception e) {
- LOG.warn("Unexpected
exception", e);
- }
- }
- };
- snapInProcess.start();
- }
- logCount = 0;
- }
- } else if (toFlush.isEmpty()) {
- // optimization for read heavy workloads
- // iff this is a read, and there are no pending
- // flushes (writes), then just pass this to the next
- // processor
- if (nextProcessor != null) {
- nextProcessor.processRequest(si);
- if (nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
- }
+
+ // track the number of records written to the log
+ if (zks.getZKDatabase().append(si)) {
+ logCount++;
+ if (logCount > randRoll) {
+ randRoll =
ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+ // roll the log
+ zks.getZKDatabase().rollLog();
+ // take a snapshot
+ if (!snapThreadMutex.tryAcquire()) {
+ LOG.warn("Too busy to snap, skipping");
+ } else {
+ new ZooKeeperThread("Snapshot Thread") {
+ public void run() {
+ try {
+ zks.takeSnapshot();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ snapThreadMutex.release();
+ }
+ }
+ }.start();
}
- continue;
+ logCount = 0;
}
- toFlush.add(si);
- if (toFlush.size() > 1000) {
- flush(toFlush);
+ } else if (toFlush.isEmpty()) {
+ // optimization for read heavy workloads
+ // iff this is a read, and there are no pending
+ // flushes (writes), then just pass this to the next
+ // processor
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(si);
+ if (nextProcessor instanceof Flushable) {
+ ((Flushable)nextProcessor).flush();
+ }
}
+ continue;
+ }
+ toFlush.add(si);
+ if (toFlush.size() == FLUSH_SIZE) {
Review comment:
The only thing that adds an item to this collection is this thread itself,
and the 'add' occurs immediately before the check against FLUSH_SIZE, so the
size can only grow by exactly one between checks. There is no risk of somehow
jumping over the threshold. A 'greater than' check is confusing to the reader
because it implies that there might be some way to jump the threshold.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services