BELUGABEHR commented on a change in pull request #876: ZOOKEEPER-3020: Review 
of SyncRequestProcessor
URL: https://github.com/apache/zookeeper/pull/876#discussion_r271280540
 
 

 ##########
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
 ##########
 @@ -102,103 +108,96 @@ public void run() {
 
             // we do this in an attempt to ensure that not all of the servers
             // in the ensemble take a snapshot at the same time
-            int randRoll = r.nextInt(snapCount/2);
+            int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, 
snapCount);
             while (true) {
-                Request si = null;
-                if (toFlush.isEmpty()) {
+                Request si = queuedRequests.poll();
+                if (si == null) {
+                    flush();
                     si = queuedRequests.take();
-                } else {
-                    si = queuedRequests.poll();
-                    if (si == null) {
-                        flush(toFlush);
-                        continue;
-                    }
                 }
-                if (si == requestOfDeath) {
+  
+                if (si == REQUEST_OF_DEATH) {
                     break;
                 }
-                if (si != null) {
-                    // track the number of records written to the log
-                    if (zks.getZKDatabase().append(si)) {
-                        logCount++;
-                        if (logCount > (snapCount / 2 + randRoll)) {
-                            randRoll = r.nextInt(snapCount/2);
-                            // roll the log
-                            zks.getZKDatabase().rollLog();
-                            // take a snapshot
-                            if (snapInProcess != null && 
snapInProcess.isAlive()) {
-                                LOG.warn("Too busy to snap, skipping");
-                            } else {
-                                snapInProcess = new ZooKeeperThread("Snapshot 
Thread") {
-                                        public void run() {
-                                            try {
-                                                zks.takeSnapshot();
-                                            } catch(Exception e) {
-                                                LOG.warn("Unexpected 
exception", e);
-                                            }
-                                        }
-                                    };
-                                snapInProcess.start();
-                            }
-                            logCount = 0;
-                        }
-                    } else if (toFlush.isEmpty()) {
-                        // optimization for read heavy workloads
-                        // iff this is a read, and there are no pending
-                        // flushes (writes), then just pass this to the next
-                        // processor
-                        if (nextProcessor != null) {
-                            nextProcessor.processRequest(si);
-                            if (nextProcessor instanceof Flushable) {
-                                ((Flushable)nextProcessor).flush();
-                            }
+
+                // track the number of records written to the log
+                if (zks.getZKDatabase().append(si)) {
+                    logCount++;
+                    if (logCount > randRoll) {
+                        randRoll = 
ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+                        // roll the log
+                        zks.getZKDatabase().rollLog();
+                        // take a snapshot
+                        if (!snapThreadMutex.tryAcquire()) {
+                            LOG.warn("Too busy to snap, skipping");
+                        } else {
+                            new ZooKeeperThread("Snapshot Thread") {
+                                public void run() {
+                                    try {
+                                        zks.takeSnapshot();
+                                    } catch (Exception e) {
+                                        LOG.warn("Unexpected exception", e);
+                                    } finally {
+                                      snapThreadMutex.release();
+                                    }
+                                }
+                            }.start();
                         }
-                        continue;
+                        logCount = 0;
                     }
-                    toFlush.add(si);
-                    if (toFlush.size() > 1000) {
-                        flush(toFlush);
+                } else if (toFlush.isEmpty()) {
+                    // optimization for read heavy workloads
+                    // iff this is a read, and there are no pending
+                    // flushes (writes), then just pass this to the next
+                    // processor
+                    if (nextProcessor != null) {
+                        nextProcessor.processRequest(si);
+                        if (nextProcessor instanceof Flushable) {
+                            ((Flushable)nextProcessor).flush();
+                        }
                     }
+                    continue;
+                }
+                toFlush.add(si);
+                if (toFlush.size() == FLUSH_SIZE) {
+                    flush();
                 }
             }
         } catch (Throwable t) {
             handleException(this.getName(), t);
-        } finally{
-            running = false;
         }
         LOG.info("SyncRequestProcessor exited!");
     }
 
-    private void flush(LinkedList<Request> toFlush)
-        throws IOException, RequestProcessorException
-    {
-        if (toFlush.isEmpty())
-            return;
-
-        zks.getZKDatabase().commit();
-        while (!toFlush.isEmpty()) {
-            Request i = toFlush.remove();
-            if (nextProcessor != null) {
-                nextProcessor.processRequest(i);
-            }
-        }
-        if (nextProcessor != null && nextProcessor instanceof Flushable) {
-            ((Flushable)nextProcessor).flush();
-        }
+    private void flush() throws IOException, RequestProcessorException {
+      if (this.toFlush.isEmpty()) {
+          return;
+      }
+
+      zks.getZKDatabase().commit();
+
+      if (this.nextProcessor == null) {
+        this.toFlush.clear();
+      } else {
+          while (!this.toFlush.isEmpty()) {
+              final Request i = this.toFlush.remove();
+              this.nextProcessor.processRequest(i);
+          }
+          if (this.nextProcessor instanceof Flushable) {
+              ((Flushable)this.nextProcessor).flush();
+          } 
+      }
     }
 
     public void shutdown() {
         LOG.info("Shutting down");
-        queuedRequests.add(requestOfDeath);
+        queuedRequests.add(REQUEST_OF_DEATH);
         try {
-            if(running){
-                this.join();
-            }
-            if (!toFlush.isEmpty()) {
-                flush(toFlush);
-            }
-        } catch(InterruptedException e) {
+            this.join();
+            this.flush();
+        } catch (InterruptedException e) {
             LOG.warn("Interrupted while wating for " + this + " to finish");
 
 Review comment:
   @anmolnar Well, if you can believe it, lol, I am trying to touch less where 
I can in order to make a review easier.  Also, using SLF4J parameter-ized 
logging is the fastest way to do logging *when logging is disabled*.  It would 
be very unlikely that a WARN message is disabled so it's actually faster to 
just keep it as is.  If I was writing this code from scratch, I would use a 
parameter logging message here for consistency, but there's no strong argument 
to change it now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to