Author: ereisman
Date: Mon Oct  1 21:45:57 2012
New Revision: 1392645

URL: http://svn.apache.org/viewvc?rev=1392645&view=rev
Log:
GIRAPH-353: Received metrics are not thread-safe (aching via ereisman)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1392645&r1=1392644&r2=1392645&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct  1 21:45:57 2012
@@ -1,6 +1,7 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-353: Received metrics are not thread-safe (aching via ereisman)
 
   GIRAPH-326: Writing input splits to ZooKeeper in parallel (maja)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java?rev=1392645&r1=1392644&r2=1392645&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java Mon Oct  1 21:45:57 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm.netty;
 import java.text.DecimalFormat;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.log4j.Logger;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelEvent;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -40,6 +42,8 @@ public class ByteCounter extends SimpleC
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ByteCounter.class);
+  /** Class timer */
+  private static final Time TIME = SystemTime.getInstance();
   /** All bytes ever sent */
   private final AtomicLong bytesSent = new AtomicLong();
   /** Total sent requests */
@@ -49,8 +53,9 @@ public class ByteCounter extends SimpleC
   /** Total received requests */
   private final AtomicLong receivedRequests = new AtomicLong();
   /** Start time (for bandwidth calculation) */
-  private final AtomicLong startMsecs =
-      new AtomicLong(System.currentTimeMillis());
+  private final AtomicLong startMsecs = new AtomicLong(TIME.getMilliseconds());
+  /** Last updated msecs for getMetricsWindow */
+  private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
 
   @Override
   public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
@@ -100,7 +105,7 @@ public class ByteCounter extends SimpleC
    * Reset the start msecs.
    */
   private void resetStartMsecs() {
-    startMsecs.set(System.currentTimeMillis());
+    startMsecs.set(TIME.getMilliseconds());
   }
 
   /**
@@ -124,7 +129,7 @@ public class ByteCounter extends SimpleC
    */
   public double getMbytesPerSecSent() {
     return bytesSent.get() * 1000f /
-        (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+        (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
   }
 
   /**
@@ -132,7 +137,7 @@ public class ByteCounter extends SimpleC
    */
   public double getMbytesPerSecReceived() {
     return bytesReceived.get() * 1000f /
-        (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+        (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
   }
 
   /**
@@ -158,7 +163,7 @@ public class ByteCounter extends SimpleC
         ", ave received req MBytes = " +
         DOUBLE_FORMAT.format(mBytesReceivedPerReq) +
         ", secs waited = " +
-        ((System.currentTimeMillis() - startMsecs.get()) / 1000f);
+        ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
   }
 
   /**
@@ -169,10 +174,16 @@ public class ByteCounter extends SimpleC
    * @return Metrics or else null if the window wasn't met
    */
   public String getMetricsWindow(int minMsecsWindow) {
-    if (System.currentTimeMillis() - startMsecs.get() > minMsecsWindow) {
-      String metrics = getMetrics();
-      resetAll();
-      return metrics;
+    long lastUpdatedMsecs =  metricsWindowLastUpdatedMsecs.get();
+    long curMsecs = TIME.getMilliseconds();
+    if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
+      // Make sure that only one thread does this update
+      if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
+          curMsecs)) {
+        String metrics = getMetrics();
+        resetAll();
+        return metrics;
+      }
     }
 
     return null;


Reply via email to