Author: ereisman
Date: Mon Oct 1 21:45:57 2012
New Revision: 1392645
URL: http://svn.apache.org/viewvc?rev=1392645&view=rev
Log:
GIRAPH-353: Received metrics are not thread-safe (aching via ereisman)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1392645&r1=1392644&r2=1392645&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 1 21:45:57 2012
@@ -1,6 +1,7 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-353: Received metrics are not thread-safe (aching via ereisman)
GIRAPH-326: Writing input splits to ZooKeeper in parallel (maja)
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java?rev=1392645&r1=1392644&r2=1392645&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
Mon Oct 1 21:45:57 2012
@@ -21,6 +21,8 @@ package org.apache.giraph.comm.netty;
import java.text.DecimalFormat;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -40,6 +42,8 @@ public class ByteCounter extends SimpleC
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ByteCounter.class);
+ /** Class timer */
+ private static final Time TIME = SystemTime.getInstance();
/** All bytes ever sent */
private final AtomicLong bytesSent = new AtomicLong();
/** Total sent requests */
@@ -49,8 +53,9 @@ public class ByteCounter extends SimpleC
/** Total received requests */
private final AtomicLong receivedRequests = new AtomicLong();
/** Start time (for bandwidth calculation) */
- private final AtomicLong startMsecs =
- new AtomicLong(System.currentTimeMillis());
+ private final AtomicLong startMsecs = new AtomicLong(TIME.getMilliseconds());
+ /** Last updated msecs for getMetricsWindow */
+ private final AtomicLong metricsWindowLastUpdatedMsecs = new AtomicLong();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
@@ -100,7 +105,7 @@ public class ByteCounter extends SimpleC
* Reset the start msecs.
*/
private void resetStartMsecs() {
- startMsecs.set(System.currentTimeMillis());
+ startMsecs.set(TIME.getMilliseconds());
}
/**
@@ -124,7 +129,7 @@ public class ByteCounter extends SimpleC
*/
public double getMbytesPerSecSent() {
return bytesSent.get() * 1000f /
- (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+ (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
}
/**
@@ -132,7 +137,7 @@ public class ByteCounter extends SimpleC
*/
public double getMbytesPerSecReceived() {
return bytesReceived.get() * 1000f /
- (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+ (1 + TIME.getMilliseconds() - startMsecs.get()) / MEGABYTE;
}
/**
@@ -158,7 +163,7 @@ public class ByteCounter extends SimpleC
", ave received req MBytes = " +
DOUBLE_FORMAT.format(mBytesReceivedPerReq) +
", secs waited = " +
- ((System.currentTimeMillis() - startMsecs.get()) / 1000f);
+ ((TIME.getMilliseconds() - startMsecs.get()) / 1000f);
}
/**
@@ -169,10 +174,16 @@ public class ByteCounter extends SimpleC
* @return Metrics or else null if the window wasn't met
*/
public String getMetricsWindow(int minMsecsWindow) {
- if (System.currentTimeMillis() - startMsecs.get() > minMsecsWindow) {
- String metrics = getMetrics();
- resetAll();
- return metrics;
+ long lastUpdatedMsecs = metricsWindowLastUpdatedMsecs.get();
+ long curMsecs = TIME.getMilliseconds();
+ if (curMsecs - lastUpdatedMsecs > minMsecsWindow) {
+ // Make sure that only one thread does this update
+ if (metricsWindowLastUpdatedMsecs.compareAndSet(lastUpdatedMsecs,
+ curMsecs)) {
+ String metrics = getMetrics();
+ resetAll();
+ return metrics;
+ }
}
return null;