Author: maja
Date: Tue Nov 6 17:36:36 2012
New Revision: 1406239
URL: http://svn.apache.org/viewvc?rev=1406239&view=rev
Log:
GIRAPH-404: More SendMessageCache improvements
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov 6 17:36:36 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-404: More SendMessageCache improvements (majakabiljo)
+
GIRAPH-412: Checkstyle error from Giraph-403 (majakabiljo)
GIRAPH-403: GraphMapper.notiftySentMessages need to be thread-safe
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 6 17:36:36 2012
@@ -168,6 +168,13 @@ public interface CentralizedServiceWorke
PartitionOwner getVertexPartitionOwner(I vertexId);
/**
+ * Get all partition owners.
+ *
+ * @return Iterable through partition owners
+ */
+ Iterable<? extends PartitionOwner> getPartitionOwners();
+
+ /**
* Look up a vertex on a worker given its vertex index.
*
* @param vertexId Vertex index to look for
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Tue Nov 6 17:36:36 2012
@@ -18,15 +18,20 @@
package org.apache.giraph.comm;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
/**
* Aggregates the messages to be send to workers so they can be sent
* in bulk. Not thread-safe.
@@ -37,30 +42,45 @@ import org.apache.hadoop.io.WritableComp
@SuppressWarnings("rawtypes")
public class SendMessageCache<I extends WritableComparable,
M extends Writable> {
- /** Combiner instance, can be null */
- private final VertexCombiner<I, M> combiner;
/** Internal cache */
- private Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
- messageCache =
- new HashMap<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>();
+ private final VertexIdMessageCollection<I, M>[] messageCache;
/** Number of messages in each partition */
- private final Map<WorkerInfo, Integer> messageCountMap =
- new HashMap<WorkerInfo, Integer>();
+ private final int[] messageCounts;
+ /** List of partition ids belonging to a worker */
+ private final Map<WorkerInfo, List<Integer>> workerPartitions =
+ Maps.newHashMap();
/** Giraph configuration */
private final ImmutableClassesGiraphConfiguration conf;
/**
* Constructor
*
- * @param conf Configuration used for instantiating the combiner.
+ * @param conf Giraph configuration
+ * @param serviceWorker Service worker
*/
- public SendMessageCache(ImmutableClassesGiraphConfiguration conf) {
+ public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
+ CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
this.conf = conf;
- if (conf.getVertexCombinerClass() == null) {
- this.combiner = null;
- } else {
- this.combiner = conf.createVertexCombiner();
+
+ int maxPartition = 0;
+ for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
+ List<Integer> workerPartitionIds =
+ workerPartitions.get(partitionOwner.getWorkerInfo());
+ if (workerPartitionIds == null) {
+ workerPartitionIds = Lists.newArrayList();
+ workerPartitions.put(partitionOwner.getWorkerInfo(),
+ workerPartitionIds);
+ }
+ workerPartitionIds.add(partitionOwner.getPartitionId());
+ maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
}
+ messageCache = new VertexIdMessageCollection[maxPartition + 1];
+
+ int maxWorker = 0;
+ for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+ maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
+ }
+ messageCounts = new int[maxWorker + 1];
}
/**
@@ -76,31 +96,18 @@ public class SendMessageCache<I extends
public int addMessage(WorkerInfo workerInfo,
final int partitionId, I destVertexId, M message) {
// Get the message collection
- Map<Integer, VertexIdMessageCollection<I, M>> partitionMap =
- messageCache.get(workerInfo);
- if (partitionMap == null) {
- partitionMap = new HashMap<Integer, VertexIdMessageCollection<I, M>>();
- messageCache.put(workerInfo, partitionMap);
+ VertexIdMessageCollection<I, M> partitionMessages =
+ messageCache[partitionId];
+ if (partitionMessages == null) {
+ partitionMessages = new VertexIdMessageCollection<I, M>(conf);
+ partitionMessages.initialize();
+ messageCache[partitionId] = partitionMessages;
}
- VertexIdMessageCollection<I, M> vertexMessages =
- partitionMap.get(partitionId);
-
- if (vertexMessages == null) {
- vertexMessages = new VertexIdMessageCollection<I, M>(conf);
- vertexMessages.initialize();
- partitionMap.put(partitionId, vertexMessages);
- }
- vertexMessages.add(destVertexId, message);
+ partitionMessages.add(destVertexId, message);
// Update the number of cached, outgoing messages per worker
- Integer currentWorkerMessageCount = messageCountMap.get(workerInfo);
- if (currentWorkerMessageCount == null) {
- currentWorkerMessageCount = 0;
- }
- final int updatedWorkerMessageCount =
- currentWorkerMessageCount + 1;
- messageCountMap.put(workerInfo, updatedWorkerMessageCount);
- return updatedWorkerMessageCount;
+ messageCounts[workerInfo.getTaskId()]++;
+ return messageCounts[workerInfo.getTaskId()];
}
/**
@@ -108,14 +115,21 @@ public class SendMessageCache<I extends
*
* @param workerInfo the address of the worker who owns the data
* partitions that are receiving the messages
- * @return Map of all messages (keyed by partition ID's) destined
- * for vertices hosted by <code>workerInfo</code>
+ * @return List of pairs (partitionId, VertexIdMessageCollection),
+ * where all partition ids belong to workerInfo
*/
- public Map<Integer, VertexIdMessageCollection<I, M>> removeWorkerMessages(
- WorkerInfo workerInfo) {
- Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
- messageCache.remove(workerInfo);
- messageCountMap.put(workerInfo, 0);
+ public PairList<Integer, VertexIdMessageCollection<I, M>>
+ removeWorkerMessages(WorkerInfo workerInfo) {
+ PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ new PairList<Integer, VertexIdMessageCollection<I, M>>();
+ workerMessages.initialize();
+ for (Integer partitionId : workerPartitions.get(workerInfo)) {
+ if (messageCache[partitionId] != null) {
+ workerMessages.add(partitionId, messageCache[partitionId]);
+ messageCache[partitionId] = null;
+ }
+ }
+ messageCounts[workerInfo.getTaskId()] = 0;
return workerMessages;
}
@@ -124,14 +138,20 @@ public class SendMessageCache<I extends
*
* @return All vertex messages for all partitions
*/
- public Map<WorkerInfo, Map<
+ public PairList<WorkerInfo, PairList<
Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
- Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
- allMessages = messageCache;
- messageCache =
- new HashMap<WorkerInfo,
- Map<Integer, VertexIdMessageCollection<I, M>>>();
- messageCountMap.clear();
+ PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+ allMessages = new PairList<WorkerInfo,
+ PairList<Integer, VertexIdMessageCollection<I, M>>>();
+ allMessages.initialize();
+ for (WorkerInfo workerInfo : workerPartitions.keySet()) {
+ PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ removeWorkerMessages(workerInfo);
+ if (!workerMessages.isEmpty()) {
+ allMessages.add(workerInfo, workerMessages);
+ }
+ messageCounts[workerInfo.getTaskId()] = 0;
+ }
return allMessages;
}
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java Tue Nov 6 17:36:36 2012
@@ -19,16 +19,10 @@
package org.apache.giraph.comm;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.PairListWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
/**
* Holder for pairs of vertex ids and messages. Not thread-safe.
*
@@ -36,11 +30,7 @@ import java.util.List;
* @param <M> Message data
*/
public class VertexIdMessageCollection<I extends WritableComparable,
- M extends Writable> implements Writable {
- /** List of ids of vertices */
- private List<I> vertexIds;
- /** List of messages */
- private List<M> messages;
+ M extends Writable> extends PairListWritable<I, M> {
/** Giraph configuration */
private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> conf;
@@ -57,109 +47,13 @@ public class VertexIdMessageCollection<I
this.conf = conf;
}
- /**
- * Initialize the inner state. Must be called before {@code add()} is
- * called. If you want to call {@code readFields()} you don't need to call
- * this method.
- */
- public void initialize() {
- vertexIds = Lists.newArrayList();
- messages = Lists.newArrayList();
- }
-
- /**
- * Adds message for vertex with selected id.
- *
- * @param vertexId Id of vertex
- * @param message Message to add
- */
- public void add(I vertexId, M message) {
- vertexIds.add(vertexId);
- messages.add(message);
- }
-
@Override
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeInt(vertexIds.size());
- for (int i = 0; i < vertexIds.size(); i++) {
- vertexIds.get(i).write(dataOutput);
- messages.get(i).write(dataOutput);
- }
+ protected I newFirstInstance() {
+ return conf.createVertexId();
}
@Override
- public void readFields(DataInput input) throws IOException {
- int messageCount = input.readInt();
- vertexIds = Lists.newArrayListWithCapacity(messageCount);
- messages = Lists.newArrayListWithCapacity(messageCount);
- while (messageCount-- > 0) {
- I vertexId = conf.createVertexId();
- vertexId.readFields(input);
- vertexIds.add(vertexId);
- M message = conf.createMessageValue();
- message.readFields(input);
- messages.add(message);
- }
- }
-
- /**
- * Get iterator through destination vertices and messages.
- *
- * @return {@link Iterator} iterator
- */
- public Iterator getIterator() {
- return new Iterator();
- }
-
- /**
- * Special iterator class which we'll use to iterate through elements of
- * {@link VertexIdMessageCollection}, without having to create new object as
- * wrapper for destination vertex id and message.
- *
- * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
- * only here next() doesn't return the next object, it just moves along in
- * the collection. Values related to current pair of (vertex id, message)
- * can be retrieved by calling getCurrentVertexId() and getCurrentMessage()
- * methods.
- *
- * Not thread-safe.
- */
- public class Iterator {
- /** Current position of the iterator */
- private int position = -1;
-
- /**
- * Returns true if the iteration has more elements.
- *
- * @return True if the iteration has more elements.
- */
- public boolean hasNext() {
- return position < messages.size() - 1;
- }
-
- /**
- * Moves to the next element in the iteration.
- */
- public void next() {
- position++;
- }
-
- /**
- * Get vertex id related to current element of the iteration.
- *
- * @return Vertex id related to current element of the iteration.
- */
- public I getCurrentVertexId() {
- return vertexIds.get(position);
- }
-
- /**
- * Get message related to current element of the iteration.
- *
- * @return Message related to current element of the iteration.
- */
- public M getCurrentMessage() {
- return messages.get(position);
- }
+ protected M newSecondInstance() {
+ return conf.createMessageValue();
}
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Tue Nov 6 17:36:36 2012
@@ -107,8 +107,8 @@ public class DiskBackedMessageStoreByPar
VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
while (iterator.hasNext()) {
iterator.next();
- I vertexId = iterator.getCurrentVertexId();
- M message = iterator.getCurrentMessage();
+ I vertexId = iterator.getCurrentFirst();
+ M message = iterator.getCurrentSecond();
Collection<M> currentMessages = map.get(vertexId);
if (currentMessages == null) {
currentMessages = Lists.newArrayList(message);
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Nov 6 17:36:36 2012
@@ -136,8 +136,8 @@ public class SimpleMessageStore<I extend
VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
while (iterator.hasNext()) {
iterator.next();
- I vertexId = iterator.getCurrentVertexId();
- M message = iterator.getCurrentMessage();
+ I vertexId = iterator.getCurrentFirst();
+ M message = iterator.getCurrentSecond();
Collection<M> currentMessages = partitionMap.get(vertexId);
if (currentMessages == null) {
Collection<M> newMessages = Lists.newArrayList(message);
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Tue Nov 6 17:36:36 2012
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -112,7 +113,7 @@ public class NettyWorkerClientRequestPro
sendPartitionCache = new SendPartitionCache<I, V, E, M>(context,
configuration);
sendMessageCache =
- new SendMessageCache<I, M>(configuration);
+ new SendMessageCache<I, M>(configuration, serviceWorker);
maxMessagesPerWorker = configuration.getInt(
GiraphConfiguration.MSG_SIZE,
GiraphConfiguration.MSG_SIZE_DEFAULT);
@@ -146,7 +147,7 @@ public class NettyWorkerClientRequestPro
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageCount >= maxMessagesPerWorker) {
- Map<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -321,13 +322,17 @@ public class NettyWorkerClientRequestPro
sendPartitionCache.clear();
// Execute the remaining sends messages (if any)
- Map<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
+ PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
remainingMessageCache = sendMessageCache.removeAllMessages();
- for (Map.Entry<WorkerInfo, Map<Integer, VertexIdMessageCollection<I, M>>>
- entry : remainingMessageCache.entrySet()) {
+ PairList<WorkerInfo,
+ PairList<Integer, VertexIdMessageCollection<I, M>>>.Iterator
+ iterator = remainingMessageCache.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
WritableRequest writableRequest =
- new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
- doRequest(entry.getKey(), writableRequest);
+ new SendWorkerMessagesRequest<I, V, E, M>(
+ iterator.getCurrentSecond());
+ doRequest(iterator.getCurrentFirst(), writableRequest);
}
// Execute the remaining sends mutations (if any)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Tue Nov 6 17:36:36 2012
@@ -20,17 +20,14 @@ package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.VertexIdMessageCollection;
+import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import com.google.common.collect.Maps;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
/**
* Send a collection of vertex messages for a partition.
@@ -52,8 +49,8 @@ public class SendWorkerMessagesRequest<I
* are owned by a single (destination) worker. These messages are all
* destined for this worker.
* */
- private Map<Integer, VertexIdMessageCollection<I, M>>
- partitionVertexMessagesMap;
+ private PairList<Integer, VertexIdMessageCollection<I, M>>
+ partitionVertexMessages;
/**
* Constructor used for reflection only
@@ -63,37 +60,38 @@ public class SendWorkerMessagesRequest<I
/**
* Constructor used to send request.
*
- * @param partVertMsgsMap Map of remote partitions =>
- * VertexIdMessageCollection
+ * @param partVertMsgs Map of remote partitions => VertexIdMessageCollection
*/
public SendWorkerMessagesRequest(
- Map<Integer, VertexIdMessageCollection<I, M>> partVertMsgsMap) {
+ PairList<Integer, VertexIdMessageCollection<I, M>> partVertMsgs) {
super();
- this.partitionVertexMessagesMap = partVertMsgsMap;
+ this.partitionVertexMessages = partVertMsgs;
}
@Override
public void readFieldsRequest(DataInput input) throws IOException {
int numPartitions = input.readInt();
- partitionVertexMessagesMap =
- Maps.<Integer, VertexIdMessageCollection<I, M>>
- newHashMapWithExpectedSize(numPartitions);
+ partitionVertexMessages =
+ new PairList<Integer, VertexIdMessageCollection<I, M>>();
+ partitionVertexMessages.initialize(numPartitions);
while (numPartitions-- > 0) {
final int partitionId = input.readInt();
VertexIdMessageCollection<I, M> vertexIdMessages =
new VertexIdMessageCollection<I, M>(getConf());
vertexIdMessages.readFields(input);
- partitionVertexMessagesMap.put(partitionId, vertexIdMessages);
+ partitionVertexMessages.add(partitionId, vertexIdMessages);
}
}
@Override
public void writeRequest(DataOutput output) throws IOException {
- output.writeInt(partitionVertexMessagesMap.size());
- for (Entry<Integer, VertexIdMessageCollection<I, M>> partitionEntry :
- partitionVertexMessagesMap.entrySet()) {
- output.writeInt(partitionEntry.getKey());
- partitionEntry.getValue().write(output);
+ output.writeInt(partitionVertexMessages.getSize());
+ PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
+ partitionVertexMessages.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ output.writeInt(iterator.getCurrentFirst());
+ iterator.getCurrentSecond().write(output);
}
}
@@ -104,11 +102,14 @@ public class SendWorkerMessagesRequest<I
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- for (Entry<Integer, VertexIdMessageCollection<I, M>> entry :
- partitionVertexMessagesMap.entrySet()) {
+ PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
+ partitionVertexMessages.getIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
try {
serverData.getIncomingMessageStore()
- .addPartitionMessages(entry.getValue(), entry.getKey());
+ .addPartitionMessages(iterator.getCurrentSecond(),
+ iterator.getCurrentFirst());
} catch (IOException e) {
throw new RuntimeException("doRequest: Got IOException ", e);
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 6 17:36:36 2012
@@ -1316,6 +1316,11 @@ else[HADOOP_NON_SECURE]*/
}
@Override
+ public Iterable<? extends PartitionOwner> getPartitionOwners() {
+ return workerGraphPartitioner.getPartitionOwners();
+ }
+
+ @Override
public Partition<I, V, E, M> getPartition(I vertexId) {
return getPartitionStore().getPartition(getPartitionId(vertexId));
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Nov 6 17:36:36 2012
@@ -112,7 +112,7 @@ public class GraphMapper<I extends Writa
/** Milliseconds from starting compute to sending first message */
private Timer timeToFirstMessage;
/** Timer context used for computer msec from compute to first message */
- private TimerContext timeToFirstMessageContext;
+ private volatile TimerContext timeToFirstMessageContext;
/** Time from first sent message till last message flushed. */
private Timer communicationTimer;
/** Timer context for communication timer. */
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java?rev=1406239&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairList.java Tue Nov 6 17:36:36 2012
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Collection to keep pairs in, without creating a wrapper object around
+ * each pair of objects.
+ *
+ * Pairs are stored in two parallel lists: the i-th element of
+ * {@code firstList} and the i-th element of {@code secondList} together
+ * form the i-th pair, in the order in which the pairs were added through
+ * {@code add()}.
+ *
+ * The constructor does not create the backing lists. One of the
+ * {@code initialize} methods (or subclass code that assigns the lists)
+ * must run before {@code add()}, {@code getSize()}, {@code isEmpty()} or
+ * iteration; otherwise those calls throw a NullPointerException.
+ * No synchronization is performed.
+ *
+ * @param <U> Type of the first element in a pair
+ * @param <V> Type of the second element in a pair
+ */
+public class PairList<U, V> {
+ /** List to keep first elements of pairs in (null until initialized) */
+ protected List<U> firstList;
+ /** List to keep second elements of pairs in (null until initialized) */
+ protected List<V> secondList;
+
+ /**
+ * Constructor. Leaves the backing lists unset; call
+ * {@link #initialize()} or {@link #initialize(int)} before use.
+ */
+ public PairList() {
+ }
+
+ /**
+ * Initialize the inner state. Must be called before {@code add()} is
+ * called. Assigns new empty backing lists, so any pairs added before
+ * this call are discarded.
+ */
+ public void initialize() {
+ firstList = Lists.newArrayList();
+ secondList = Lists.newArrayList();
+ }
+
+
+ /**
+ * Initialize the inner state, with a known size. Must be called before
+ * {@code add()} is called. Like {@link #initialize()}, this discards any
+ * previously added pairs.
+ *
+ * @param size Expected number of pairs which will be added to the list;
+ * used as the initial capacity of both backing lists
+ */
+ public void initialize(int size) {
+ firstList = Lists.newArrayListWithCapacity(size);
+ secondList = Lists.newArrayListWithCapacity(size);
+ }
+
+ /**
+ * Add a pair to the collection. The pair is appended to the end, so it
+ * is visited last by an iterator.
+ *
+ * @param first First element of the pair
+ * @param second Second element of the pair
+ */
+ public void add(U first, V second) {
+ firstList.add(first); // Both appends keep the lists index-aligned
+ secondList.add(second);
+ }
+
+ /**
+ * Get number of pairs in this list. The count is taken from
+ * {@code firstList}; both lists have the same length as long as they
+ * are only modified through {@code add()}.
+ *
+ * @return Number of pairs in the list
+ */
+ public int getSize() {
+ return firstList.size();
+ }
+
+ /**
+ * Check if the list is empty.
+ *
+ * @return True iff there are no pairs in the list
+ */
+ public boolean isEmpty() {
+ return getSize() == 0;
+ }
+
+ /**
+ * Get iterator through elements of this object. Each call returns a new,
+ * independent iterator positioned before the first pair.
+ *
+ * @return {@link Iterator} iterator
+ */
+ public Iterator getIterator() {
+ return new Iterator();
+ }
+
+ /**
+ * Special iterator class which we'll use to iterate through elements of
+ * {@link PairList}, without having to create new object as wrapper for
+ * each pair.
+ *
+ * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
+ * only here next() doesn't return the next object, it just moves along in
+ * the collection. Values related to current pair can be retrieved by calling
+ * getCurrentFirst() and getCurrentSecond() methods.
+ *
+ * Typical use: {@code while (it.hasNext()) { it.next(); ... }}.
+ * The iterator reads the enclosing list's current contents; it does not
+ * take a snapshot.
+ *
+ * Not thread-safe.
+ */
+ public class Iterator {
+ /** Current position of the iterator (-1 = before the first pair) */
+ private int position = -1;
+
+ /**
+ * Returns true if the iteration has more elements. The comparison uses
+ * the list's current size, so pairs added during iteration will also
+ * be visited.
+ *
+ * @return True if the iteration has more elements.
+ */
+ public boolean hasNext() {
+ return position < getSize() - 1;
+ }
+
+ /**
+ * Moves to the next element in the iteration. Does not check
+ * {@code hasNext()}: after advancing past the last pair,
+ * {@code getCurrentFirst()} and {@code getCurrentSecond()} throw
+ * IndexOutOfBoundsException.
+ */
+ public void next() {
+ position++;
+ }
+
+ /**
+ * Get first element of the current pair of the iteration. Only valid
+ * after {@code next()} has been called at least once (the initial
+ * position is -1).
+ *
+ * @return First element of the current pair of the iteration
+ */
+ public U getCurrentFirst() {
+ return firstList.get(position);
+ }
+
+ /**
+ * Get second element of the current pair of the iteration. Only valid
+ * after {@code next()} has been called at least once (the initial
+ * position is -1).
+ *
+ * @return Second element of the current pair of the iteration
+ */
+ public V getCurrentSecond() {
+ return secondList.get(position);
+ }
+ }
+}
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java?rev=1406239&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/PairListWritable.java Tue Nov 6 17:36:36 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Writable implementation of {@link PairList}.
+ *
+ * Serialized form, as produced by {@code write()} and consumed by
+ * {@code readFields()}: an int holding the number of pairs, followed by
+ * each pair in list order, written as the first element's serialization
+ * immediately followed by the second element's serialization.
+ *
+ * Subclasses supply {@link #newFirstInstance()} and
+ * {@link #newSecondInstance()} so that {@code readFields()} can create
+ * the objects it deserializes into.
+ *
+ * @param <U> Type of the first element in a pair
+ * @param <V> Type of the second element in a pair
+ */
+public abstract class PairListWritable<U extends Writable,
+ V extends Writable> extends PairList<U, V> implements Writable {
+ /**
+ * Create an empty instance of the first element in the pair,
+ * so we could read it from {@link DataInput}. Called once per pair
+ * by {@code readFields()}.
+ *
+ * @return New instance of the first element in the pair
+ */
+ protected abstract U newFirstInstance();
+
+ /**
+ * Create an empty instance of the second element in the pair,
+ * so we could read it from {@link DataInput}. Called once per pair
+ * by {@code readFields()}.
+ *
+ * @return New instance of the second element in the pair
+ */
+ protected abstract V newSecondInstance();
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ int size = getSize(); // Lists must already be initialized (NPE otherwise)
+ output.writeInt(size); // Pair count precedes the pair data
+ for (int i = 0; i < size; i++) {
+ firstList.get(i).write(output);
+ secondList.get(i).write(output);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int size = input.readInt(); // Pair count written by write()
+ firstList = Lists.newArrayListWithCapacity(size); // Fresh lists: prior pairs dropped, initialize() not needed
+ secondList = Lists.newArrayListWithCapacity(size);
+ while (size-- > 0) {
+ U first = newFirstInstance(); // New object per pair, filled from the stream
+ first.readFields(input);
+ V second = newSecondInstance();
+ second.readFields(input);
+ add(first, second);
+ }
+ }
+}
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov 6 17:36:36 2012
@@ -29,6 +29,7 @@ import org.apache.giraph.comm.requests.W
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
@@ -39,11 +40,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collection;
-import java.util.Map;
/**
* Test all the netty failure scenarios
@@ -85,12 +84,14 @@ public class RequestFailureTest {
private WritableRequest getRequest() {
// Data to send
final int partitionId = 0;
- Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
- Maps.newHashMap();
+ PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+ dataToSend = new PairList<Integer,
+ VertexIdMessageCollection<IntWritable, IntWritable>>();
+ dataToSend.initialize();
VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
vertexIdMessages.initialize();
- sendMap.put(partitionId, vertexIdMessages);
+ dataToSend.add(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {
IntWritable vertexId = new IntWritable(i);
for (int j = 0; j < i; ++j) {
@@ -102,7 +103,7 @@ public class RequestFailureTest {
SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
IntWritable> request =
new SendWorkerMessagesRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(sendMap);
+ IntWritable, IntWritable>(dataToSend);
return request;
}
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1406239&r1=1406238&r2=1406239&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
Tue Nov 6 17:36:36 2012
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.VertexMut
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.giraph.utils.MockUtils;
+import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
@@ -149,13 +150,15 @@ public class RequestTest {
@Test
public void sendWorkerMessagesRequest() throws IOException {
// Data to send
- Map<Integer, VertexIdMessageCollection<IntWritable, IntWritable>> sendMap =
- Maps.newHashMap();
+ PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+ dataToSend = new PairList<Integer,
+ VertexIdMessageCollection<IntWritable, IntWritable>>();
+ dataToSend.initialize();
int partitionId = 0;
VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
vertexIdMessages.initialize();
- sendMap.put(partitionId, vertexIdMessages);
+ dataToSend.add(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {
IntWritable vertexId = new IntWritable(i);
for (int j = 0; j < i; ++j) {
@@ -167,7 +170,7 @@ public class RequestTest {
SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
IntWritable> request =
new SendWorkerMessagesRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(sendMap);
+ IntWritable, IntWritable>(dataToSend);
client.sendWritableRequest(-1, request);
client.waitAllRequests();