Author: aching
Date: Tue Aug 7 19:12:07 2012
New Revision: 1370438
URL: http://svn.apache.org/viewvc?rev=1370438&view=rev
Log:
GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java
giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 7 19:12:07 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)
+
GIRAPH-289: Add thread and channel pooling to NettyClient and
NettyServer. (ekoontz via aching)
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java?rev=1370438&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ByteCounter.java Tue Aug
7 19:12:07 2012
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.text.DecimalFormat;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Keep track of the bytes sent/received and provide some metrics when
+ * desired as part of the Netty Channel stack.
+ */
+public class ByteCounter extends SimpleChannelHandler {
+  /** Megabyte in bytes */
+  public static final double MEGABYTE = 1024f * 1024f;
+  /**
+   * Pattern used to format the doubles.  A formatter is created per call
+   * from this pattern because DecimalFormat instances are not safe for
+   * concurrent use and the metrics may be requested from several threads.
+   */
+  private static final String DOUBLE_PATTERN = "#######.####";
+  /** All bytes sent since the last reset */
+  private final AtomicLong bytesSent = new AtomicLong();
+  /** Sent requests since the last reset */
+  private final AtomicLong sentRequests = new AtomicLong();
+  /** All bytes received since the last reset */
+  private final AtomicLong bytesReceived = new AtomicLong();
+  /** Received requests since the last reset */
+  private final AtomicLong receivedRequests = new AtomicLong();
+  /** Start time (for bandwidth calculation) */
+  private final AtomicLong startMsecs =
+      new AtomicLong(System.currentTimeMillis());
+
+  /**
+   * Count the readable bytes of every received ChannelBuffer message, then
+   * pass the event on to the default upstream handling.
+   *
+   * @param ctx Channel handler context
+   * @param e Event travelling upstream
+   * @throws Exception Propagated from the superclass handling
+   */
+  @Override
+  public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
+    throws Exception {
+    if (e instanceof MessageEvent &&
+        ((MessageEvent) e).getMessage() instanceof ChannelBuffer) {
+      ChannelBuffer b = (ChannelBuffer) ((MessageEvent) e).getMessage();
+      bytesReceived.addAndGet(b.readableBytes());
+      receivedRequests.incrementAndGet();
+    }
+
+    super.handleUpstream(ctx, e);
+  }
+
+  /**
+   * Count the readable bytes of every sent ChannelBuffer message, then
+   * pass the event on to the default downstream handling.
+   *
+   * @param ctx Channel handler context
+   * @param e Event travelling downstream
+   * @throws Exception Propagated from the superclass handling
+   */
+  @Override
+  public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
+    throws Exception {
+    if (e instanceof MessageEvent &&
+        ((MessageEvent) e).getMessage() instanceof ChannelBuffer) {
+      ChannelBuffer b = (ChannelBuffer) ((MessageEvent) e).getMessage();
+      bytesSent.addAndGet(b.readableBytes());
+      sentRequests.incrementAndGet();
+    }
+
+    super.handleDownstream(ctx, e);
+  }
+
+  /**
+   * Reset all the bytes kept track of.
+   */
+  private void resetBytes() {
+    bytesSent.set(0);
+    sentRequests.set(0);
+    bytesReceived.set(0);
+    receivedRequests.set(0);
+  }
+
+  /**
+   * Reset the start msecs.
+   */
+  private void resetStartMsecs() {
+    startMsecs.set(System.currentTimeMillis());
+  }
+
+  /**
+   * Reset everything this object keeps track of
+   */
+  public void resetAll() {
+    resetBytes();
+    resetStartMsecs();
+  }
+
+  /**
+   * @return Bytes sent since the last reset
+   */
+  public long getBytesSent() {
+    return bytesSent.get();
+  }
+
+  /**
+   * @return Bytes received since the last reset
+   */
+  public long getBytesReceived() {
+    return bytesReceived.get();
+  }
+
+  /**
+   * Convert a byte count into Mbytes / sec over the current interval.
+   * Double arithmetic is used throughout so large byte counts are not
+   * truncated to float precision.  One msec is added to the elapsed time
+   * so the divisor is never zero.
+   *
+   * @param bytes Bytes transferred in the current interval
+   * @return Mbytes / sec in the current interval
+   */
+  private double mbytesPerSec(long bytes) {
+    return bytes * 1000d /
+        (1 + System.currentTimeMillis() - startMsecs.get()) / MEGABYTE;
+  }
+
+  /**
+   * @return Mbytes sent / sec in the current interval
+   */
+  public double getMbytesPerSecSent() {
+    return mbytesPerSec(bytesSent.get());
+  }
+
+  /**
+   * @return Mbytes received / sec in the current interval
+   */
+  public double getMbytesPerSecReceived() {
+    return mbytesPerSec(bytesReceived.get());
+  }
+
+  /**
+   * @return A string containing all the metrics
+   */
+  public String getMetrics() {
+    // Local formatter: DecimalFormat must not be shared between threads
+    DecimalFormat doubleFormat = new DecimalFormat(DOUBLE_PATTERN);
+    double mBytesSent = bytesSent.get() / MEGABYTE;
+    double mBytesReceived = bytesReceived.get() / MEGABYTE;
+    long curSentRequests = sentRequests.get();
+    long curReceivedRequests = receivedRequests.get();
+    double mBytesSentPerReq =
+        (curSentRequests == 0) ? 0 : mBytesSent / curSentRequests;
+    double mBytesReceivedPerReq =
+        (curReceivedRequests == 0) ? 0 : mBytesReceived / curReceivedRequests;
+    return "MBytes/sec sent = " +
+        doubleFormat.format(getMbytesPerSecSent()) +
+        ", MBytes/sec received = " +
+        doubleFormat.format(getMbytesPerSecReceived()) +
+        ", MBytesSent = " + doubleFormat.format(mBytesSent) +
+        ", MBytesReceived = " + doubleFormat.format(mBytesReceived) +
+        ", ave sent request MBytes = " +
+        doubleFormat.format(mBytesSentPerReq) +
+        ", ave received request MBytes = " +
+        doubleFormat.format(mBytesReceivedPerReq) +
+        ", secs waited = " +
+        ((System.currentTimeMillis() - startMsecs.get()) / 1000f);
+  }
+
+  /**
+   * Get the metrics if a given window of time has passed.  Return null
+   * otherwise.  If the window is met, reset the metrics.  When several
+   * threads observe the same expired window, only the one that succeeds in
+   * advancing the start time gets the metrics and performs the reset; the
+   * others get null.
+   *
+   * @param minMsecsWindow Msecs of the minimum window
+   * @return Metrics or else null if the window wasn't met
+   */
+  public String getMetricsWindow(int minMsecsWindow) {
+    long windowStartMsecs = startMsecs.get();
+    long nowMsecs = System.currentTimeMillis();
+    if (nowMsecs - windowStartMsecs <= minMsecsWindow) {
+      return null;
+    }
+
+    // Build the report before moving the start time, since the rates are
+    // computed relative to it.
+    String metrics = getMetrics();
+    // Claim the window atomically so it is reported and reset only once.
+    if (!startMsecs.compareAndSet(windowStartMsecs, nowMsecs)) {
+      return null;
+    }
+    resetBytes();
+    return metrics;
+  }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug
7 19:12:07 2012
@@ -73,6 +73,8 @@ public class NettyClient<I extends Writa
Maps.newHashMap();
/** Number of channels per server */
private final int channelsPerServer;
+ /** Byte counter for this client */
+ private final ByteCounter byteCounter = new ByteCounter();
/** Send buffer size */
private final int sendBufferSize;
/** Receive buffer size */
@@ -107,6 +109,7 @@ public class NettyClient<I extends Writa
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
+ byteCounter,
new RequestEncoder(),
new ResponseClientHandler(waitingRequestCount));
}
@@ -209,6 +212,9 @@ public class NettyClient<I extends Writa
*/
public void sendWritableRequest(InetSocketAddress remoteServer,
WritableRequest<I, V, E, M> request) {
+ if (waitingRequestCount.get() == 0) {
+ byteCounter.resetAll();
+ }
waitingRequestCount.incrementAndGet();
Channel channel = addressChannelMap.get(remoteServer).nextChannel();
if (channel == null) {
@@ -226,6 +232,11 @@ public class NettyClient<I extends Writa
public void waitAllRequests() {
synchronized (waitingRequestCount) {
while (waitingRequestCount.get() != 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("waitAllRequests: Waiting interval of " +
+ WAITING_REQUEST_MSECS + " msecs and still waiting on " +
+ waitingRequestCount + " requests, " + byteCounter.getMetrics());
+ }
try {
waitingRequestCount.wait(WAITING_REQUEST_MSECS);
} catch (InterruptedException e) {
@@ -235,6 +246,10 @@ public class NettyClient<I extends Writa
context.progress();
}
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("waitAllRequests: Finished all requests. " +
+ byteCounter.getMetrics());
+ }
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug
7 19:12:07 2012
@@ -82,6 +82,8 @@ public class NettyServer<I extends Writa
private final ServerData<I, V, E, M> serverData;
/** Server bootstrap */
private ServerBootstrap bootstrap;
+ /** Byte counter for this server */
+ private final ByteCounter byteCounter = new ByteCounter();
/** Send buffer size */
private final int sendBufferSize;
/** Receive buffer size */
@@ -142,8 +144,9 @@ public class NettyServer<I extends Writa
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
+ byteCounter,
new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
- new RequestDecoder<I, V, E, M>(conf, requestRegistry),
+ new RequestDecoder<I, V, E, M>(conf, requestRegistry, byteCounter),
new RequestServerHandler<I, V, E, M>(serverData));
}
});
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java Tue
Aug 7 19:12:07 2012
@@ -48,16 +48,21 @@ public class RequestDecoder<I extends Wr
private final Configuration conf;
/** Registry of requests */
private final RequestRegistry requestRegistry;
+ /** Byte counter to output */
+ private final ByteCounter byteCounter;
/**
* Constructor.
*
* @param conf Configuration
* @param requestRegistry Request registry
+ * @param byteCounter Keeps track of the decoded bytes
*/
- public RequestDecoder(Configuration conf, RequestRegistry requestRegistry) {
+ public RequestDecoder(Configuration conf, RequestRegistry requestRegistry,
+ ByteCounter byteCounter) {
this.conf = conf;
this.requestRegistry = requestRegistry;
+ this.byteCounter = byteCounter;
}
@Override
@@ -67,6 +72,14 @@ public class RequestDecoder<I extends Wr
throw new IllegalStateException("decode: Got illegal message " + msg);
}
+ // Output metrics every 1/2 minute
+ String metrics = byteCounter.getMetricsWindow(30000);
+ if (metrics != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("decode: Server window metrics " + metrics);
+ }
+ }
+
ChannelBuffer buffer = (ChannelBuffer) msg;
ChannelBufferInputStream inputStream = new
ChannelBufferInputStream(buffer);
int enumValue = inputStream.readByte();
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
---
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
(original)
+++
giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java
Tue Aug 7 19:12:07 2012
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -93,4 +94,11 @@ public class ResponseClientHandler exten
}
}
}
+
+ /**
+  * Fail fast when the channel reports an exception.
+  *
+  * @param ctx Channel handler context (used for the remote address)
+  * @param e Exception event carrying the failure cause
+  */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+   // Chain the cause (not just its toString()) so its stack trace is kept
+   throw new IllegalStateException("exceptionCaught: Channel failed with " +
+       "remote address " + ctx.getChannel().getRemoteAddress() + " with " +
+       "cause " + e.getCause(), e.getCause());
+ }
}
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
Tue Aug 7 19:12:07 2012
@@ -110,9 +110,11 @@ public class SendVertexRequest<I extends
Collection<Vertex<I, V, E, M>> vertexMap =
partitionVertexMap.get(partitionId);
if (vertexMap == null) {
- vertexMap = partitionVertexMap.putIfAbsent(partitionId, vertices);
+ final Collection<Vertex<I, V, E, M>> tmpVertices =
+ Lists.newArrayListWithCapacity(vertices.size());
+ vertexMap = partitionVertexMap.putIfAbsent(partitionId, tmpVertices);
if (vertexMap == null) {
- return;
+ vertexMap = tmpVertices;
}
}
synchronized (vertexMap) {
Modified:
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Tue Aug 7 19:12:07 2012
@@ -452,7 +452,7 @@ public class BspServiceWorker<I extends
vertexReader.getCurrentVertex();
if (readerVertex.getId() == null) {
throw new IllegalArgumentException(
- "loadVertices: Vertex reader returned a vertex " +
+ "readVerticesFromInputSplit: Vertex reader returned a vertex " +
"without an id! - " + readerVertex);
}
if (readerVertex.getValue() == null) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Aug
7 19:12:07 2012
@@ -26,6 +26,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.TimedLogger;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -589,6 +590,8 @@ public class GraphMapper<I extends Writa
}
partitionStatsList.clear();
+ TimedLogger partitionLogger = new TimedLogger(15000, LOG);
+ int completedPartitions = 0;
for (Partition<I, V, E, M> partition :
serviceWorker.getPartitionMap().values()) {
PartitionStats partitionStats =
@@ -633,6 +636,10 @@ public class GraphMapper<I extends Writa
}
partitionStatsList.add(partitionStats);
+ ++completedPartitions;
+ partitionLogger.info("map: Completed " + completedPartitions + " of " +
+ serviceWorker.getPartitionMap().size() + " partitions " +
+ MemoryUtils.getRuntimeMemoryStats());
}
} while (!serviceWorker.finishSuperstep(partitionStatsList));
if (LOG.isInfoEnabled()) {
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java?rev=1370438&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/TimedLogger.java Tue Aug
7 19:12:07 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Print log messages only if the time is met. Thread-safe.
+ */
+public class TimedLogger {
+  /** Last time printed (guarded by this object's monitor) */
+  private long lastPrint = System.currentTimeMillis();
+  /** Minimum interval of time to wait before printing */
+  private final int msecs;
+  /** Logger */
+  private final Logger log;
+
+  /**
+   * Constructor of the timed logger
+   *
+   * @param msecs Msecs to wait before printing again
+   * @param log Logger to print to
+   */
+  public TimedLogger(int msecs, Logger log) {
+    this.msecs = msecs;
+    this.log = log;
+  }
+
+  /**
+   * Print to the info log level if the minimum waiting time was reached.
+   * Synchronized so that the interval check and the update of the last
+   * print time happen as one step; otherwise concurrent callers could each
+   * pass the check and print within the same interval.
+   *
+   * @param msg Message to print
+   */
+  public synchronized void info(String msg) {
+    if (System.currentTimeMillis() > lastPrint + msecs) {
+      log.info(msg);
+      lastPrint = System.currentTimeMillis();
+    }
+  }
+}
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370438&r1=1370437&r2=1370438&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue
Aug 7 19:12:07 2012
@@ -82,6 +82,7 @@ public class ConnectionTest {
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
+
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(SimpleMessageStore.newFactory(