Author: apresta
Date: Wed Aug 15 11:03:03 2012
New Revision: 1373331
URL: http://svn.apache.org/viewvc?rev=1373331&view=rev
Log:
GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint.
(majakabiljo via apresta)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 15 11:03:03 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint.
+ (majakabiljo via apresta)
+
GIRAPH-297: Checkpointing on master is done one superstep later
(majakabiljo via aching).
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Aug 15 11:03:03 2012
@@ -621,6 +621,9 @@ public class BspServiceMaster<I extends
getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
DataInputStream finalizedStream =
fs.open(new Path(finalizedCheckpointPath));
+ GlobalStats globalStats = new GlobalStats();
+ globalStats.readFields(finalizedStream);
+ updateCounters(globalStats);
int prefixFileCount = finalizedStream.readInt();
for (int i = 0; i < prefixFileCount; ++i) {
String metadataFilePath =
@@ -1055,11 +1058,19 @@ public class BspServiceMaster<I extends
}
// Format:
+ // <global statistics>
// <number of files>
// <used file prefix 0><used file prefix 1>...
// <aggregator data length><aggregators as a serialized JSON byte array>
+ // <masterCompute data>
FSDataOutputStream finalizedOutputStream =
getFs().create(finalizedCheckpointPath);
+
+ String superstepFinishedNode =
+ getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
+ finalizedOutputStream.write(
+ getZkExt().getData(superstepFinishedNode, false, null));
+
finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
String chosenWorkerInfoPrefix =
@@ -1507,18 +1518,7 @@ public class BspServiceMaster<I extends
getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
WritableUtils.writeToZnode(
getZkExt(), superstepFinishedNode, -1, globalStats);
- vertexCounter.increment(
- globalStats.getVertexCount() -
- vertexCounter.getValue());
- finishedVertexCounter.increment(
- globalStats.getFinishedVertexCount() -
- finishedVertexCounter.getValue());
- edgeCounter.increment(
- globalStats.getEdgeCount() -
- edgeCounter.getValue());
- sentMessagesCounter.increment(
- globalStats.getMessageCount() -
- sentMessagesCounter.getValue());
+ updateCounters(globalStats);
incrCachedSuperstep();
// Counter starts at zero, so no need to increment
@@ -1839,4 +1839,24 @@ public class BspServiceMaster<I extends
}
((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
}
+
+ /**
+  * Overwrite the vertex, finished-vertex, edge and sent-message counters
+  * with the totals carried by a {@link GlobalStats} instance.
+  * <p>
+  * Each counter is moved to its target value by incrementing it with the
+  * difference between the target and the counter's current value.
+  *
+  * @param globalStats Global statistics which holds new counter values
+  */
+ private void updateCounters(GlobalStats globalStats) {
+   long vertexDelta =
+       globalStats.getVertexCount() - vertexCounter.getValue();
+   vertexCounter.increment(vertexDelta);
+
+   long finishedVertexDelta =
+       globalStats.getFinishedVertexCount() - finishedVertexCounter.getValue();
+   finishedVertexCounter.increment(finishedVertexDelta);
+
+   long edgeDelta =
+       globalStats.getEdgeCount() - edgeCounter.getValue();
+   edgeCounter.increment(edgeDelta);
+
+   long sentMessagesDelta =
+       globalStats.getMessageCount() - sentMessagesCounter.getValue();
+   sentMessagesCounter.increment(sentMessagesDelta);
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 15 11:03:03 2012
@@ -1322,6 +1322,23 @@ public class BspServiceWorker<I extends
workerGraphPartitioner.getPartitionOwners().size() +
" total.");
}
+
+ // Load global statistics
+ String finalizedCheckpointPath =
+ getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+ try {
+ DataInputStream finalizedStream =
+ getFs().open(new Path(finalizedCheckpointPath));
+ GlobalStats globalStats = new GlobalStats();
+ globalStats.readFields(finalizedStream);
+ getGraphMapper().getGraphState().
+ setTotalNumEdges(globalStats.getEdgeCount()).
+ setTotalNumVertices(globalStats.getVertexCount());
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to load global statistics", e);
+ }
+
// Communication service needs to setup the connections prior to
// processing vertices
commService.setup();