Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java (from r1411478, giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java) URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java&r1=1411478&r2=1411493&rev=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueGauge.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java Tue Nov 20 01:07:49 2012 @@ -18,58 +18,55 @@ package org.apache.giraph.metrics; -import com.yammer.metrics.core.Gauge; - /** - * A Gauge that holds a value. - * - * @param <T> type of value being held. + * Pair of value with host it came from. */ -public class ValueGauge<T extends Number> extends Gauge<T> { - /** value held by this class */ - private T value; +class ValueWithHostname { + /** long value we're holding */ + private long value; + /** host associated with value */ + private String hostname; /** - * Constructor that registers Gauge in MetricsRegistry. + * Create with initial value * - * @param registry GiraphMetricsRegistry to use. - * @param group MetricGroup for Gauge. - * @param name String name of Gauge. + * @param value long initial value to use */ - public ValueGauge(GiraphMetricsRegistry registry, MetricGroup group, - String name) { - registry.getGauge(group, name, this); + public ValueWithHostname(long value) { + this.value = value; + this.hostname = null; } - @Override - public T value() { + /** + * @return long value + */ + public long getValue() { return value; } /** - * Get double representation of value held. 
- * - * @return double value + * @return String hostname */ - public double getDouble() { - return value.doubleValue(); + public String getHostname() { + return hostname; } /** - * Get long representation of value held. + * Check if there is any hostname. Used as a flag that we have any data. * - * @return long value + * @return true if hostname is set */ - public long getLong() { - return value.longValue(); + public boolean hasHostname() { + return hostname != null; } /** - * Set value held by this object. - * - * @param value value to set. + * Set value and hostname together. + * @param value long value to use. + * @param hostname String host it came from. */ - public void set(T value) { + public void set(long value, String hostname) { this.value = value; + this.hostname = hostname; } }
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java?rev=1411493&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java Tue Nov 20 01:07:49 2012 @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.metrics; + +import org.apache.giraph.graph.BspServiceWorker; +import org.apache.giraph.graph.ComputeCallable; +import org.apache.giraph.graph.GraphMapper; +import org.apache.hadoop.io.Writable; + +import com.yammer.metrics.core.Gauge; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.util.concurrent.TimeUnit; + +/** + * Per-superstep metrics for a Worker. 
+ */ +public class WorkerSuperstepMetrics implements Writable { + /** Total network communication time */ + private LongAndTimeUnit commTimer; + /** Time for all compute calls to complete */ + private LongAndTimeUnit computeAllTimer; + /** Time till first message gets flushed */ + private LongAndTimeUnit timeToFirstMsg; + /** Total superstep time */ + private LongAndTimeUnit superstepTimer; + /** Time spent waiting for other workers to finish */ + private LongAndTimeUnit waitRequestsTimer; + /** Time spent in Vertex#compute */ + private LongAndTimeUnit userComputeTime; + + /** + * Constructor + */ + public WorkerSuperstepMetrics() { + commTimer = new LongAndTimeUnit(); + computeAllTimer = new LongAndTimeUnit(); + timeToFirstMsg = new LongAndTimeUnit(); + superstepTimer = new LongAndTimeUnit(); + waitRequestsTimer = new LongAndTimeUnit(); + + // Note this one is not backed by a GiraphTimer, but rather a real Timer + userComputeTime = new LongAndTimeUnit(); + userComputeTime.setTimeUnit(TimeUnit.MILLISECONDS); + } + + /** + * Read metric values from global MetricsRegistry. + * + * @return this object, for chaining + */ + public WorkerSuperstepMetrics readFromRegistry() { + SuperstepMetricsRegistry ssm = GiraphMetrics.getInstance().perSuperstep(); + readGiraphTimer(GraphMapper.TIMER_COMMUNICATION_TIME, commTimer); + readGiraphTimer(GraphMapper.TIMER_COMPUTE_ALL, computeAllTimer); + readGiraphTimer(GraphMapper.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg); + readGiraphTimer(GraphMapper.TIMER_SUPERSTEP_TIME, superstepTimer); + readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer); + userComputeTime.setValue((long) ssm.getTimer( + ComputeCallable.TIMER_COMPUTE_ONE).sum()); + return this; + } + + /** + * Read data from GiraphTimer into a LongAndTimeUnit. + * + * @param name String name of Gauge to retrieve. + * @param data LongAndTimeUnit to read data into. 
+ */ + private void readGiraphTimer(String name, LongAndTimeUnit data) { + Gauge<Long> gauge = GiraphMetrics.getInstance().perSuperstep(). + getExistingGauge(name); + if (gauge instanceof GiraphTimer) { + GiraphTimer giraphTimer = (GiraphTimer) gauge; + data.setTimeUnit(giraphTimer.getTimeUnit()); + data.setValue(giraphTimer.value()); + } else if (gauge != null) { + throw new IllegalStateException(name + " is not a GiraphTimer"); + } + } + + /** + * Human readable dump of metrics stored here. + * + * @param superstep long number of superstep. + * @param out PrintStream to write to. + * @return this object, for chaining + */ + public WorkerSuperstepMetrics print(long superstep, PrintStream out) { + out.println(); + out.println("--- METRICS: superstep " + superstep + " ---"); + out.println(" superstep time: " + superstepTimer); + out.println(" compute all partitions: " + computeAllTimer); + out.println(" user compute time: " + userComputeTime); + out.println(" network communication time: " + commTimer); + out.println(" time to first message: " + timeToFirstMsg); + out.println(" wait on requests time: " + waitRequestsTimer); + return this; + } + + /** + * @return Communication timer + */ + public long getCommTimer() { + return commTimer.getValue(); + } + + /** + * @return Total compute timer + */ + public long getComputeAllTimer() { + return computeAllTimer.getValue(); + } + + /** + * @return timer between start time and first message flushed. 
+ */ + public long getTimeToFirstMsg() { + return timeToFirstMsg.getValue(); + } + + /** + * @return timer for superstep time + */ + public long getSuperstepTimer() { + return superstepTimer.getValue(); + } + + /** + * @return timer waiting for other workers + */ + public long getWaitRequestsTimer() { + return waitRequestsTimer.getValue(); + } + + /** + * @return milliseconds in user compute code + */ + public long getUserComputeTime() { + return userComputeTime.getValue(); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + commTimer.setValue(dataInput.readLong()); + computeAllTimer.setValue(dataInput.readLong()); + timeToFirstMsg.setValue(dataInput.readLong()); + superstepTimer.setValue(dataInput.readLong()); + waitRequestsTimer.setValue(dataInput.readLong()); + userComputeTime.setValue(dataInput.readLong()); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(commTimer.getValue()); + dataOutput.writeLong(computeAllTimer.getValue()); + dataOutput.writeLong(timeToFirstMsg.getValue()); + dataOutput.writeLong(superstepTimer.getValue()); + dataOutput.writeLong(waitRequestsTimer.getValue()); + dataOutput.writeLong(userComputeTime.getValue()); + } +} Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java Tue Nov 20 01:07:49 2012 @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.giraph.utils; import java.io.DataInput; Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FakeTime.java Tue Nov 20 01:07:49 2012 @@ -35,6 +35,11 @@ public class FakeTime implements Time { } @Override + public long getMicroseconds() { + return nanosecondsSinceEpoch.get() / NS_PER_US; + } + + @Override public long getNanoseconds() { return nanosecondsSinceEpoch.get(); } Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/MemoryUtils.java Tue Nov 20 01:07:49 2012 @@ -21,7 +21,6 @@ package org.apache.giraph.utils; import com.yammer.metrics.util.PercentGauge; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; -import org.apache.giraph.metrics.MetricGroup; /** * Helper static methods for tracking memory usage. 
@@ -68,8 +67,7 @@ public class MemoryUtils { */ public static void initMetrics() { GiraphMetricsRegistry metrics = GiraphMetrics.getInstance().perJob(); - metrics.getGauge(MetricGroup.SYSTEM, "memory-free-pct", - new PercentGauge() { + metrics.getGauge("memory-free-pct", new PercentGauge() { @Override protected double getNumerator() { return freeMemoryMB(); Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/SystemTime.java Tue Nov 20 01:07:49 2012 @@ -36,6 +36,11 @@ public class SystemTime implements Time } @Override + public long getMicroseconds() { + return getNanoseconds() / NS_PER_US; + } + + @Override public long getNanoseconds() { return System.nanoTime(); } @@ -60,7 +65,7 @@ public class SystemTime implements Time * * @return Instance of this object */ - public static Time getInstance() { + public static Time get() { return SINGLE_TIME; } } Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Tue Nov 20 01:07:49 2012 @@ -53,7 +53,6 @@ public interface Time { long MS_PER_DAY = SECONDS_PER_DAY * MS_PER_SECOND; /** - * * Get the current milliseconds * * @return The difference, measured in milliseconds, between @@ -62,6 +61,14 @@ public interface Time { long 
getMilliseconds(); /** + * Get the current microseconds + * + * @return The difference, measured in microseconds, between + * the current time and midnight, January 1, 1970 UTC. + */ + long getMicroseconds(); + + /** * Get the current nanoseconds * * @return The difference, measured in nanoseconds, between Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Times.java Tue Nov 20 01:07:49 2012 @@ -18,6 +18,8 @@ package org.apache.giraph.utils; +import java.util.concurrent.TimeUnit; + /** * Utility methods for Time classes. */ @@ -26,6 +28,30 @@ public class Times { private Times() { } /** + * Convenience method to measure time in a given TimeUnit. + * + * @param time Time instance to use + * @param timeUnit TimeUnit to measure in + * @return long measured time in TimeUnit dimension + */ + public static long get(Time time, TimeUnit timeUnit) { + return timeUnit.convert(time.getNanoseconds(), TimeUnit.NANOSECONDS); + } + + /** + * Convenience method to get time since the beginning of an event in a given + * TimeUnit. + * + * @param time Time object used for measuring. + * @param timeUnit TimeUnit to use for dimension. + * @param startTime beginning time to diff against + * @return time elapsed since startTime in TimeUnit dimension. + */ + public static long getDiff(Time time, TimeUnit timeUnit, long startTime) { + return get(time, timeUnit) - startTime; + } + + /** * Convenience method to get milliseconds since a previous milliseconds * point. * @@ -51,6 +77,17 @@ public class Times { } /** + * Convenience method to get microseconds since a previous microseconds point. 
+ * + * @param time Time instance to use + * @param previousMicros Previous microseconds + * @return Microseconds elapsed since the previous microseconds + */ + public static long getMicrosSince(Time time, long previousMicros) { + return time.getMicroseconds() - previousMicros; + } + + /** * Convenience method to get nanoseconds since a previous nanoseconds * point. * Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java Tue Nov 20 01:07:49 2012 @@ -18,14 +18,6 @@ package org.apache.giraph.utils; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperExt.PathStat; import org.apache.hadoop.conf.Configuration; @@ -36,6 +28,15 @@ import org.apache.zookeeper.KeeperExcept import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * Helper static methods for working with Writable objects. 
*/ Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/PredicateLock.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/PredicateLock.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/PredicateLock.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/PredicateLock.java Tue Nov 20 01:07:49 2012 @@ -56,7 +56,7 @@ public class PredicateLock implements Bs * @param progressable used to report progress() (usually a Mapper.Context) */ public PredicateLock(Progressable progressable) { - this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.getInstance()); + this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get()); } /** Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java Tue Nov 20 01:07:49 2012 @@ -293,9 +293,9 @@ public class TestMutableVertex { long serializeNanos = 0; byte[] byteArray = null; for (int i = 0; i < REPS; ++i) { - serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + serializeNanosStart = SystemTime.get().getNanoseconds(); byteArray = WritableUtils.writeToByteArray(vertex); - serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } serializeNanos /= REPS; @@ -312,9 +312,9 @@ public class TestMutableVertex { long deserializeNanosStart = 0; long deserializeNanos = 0; for (int i = 0; i < 
REPS; ++i) { - deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + deserializeNanosStart = SystemTime.get().getNanoseconds(); WritableUtils.readFieldsFromByteArray(byteArray, readVertex); - deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); } deserializeNanos /= REPS; @@ -349,11 +349,11 @@ public class TestMutableVertex { DynamicChannelBufferOutputStream outputStream = null; for (int i = 0; i < REPS; ++i) { - serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + serializeNanosStart = SystemTime.get().getNanoseconds(); outputStream = new DynamicChannelBufferOutputStream(32); vertex.write(outputStream); - serializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } serializeNanos /= REPS; @@ -373,12 +373,12 @@ public class TestMutableVertex { long deserializeNanosStart = 0; long deserializeNanos = 0; for (int i = 0; i < REPS; ++i) { - deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + deserializeNanosStart = SystemTime.get().getNanoseconds(); DynamicChannelBufferInputStream inputStream = new DynamicChannelBufferInputStream( outputStream.getDynamicChannelBuffer()); readVertex.readFields(inputStream); - deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); outputStream.getDynamicChannelBuffer().readerIndex(0); } @@ -417,11 +417,11 @@ public class TestMutableVertex { UnsafeByteArrayOutputStream outputStream = null; for (int i = 0; i < REPS; ++i) { - serializeNanosStart = SystemTime.getInstance().getNanoseconds(); + serializeNanosStart = SystemTime.get().getNanoseconds(); outputStream = new UnsafeByteArrayOutputStream(32); vertex.write(outputStream); - serializeNanos += 
Times.getNanosecondsSince(SystemTime.getInstance(), + serializeNanos += Times.getNanosecondsSince(SystemTime.get(), serializeNanosStart); } serializeNanos /= REPS; @@ -441,12 +441,12 @@ public class TestMutableVertex { long deserializeNanosStart = 0; long deserializeNanos = 0; for (int i = 0; i < REPS; ++i) { - deserializeNanosStart = SystemTime.getInstance().getNanoseconds(); + deserializeNanosStart = SystemTime.get().getNanoseconds(); UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream( outputStream.getByteArray(), 0, outputStream.getPos()); readVertex.readFields(inputStream); - deserializeNanos += Times.getNanosecondsSince(SystemTime.getInstance(), + deserializeNanos += Times.getNanosecondsSince(SystemTime.get(), deserializeNanosStart); } deserializeNanos /= REPS; Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java?rev=1411493&r1=1411492&r2=1411493&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/BspUtilsTest.java Tue Nov 20 01:07:49 2012 @@ -46,7 +46,7 @@ import static org.junit.Assert.assertEqu public class BspUtilsTest { @Rule public TestName name = new TestName(); - private static final Time TIME = SystemTime.getInstance(); + private static final Time TIME = SystemTime.get(); private static final long COUNT = 200000; private Configuration conf = new Configuration(); private long startNanos = -1;
