Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Wed Oct 17 04:33:16 2012 @@ -34,6 +34,7 @@ import java.util.Set; /** * Mutable vertex with no edge values. + * * @param <I> Vertex id * @param <V> Vertex data * @param <M> Message data @@ -96,7 +97,7 @@ public abstract class SimpleMutableVerte * @param sourceVertexId Source vertex id of edge */ public void addEdgeRequest(I sourceVertexId) throws IOException { - getGraphState().getWorkerCommunications(). + getGraphState().getWorkerClientRequestProcessor(). addEdgeRequest(sourceVertexId, new Edge<I, NullWritable>(sourceVertexId, NullWritable.get())); }
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Wed Oct 17 04:33:16 2012 @@ -20,7 +20,6 @@ package org.apache.giraph.graph; import org.apache.giraph.ImmutableClassesGiraphConfigurable; import org.apache.giraph.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.comm.WorkerClientServer; import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -221,11 +220,13 @@ public abstract class Vertex<I extends W throw new IllegalArgumentException( "sendMessage: Cannot send null message to " + id); } - getWorkerCommunications().sendMessageRequest(id, message); + getGraphState().getWorkerClientRequestProcessor(). + sendMessageRequest(id, message); } /** * Lookup WorkerInfo for myself. + * * @return WorkerInfo about worker holding this Vertex. */ public WorkerInfo getMyWorkerInfo() { @@ -234,6 +235,7 @@ public abstract class Vertex<I extends W /** * Lookup WorkerInfo for a Vertex. + * * @param vertexId VertexId to lookup * @return WorkerInfo about worker holding this Vertex. */ @@ -243,19 +245,13 @@ public abstract class Vertex<I extends W /** * Lookup PartitionOwner for a Vertex + * * @param vertexId id of Vertex to look up. * @return PartitionOwner holding Vertex */ private PartitionOwner getVertexPartitionOwner(I vertexId) { - return getWorkerCommunications().getVertexPartitionOwner(vertexId); - } - - /** - * Get WorkerClientServer used to communicate with other servers. - * @return WorkerClientServer used. - */ - private WorkerClientServer<I, V, E, M> getWorkerCommunications() { - return getGraphState().getWorkerCommunications(); + return getGraphState().getWorkerClientRequestProcessor(). + getVertexPartitionOwner(vertexId); } /** Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Wed Oct 17 04:33:16 2012 @@ -18,6 +18,7 @@ package org.apache.giraph.graph.partition; +import com.google.common.collect.Lists; import org.apache.giraph.graph.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -42,9 +43,12 @@ import java.util.Set; public class HashWorkerPartitioner<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements WorkerGraphPartitioner<I, V, E, M> { - /** Mapping of the vertex ids to {@link PartitionOwner} */ + /** + * Mapping of the vertex ids to {@link PartitionOwner}. Needs to be + * thread-safe (hence CopyOnWriteArrayList). + */ protected List<PartitionOwner> partitionOwnerList = - new ArrayList<PartitionOwner>(); + Lists.newCopyOnWriteArrayList(); @Override public PartitionOwner createPartitionOwner() { @@ -53,10 +57,8 @@ public class HashWorkerPartitioner<I ext @Override public PartitionOwner getPartitionOwner(I vertexId) { - synchronized (partitionOwnerList) { - return partitionOwnerList.get(Math.abs(vertexId.hashCode()) % - partitionOwnerList.size()); - } + return partitionOwnerList.get(Math.abs(vertexId.hashCode()) % + partitionOwnerList.size()); } @Override @@ -72,10 +74,8 @@ public class HashWorkerPartitioner<I ext WorkerInfo myWorkerInfo, Collection<? extends PartitionOwner> masterSetPartitionOwners, PartitionStore<I, V, E, M> partitionStore) { - synchronized (partitionOwnerList) { - partitionOwnerList.clear(); - partitionOwnerList.addAll(masterSetPartitionOwners); - } + partitionOwnerList.clear(); + partitionOwnerList.addAll(masterSetPartitionOwners); Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>(); Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap = Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Wed Oct 17 04:33:16 2012 @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.giraph.graph.partition; import java.io.DataInput; @@ -37,6 +36,8 @@ public class PartitionStats implements W private long finishedVertexCount = 0; /** Edges in this partition */ private long edgeCount = 0; + /** Messages sent from this partition */ + private long messagesSentCount = 0; /** * Default constructor for reflection. @@ -50,15 +51,18 @@ public class PartitionStats implements W * @param vertexCount Vertex count. * @param finishedVertexCount Finished vertex count. * @param edgeCount Edge count. + * @param messagesSentCount Number of messages sent */ public PartitionStats(int partitionId, long vertexCount, long finishedVertexCount, - long edgeCount) { + long edgeCount, + long messagesSentCount) { this.partitionId = partitionId; this.vertexCount = vertexCount; this.finishedVertexCount = finishedVertexCount; this.edgeCount = edgeCount; + this.messagesSentCount = messagesSentCount; } /** @@ -129,12 +133,31 @@ public class PartitionStats implements W return edgeCount; } + /** + * Add messages to the messages sent count. + * + * @param messagesSentCount Number of messages to add. + */ + public void addMessagesSentCount(long messagesSentCount) { + this.messagesSentCount += messagesSentCount; + } + + /** + * Get the messages sent count. + * + * @return Messages sent count. + */ + public long getMessagesSentCount() { + return messagesSentCount; + } + @Override public void readFields(DataInput input) throws IOException { partitionId = input.readInt(); vertexCount = input.readLong(); finishedVertexCount = input.readLong(); edgeCount = input.readLong(); + messagesSentCount = input.readLong(); } @Override @@ -143,11 +166,13 @@ public class PartitionStats implements W output.writeLong(vertexCount); output.writeLong(finishedVertexCount); output.writeLong(edgeCount); + output.writeLong(messagesSentCount); } @Override public String toString() { return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" + - finishedVertexCount + ",edges=" + edgeCount + ")"; + finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" + + messagesSentCount + ")"; } } Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Wed Oct 17 04:33:16 2012 @@ -111,7 +111,8 @@ public abstract class PartitionStore<I e } /** - * Return all the stored partitions as an Iterable. + * Return all the stored partitions as an Iterable. Note that this may force + * out-of-core partitions to be loaded into memory if using out-of-core. * * @return The partition objects */ Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java?rev=1399090&view=auto ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java (added) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java Wed Oct 17 04:33:16 2012 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * Logger utils for log4j + */ +public class LoggerUtils { + /** + * Don't construct this. + */ + private LoggerUtils() { } + + /** + * Helper method to set the status and log message together. + * + * @param context Context to set the status with + * @param logger Logger to write to + * @param level Level of logging + * @param message Message to + */ + public static void setStatusAndLog( + TaskAttemptContext context, Logger logger, Level level, + String message) { + try { + context.setStatus(message); + } catch (IOException e) { + throw new IllegalStateException("setStatusAndLog: Got IOException", e); + } + if (logger.isEnabledFor(level)) { + logger.log(level, message); + } + } +} Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Wed Oct 17 04:33:16 2012 @@ -21,16 +21,19 @@ package org.apache.giraph.utils; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Functions for waiting on some events to happen while reporting progress */ public class ProgressableUtils { /** Class logger */ private static final Logger LOG = Logger.getLogger(ProgressableUtils.class); - /** Msecs to refresh the progress meter */ - private static final int MSEC_PERIOD = 10000; + /** Msecs to refresh the progress meter (one minute) */ + private static final int MSEC_PERIOD = 60 * 1000; /** Do not instantiate. */ private ProgressableUtils() { } @@ -39,7 +42,7 @@ public class ProgressableUtils { * Wait for executor tasks to terminate, while periodically reporting * progress. * - * @param executor Executor which we are waiting for + * @param executor Executor which we are waiting for * @param progressable Progressable for reporting progress (Job context) */ public static void awaitExecutorTermination(ExecutorService executor, @@ -85,4 +88,73 @@ public class ProgressableUtils { remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs); } } + + + /** + * Wait for the result of the future to be ready, while periodically + * reporting progress. + * + * @param <T> Type of the return value of the future + * @param future Future + * @param progressable Progressable for reporting progress (Job context) + * @return Computed result of the future. + */ + public static <T> T getFutureResult(Future<T> future, + Progressable progressable) { + while (!future.isDone()) { + tryGetFutureResult(future, progressable, MSEC_PERIOD); + } + + try { + return future.get(); + } catch (InterruptedException e) { + throw new IllegalStateException("get: " + + "InterruptedException occurred while waiting for future result", e); + } catch (ExecutionException e) { + throw new IllegalStateException("get: " + + "ExecutionException occurred while waiting for future result", e); + } + } + + /** + * Wait maximum given number of milliseconds for result to become available, + * while periodically reporting progress. + * + * @param <T> Type of the return value of the future + * @param future Future + * @param progressable Progressable for reporting progress (Job context) + * @param msecs Number of milliseconds to wait + * @return Future result + */ + public static <T> T tryGetFutureResult( + Future<T> future, Progressable progressable, int msecs) { + long maxMsecs = System.currentTimeMillis() + msecs; + int curMsecTimeout; + while (true) { + curMsecTimeout = Math.min(msecs, MSEC_PERIOD); + try { + future.get(curMsecTimeout, TimeUnit.MILLISECONDS); + if (future.isDone()) { + return future.get(); + } + } catch (InterruptedException e) { + throw new IllegalStateException("tryGet: " + + "InterruptedException occurred while waiting for future result", + e); + } catch (ExecutionException e) { + throw new IllegalStateException("tryGet: " + + "ExecutionException occurred while waiting for future result", e); + } catch (TimeoutException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("tryGetFutureResult: Timeout occurred"); + } + } + + progressable.progress(); + if (System.currentTimeMillis() >= maxMsecs) { + return null; + } + msecs = Math.max(0, msecs - curMsecTimeout); + } + } } Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Wed Oct 17 04:33:16 2012 @@ -33,10 +33,16 @@ public interface Time { long NS_PER_MS = US_PER_MS * NS_PER_US; /** Milliseconds per second */ long MS_PER_SECOND = 1000; + /** Milliseconds per second (as float) */ + float MS_PER_SECOND_AS_FLOAT = MS_PER_SECOND * 1f; /** Microseconds per second */ long US_PER_SECOND = US_PER_MS * MS_PER_SECOND; + /** Microseconds per second (as float) */ + float US_PER_SECOND_AS_FLOAT = US_PER_SECOND * 1f; /** Nanoseconds per second */ long NS_PER_SECOND = NS_PER_US * US_PER_SECOND; + /** Nanoseconds per second (as float) */ + float NS_PER_SECOND_AS_FLOAT = NS_PER_SECOND * 1f; /** Seconds per hour */ long SECONDS_PER_HOUR = 60 * 60; /** Seconds per day */ Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original) +++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Wed Oct 17 04:33:16 2012 @@ -37,7 +37,8 @@ import org.apache.zookeeper.ZooKeeper; /** * ZooKeeper provides only atomic operations. ZooKeeperExt provides additional - * non-atomic operations that are useful. + * non-atomic operations that are useful. All methods of this class should + * be thread-safe. */ public class ZooKeeperExt extends ZooKeeper { /** Internal logger */ Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Wed Oct 17 04:33:16 2012 @@ -81,14 +81,14 @@ public class BspCase implements Watcher // Allow this test to be run on a real Hadoop setup if (runningInDistributedMode()) { - System.out.println("setup: Sending job to job tracker " + + System.out.println("setupConfiguration: Sending job to job tracker " + jobTracker + " with jar path " + getJarLocation() + " for " + getName()); conf.set("mapred.job.tracker", jobTracker); conf.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f); } else { - System.out.println("setup: Using local job runner with " + + System.out.println("setupConfiguration: Using local job runner with " + "location " + getJarLocation() + " for " + getName()); conf.setWorkerConfiguration(1, 1, 100.0f); // Single node testing @@ -341,7 +341,7 @@ public class BspCase implements Watcher @Before public void setUp() { if (runningInDistributedMode()) { - System.out.println("Setting tasks to 3 for " + getName() + + System.out.println("setUp: Setting tasks to 3 for " + getName() + " since JobTracker exists..."); numWorkers = 3; } Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java Wed Oct 17 04:33:16 2012 @@ -117,15 +117,11 @@ public class TestBspBasic extends BspCas GiraphJob job = prepareJob(getCallingMethodName(), SimpleSuperstepVertex.class, SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class); - GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs = - new GraphState<LongWritable, IntWritable, - FloatWritable, IntWritable>(); ImmutableClassesGiraphConfiguration configuration = new ImmutableClassesGiraphConfiguration(job.getConfiguration()); Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex = configuration.createVertex(); - System.out.println("testInstantiateVertex: Got vertex " + vertex + - ", graphState" + gs); + System.out.println("testInstantiateVertex: Got vertex " + vertex); VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable> inputFormat = configuration.createVertexInputFormat(); /*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE] @@ -358,38 +354,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE] } /** - * Run a sample BSP job locally and test PageRank. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testBspPageRank() - throws IOException, InterruptedException, ClassNotFoundException { - GiraphJob job = prepareJob(getCallingMethodName(), - SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class); - job.getConfiguration().setWorkerContextClass( - SimplePageRankVertex.SimplePageRankVertexWorkerContext.class); - job.getConfiguration().setMasterComputeClass( - SimplePageRankVertex.SimplePageRankVertexMasterCompute.class); - assertTrue(job.run(true)); - if (!runningInDistributedMode()) { - double maxPageRank = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax(); - double minPageRank = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin(); - long numVertices = - SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum(); - System.out.println("testBspPageRank: maxPageRank=" + maxPageRank + - " minPageRank=" + minPageRank + " numVertices=" + numVertices); - assertEquals(34.03, maxPageRank, 0.001); - assertEquals(0.03, minPageRank, 0.00001); - assertEquals(5l, numVertices); - } - } - - /** * Run a sample BSP job locally and test shortest paths. * * @throws IOException Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java?rev=1399090&view=auto ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java (added) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java Wed Oct 17 04:33:16 2012 @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph; + +import java.io.IOException; +import org.apache.giraph.examples.SimplePageRankVertex; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.graph.partition.HashMasterPartitioner; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test page rank (with and without multithreading) + */ +public class TestPageRank extends BspCase { + + /** + * Constructor + */ + public TestPageRank() { + super(TestPageRank.class.getName()); + } + + @Test + public void testBspPageRankSingleCompute() + throws ClassNotFoundException, IOException, InterruptedException { + testPageRank(1); + } + + + @Test + public void testPageRankTenThreadsCompute() + throws ClassNotFoundException, IOException, InterruptedException { + testPageRank(10); + } + + /** + * Generic page rank test + * + * @param numComputeThreads Number of compute threads to use + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testPageRank(int numComputeThreads) + throws IOException, InterruptedException, ClassNotFoundException { + GiraphJob job = prepareJob(getCallingMethodName(), + SimplePageRankVertex.class, SimplePageRankVertex.SimplePageRankVertexInputFormat.class); + job.getConfiguration().setWorkerContextClass( + SimplePageRankVertex.SimplePageRankVertexWorkerContext.class); + job.getConfiguration().setMasterComputeClass( + SimplePageRankVertex.SimplePageRankVertexMasterCompute.class); + job.getConfiguration().setNumComputeThreads(numComputeThreads); + // Set enough partitions to generate randomness on the compute side + if (numComputeThreads != 1) { + job.getConfiguration().setInt( + HashMasterPartitioner.USER_PARTITION_COUNT, + numComputeThreads * 5); + } + assertTrue(job.run(true)); + if (!runningInDistributedMode()) { + double maxPageRank = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax(); + double minPageRank = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin(); + long numVertices = + SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum(); + System.out.println(getCallingMethodName() + ": maxPageRank=" + + maxPageRank + " minPageRank=" + + minPageRank + " numVertices=" + numVertices + ", " + + " numComputeThreads=" + numComputeThreads); + assertEquals(34.03, maxPageRank, 0.001); + assertEquals(0.03, minPageRank, 0.00001); + assertEquals(5l, numVertices); + } + } +} Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff ============================================================================== --- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original) +++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Wed Oct 17 04:33:16 2012 @@ -19,6 +19,7 @@ package org.apache.giraph.utils; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.WorkerClientServer; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.GraphState; @@ -53,13 +54,14 @@ public class MockUtils { private final GraphState<I, V, E, M> graphState; private final Mapper.Context context; private final Configuration conf; - private final WorkerClientServer communications; + private final WorkerClientRequestProcessor workerClientRequestProcessor; public MockedEnvironment() { graphState = Mockito.mock(GraphState.class); context = Mockito.mock(Mapper.Context.class); conf = Mockito.mock(Configuration.class); - communications = Mockito.mock(WorkerClientServer.class); + workerClientRequestProcessor = + Mockito.mock(WorkerClientRequestProcessor.class); } /** the injected graph state */ @@ -78,19 +80,19 @@ public class MockUtils { } /** the injected worker communications */ - public WorkerClientServer getCommunications() { - return communications; + public WorkerClientRequestProcessor getWorkerClientRequestProcessor() { + return workerClientRequestProcessor; } /** assert that the test vertex message has been sent to a particular vertex */ public void verifyMessageSent(I targetVertexId, M message) { - Mockito.verify(communications).sendMessageRequest(targetVertexId, - message); + Mockito.verify(workerClientRequestProcessor).sendMessageRequest + (targetVertexId, message); } /** assert that the test vertex has sent no message to a particular vertex */ public void verifyNoMessageSent() { - Mockito.verifyZeroInteractions(communications); + Mockito.verifyZeroInteractions(workerClientRequestProcessor); } } @@ -124,8 +126,8 @@ public class MockUtils { .thenReturn(env.getContext()); Mockito.when(env.getContext().getConfiguration()) .thenReturn(env.getConfiguration()); - Mockito.when(env.getGraphState().getWorkerCommunications()) - .thenReturn(env.getCommunications()); + Mockito.when(env.getGraphState().getWorkerClientRequestProcessor()) + .thenReturn(env.getWorkerClientRequestProcessor()); ReflectionUtils.setField(vertex, "id", vertexId); ReflectionUtils.setField(vertex, "value", vertexValue);
