Updated Branches: refs/heads/trunk d3f4a4e0d -> 6fd9f12da
GIRAPH-492: Saving vertices has no status report, making it hard to find DFS issues (aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6fd9f12d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6fd9f12d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6fd9f12d Branch: refs/heads/trunk Commit: 6fd9f12da217774f15d32deeaeda3a389d44ea34 Parents: d3f4a4e Author: aching <[email protected]> Authored: Tue Jan 15 17:31:52 2013 -0800 Committer: aching <[email protected]> Committed: Wed Jan 30 11:17:35 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 + .../apache/giraph/benchmark/PageRankBenchmark.java | 17 +++++- .../org/apache/giraph/bsp/CentralizedService.java | 12 +---- .../giraph/bsp/CentralizedServiceMaster.java | 10 +++ .../giraph/bsp/CentralizedServiceWorker.java | 10 +++ .../giraph/graph/FinishedSuperstepStats.java | 36 +++++++++++- .../org/apache/giraph/graph/GraphTaskManager.java | 43 +++++++------- .../org/apache/giraph/master/BspServiceMaster.java | 2 +- .../java/org/apache/giraph/utils/LoggerUtils.java | 19 ++++++ .../org/apache/giraph/worker/BspServiceWorker.java | 44 ++++++++++++--- .../giraph/worker/VertexInputSplitsCallable.java | 26 +++++--- 11 files changed, 166 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index a82ee15..5177255 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-492: Saving vertices has no status report, making it hard to + find DFS issues (aching) + GIRAPH-312: Giraph needs an admin script (ereisman) GIRAPH-469: Refactor GraphMapper (ereisman) 
http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java index 3ef471a..6e49812 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java @@ -26,6 +26,7 @@ import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.combiner.DoubleSumCombiner; import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat; import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat; import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; import org.apache.hadoop.conf.Configuration; @@ -97,6 +98,10 @@ public class PageRankBenchmark implements Tool { "combinerType", true, "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)"); + options.addOption("o", + "vertexOutputFormat", + true, + "0 for JsonBase64VertexOutputFormat"); HelpFormatter formatter = new HelpFormatter(); if (args.length == 0) { @@ -136,6 +141,7 @@ public class PageRankBenchmark implements Tool { GiraphJob job = new GiraphJob(getConf(), name); GiraphConfiguration configuration = job.getConfiguration(); setVertexAndInputFormatClasses(cmd, configuration); + configuration.setWorkerConfiguration(workers, workers, 100.0f); configuration.setInt( PageRankComputation.SUPERSTEP_COUNT, @@ -188,7 +194,7 @@ public class PageRankBenchmark implements Tool { MultiGraphRepresentativeVertexPageRankBenchmark.class); configuration.useUnsafeSerialization(true); } - LOG.info("Using class " + + LOG.info("Using vertex class " + configuration.get(GiraphConstants.VERTEX_CLASS)); if 
(!cmd.hasOption('t') || (Integer.parseInt(cmd.getOptionValue('t')) == 2)) { @@ -217,6 +223,15 @@ public class PageRankBenchmark implements Tool { PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX, Long.parseLong(cmd.getOptionValue('e'))); } + + int vertexOutputClassOption = + cmd.hasOption('o') ? Integer.parseInt(cmd.getOptionValue('o')) : -1; + if (vertexOutputClassOption == 0) { + LOG.info("Using vertex output format class " + + JsonBase64VertexOutputFormat.class.getName()); + configuration.setVertexOutputFormatClass( + JsonBase64VertexOutputFormat.class); + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java index 83fba57..2281903 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java @@ -18,13 +18,11 @@ package org.apache.giraph.bsp; +import java.util.List; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.IOException; -import java.util.List; - /** * Basic service interface shared by both {@link CentralizedServiceMaster} and * {@link CentralizedServiceWorker}. 
@@ -68,12 +66,4 @@ public interface CentralizedService<I extends WritableComparable, * @return List of workers */ List<WorkerInfo> getWorkerInfoList(); - - /** - * Clean up the service (no calls may be issued after this) - * - * @throws IOException - * @throws InterruptedException - */ - void cleanup() throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index 399dc72..5f84ece 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -139,4 +139,14 @@ public interface CentralizedServiceMaster<I extends WritableComparable, * @param e Exception job failed from. May be null. 
*/ void failureCleanup(Exception e); + + + /** + * Clean up the service (no calls may be issued after this) + * + * @throws IOException + * @throws InterruptedException + */ + void cleanup() + throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 294c2c7..30d4462 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -233,4 +233,14 @@ public interface CentralizedServiceWorker<I extends WritableComparable, * TODO how to avoid this additional function */ void prepareSuperstep(); + + /** + * Clean up the service (no calls may be issued after this) + * + * @param finishedSuperstepStats Finished superstep stats + * @throws IOException + * @throws InterruptedException + */ + void cleanup(FinishedSuperstepStats finishedSuperstepStats) + throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java index d888d10..c351778 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java @@ -18,27 +18,55 @@ package org.apache.giraph.graph; /** - * Immutable results of finishSuperste() + * Immutable graph stats after the
completion of a superstep */ public class FinishedSuperstepStats extends VertexEdgeCount { + /** Number of local vertices */ + private final long localVertexCount; /** Are all the graph vertices halted? */ private final boolean allVerticesHalted; + /** Needs to load a checkpoint */ + private final boolean mustLoadCheckpoint; /** * Constructor. * + * @param numLocalVertices Number of local vertices * @param allVerticesHalted Are all the vertices halted * @param numVertices Number of vertices * @param numEdges Number of edges + * @param mustLoadCheckpoint Has to load a checkpoint? */ - public FinishedSuperstepStats(boolean allVerticesHalted, + public FinishedSuperstepStats(long numLocalVertices, + boolean allVerticesHalted, long numVertices, - long numEdges) { + long numEdges, + boolean mustLoadCheckpoint) { super(numVertices, numEdges); + this.localVertexCount = numLocalVertices; this.allVerticesHalted = allVerticesHalted; + this.mustLoadCheckpoint = mustLoadCheckpoint; } - public boolean getAllVerticesHalted() { + public long getLocalVertexCount() { + return localVertexCount; + } + + /** + * Are all the vertices halted? + * + * @return True if all halted, false otherwise + */ + public boolean allVerticesHalted() { return allVerticesHalted; } + + /** + * Must load the checkpoint? 
+ * + * @return True if the checkpoint must be loaded, false otherwise + */ + public boolean mustLoadCheckpoint() { + return mustLoadCheckpoint; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 401e07b..4ede8bb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -124,10 +124,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, private boolean done = false; /** What kind of functions is this mapper doing? */ private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN; - /** Total number of vertices in the graph (at this time) */ - private long numVertices = -1; - /** Total number of edges in the graph (at this time) */ - private long numEdges = -1; + /** Superstep stats */ + private FinishedSuperstepStats finishedSuperstepStats = + new FinishedSuperstepStats(0, false, 0, 0, false); // Per-Job Metrics /** Timer for WorkerContext#preApplication() */ @@ -224,15 +223,14 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (checkTaskState()) { return; } - FinishedSuperstepStats inputSuperstepStats = serviceWorker.setup(); - if (collectInputSuperstepStats(inputSuperstepStats)) { + finishedSuperstepStats = serviceWorker.setup(); + if (collectInputSuperstepStats(finishedSuperstepStats)) { return; } WorkerAggregatorUsage aggregatorUsage = prepareAggregatorsAndGraphState(); List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>(); int numComputeThreads = conf.getNumComputeThreads(); - FinishedSuperstepStats finishedSuperstepStats = null; // main superstep 
processing loop do { @@ -240,7 +238,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, GiraphTimerContext superstepTimerContext = getTimerForThisSuperstep(superstep); GraphState<I, V, E, M> graphState = - new GraphState<I, V, E, M>(superstep, numVertices, numEdges, + new GraphState<I, V, E, M>(superstep, + finishedSuperstepStats.getVertexCount(), + finishedSuperstepStats.getEdgeCount(), context, this, null, aggregatorUsage); Collection<? extends PartitionOwner> masterAssignedPartitionOwners = serviceWorker.startSuperstep(graphState); @@ -273,7 +273,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, finishedSuperstepStats = completeSuperstepAndCollectStats( partitionStatsList, superstepTimerContext, graphState); // END of superstep compute loop - } while (!finishedSuperstepStats.getAllVerticesHalted()); + } while (!finishedSuperstepStats.allVerticesHalted()); if (LOG.isInfoEnabled()) { LOG.info("execute: BSP application done (global vertices marked done)"); @@ -335,7 +335,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, WorkerAggregatorUsage aggregatorUsage) { serviceWorker.getWorkerContext().setGraphState( new GraphState<I, V, E, M>(serviceWorker.getSuperstep(), - numVertices, numEdges, context, this, null, aggregatorUsage)); + finishedSuperstepStats.getVertexCount(), + finishedSuperstepStats.getEdgeCount(), context, this, null, + aggregatorUsage)); } /** @@ -350,11 +352,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, List<PartitionStats> partitionStatsList, GiraphTimerContext superstepTimerContext, GraphState<I, V, E, M> graphState) { - FinishedSuperstepStats finishedSuperstepStats; finishedSuperstepStats = serviceWorker.finishSuperstep(graphState, partitionStatsList); - numVertices = finishedSuperstepStats.getVertexCount(); - numEdges = finishedSuperstepStats.getEdgeCount(); superstepTimerContext.stop(); if 
(conf.metricsEnabled()) { GiraphMetrics.get().perSuperstep().printSummary(); @@ -747,10 +746,13 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint( serviceWorker.getRestartedSuperstep()); - numVertices = vertexEdgeCount.getVertexCount(); - numEdges = vertexEdgeCount.getEdgeCount(); - graphState = new GraphState<I, V, E, M>(superstep, numVertices, - numEdges, context, this, null, aggregatorUsage); + finishedSuperstepStats = new FinishedSuperstepStats(0, false, + vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(), + false); + graphState = new GraphState<I, V, E, M>(superstep, + finishedSuperstepStats.getVertexCount(), + finishedSuperstepStats.getEdgeCount(), + context, this, null, aggregatorUsage); } else if (serviceWorker.checkpointFrequencyMet(superstep)) { serviceWorker.storeCheckpoint(); } @@ -767,9 +769,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, */ private boolean collectInputSuperstepStats( FinishedSuperstepStats inputSuperstepStats) { - numVertices = inputSuperstepStats.getVertexCount(); - numEdges = inputSuperstepStats.getEdgeCount(); - if (inputSuperstepStats.getVertexCount() == 0) { + if (inputSuperstepStats.getVertexCount() == 0 && + !inputSuperstepStats.mustLoadCheckpoint()) { LOG.warn("map: No vertices in the graph, exiting."); return true; } @@ -833,7 +834,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } if (serviceWorker != null) { - serviceWorker.cleanup(); + serviceWorker.cleanup(finishedSuperstepStats); } try { if (masterThread != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java 
b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 7ad2902..677ab82 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -461,7 +461,7 @@ public class BspServiceMaster<I extends WritableComparable, LOG.info("checkWorkers: Only found " + totalResponses + " responses of " + maxWorkers + " needed to start superstep " + - getSuperstep() + ". Reporting every" + + getSuperstep() + ". Reporting every " + eventWaitMsecs + " msecs, " + (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) + " more msecs left before giving up."); http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java index 81dfd1d..72b6c23 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java @@ -32,6 +32,25 @@ public class LoggerUtils { private LoggerUtils() { } /** + * Helper method to set the status and log message together if the condition + * has been met. + * + * @param condition Must be true to write status and log + * @param context Context to set the status with + * @param logger Logger to write to + * @param level Level of logging + * @param message Message to set status with + */ + public static void conditionalSetStatusAndLog( + boolean condition, + TaskAttemptContext context, Logger logger, Level level, + String message) { + if (condition) { + setStatusAndLog(context, logger, level, message); + } + } + + /** + * Helper method to set the status and log message together.
* * @param context Context to set the status with http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index d5ad62b..f542344 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -432,7 +432,7 @@ public class BspServiceWorker<I extends WritableComparable, // 6. Wait for superstep INPUT_SUPERSTEP to complete. if (getRestartedSuperstep() != UNSET_SUPERSTEP) { setCachedSuperstep(getRestartedSuperstep()); - return new FinishedSuperstepStats(false, -1, -1); + return new FinishedSuperstepStats(0, false, 0, 0, true); } JSONObject jobState = getJobState(); @@ -449,7 +449,7 @@ public class BspServiceWorker<I extends WritableComparable, getApplicationAttempt()); } setRestartedSuperstep(getSuperstep()); - return new FinishedSuperstepStats(false, -1, -1); + return new FinishedSuperstepStats(0, false, 0, 0, true); } } catch (JSONException e) { throw new RuntimeException( @@ -722,8 +722,10 @@ else[HADOOP_NON_SECURE]*/ graphState.getGraphTaskManager().notifyFinishedCommunication(); long workerSentMessages = 0; + long localVertices = 0; for (PartitionStats partitionStats : partitionStatsList) { workerSentMessages += partitionStats.getMessagesSentCount(); + localVertices += partitionStats.getVertexCount(); } if (getSuperstep() != INPUT_SUPERSTEP) { @@ -770,9 +772,11 @@ else[HADOOP_NON_SECURE]*/ ", Superstep=" + getSuperstep()); return new FinishedSuperstepStats( + localVertices, globalStats.getHaltComputation(), globalStats.getVertexCount(), - globalStats.getEdgeCount()); + globalStats.getEdgeCount(), + false); } /** @@ -865,9 +869,12 @@ else[HADOOP_NON_SECURE]*/ 
/** * Save the vertices using the user-defined VertexOutputFormat from our * vertexArray based on the split. + * + * @param numLocalVertices Number of local vertices * @throws InterruptedException */ - private void saveVertices() throws IOException, InterruptedException { + private void saveVertices(long numLocalVertices) throws IOException, + InterruptedException { if (getConfiguration().getVertexOutputFormatClass() == null) { LOG.warn("saveVertices: " + GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS + @@ -876,19 +883,39 @@ else[HADOOP_NON_SECURE]*/ } LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, - "saveVertices: Starting to save vertices"); + "saveVertices: Starting to save " + numLocalVertices + " vertices"); VertexOutputFormat<I, V, E> vertexOutputFormat = getConfiguration().createVertexOutputFormat(); VertexWriter<I, V, E> vertexWriter = vertexOutputFormat.createVertexWriter(getContext()); vertexWriter.initialize(getContext()); + long verticesWritten = 0; + long nextPrintVertices = 0; + long nextPrintMsecs = System.currentTimeMillis() + 15000; + int partitionIndex = 0; + int numPartitions = getPartitionStore().getNumPartitions(); for (Partition<I, V, E, M> partition : getPartitionStore().getPartitions()) { for (Vertex<I, V, E, M> vertex : partition) { getContext().progress(); vertexWriter.writeVertex(vertex); + ++verticesWritten; + + // Update status at most every 250k vertices or 15 seconds + if (verticesWritten > nextPrintVertices && + System.currentTimeMillis() > nextPrintMsecs) { + LoggerUtils.setStatusAndLog( + getContext(), LOG, Level.INFO, + "saveVertices: Saved " + + verticesWritten + " out of " + numLocalVertices + + " vertices, on partition " + partitionIndex + " out of " + + numPartitions); + nextPrintMsecs = System.currentTimeMillis() + 15000; + nextPrintVertices = verticesWritten + 250000; + } } getContext().progress(); + ++partitionIndex; } vertexWriter.close(getContext()); LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, @@ 
-896,10 +923,11 @@ else[HADOOP_NON_SECURE]*/ } @Override - public void cleanup() throws IOException, InterruptedException { + public void cleanup(FinishedSuperstepStats finishedSuperstepStats) + throws IOException, InterruptedException { workerClient.closeConnections(); setCachedSuperstep(getSuperstep() - 1); - saveVertices(); + saveVertices(finishedSuperstepStats.getLocalVertexCount()); // All worker processes should denote they are done by adding special // znode. Once the number of znodes equals the number of partitions // for workers and masters, the master will clean up the ZooKeeper @@ -1037,7 +1065,7 @@ else[HADOOP_NON_SECURE]*/ CreateMode.PERSISTENT, true); } catch (KeeperException.NodeExistsException e) { - LOG.warn("finishSuperstep: wrote checkpoint worker path " + + LOG.warn("storeCheckpoint: wrote checkpoint worker path " + workerWroteCheckpoint + " already exists!"); } catch (KeeperException e) { throw new IllegalStateException("Creating " + workerWroteCheckpoint + http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 7522027..a4f98e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -18,17 +18,20 @@ package org.apache.giraph.worker; +import com.yammer.metrics.core.Counter; +import java.io.IOException; +import java.util.List; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.VertexInputFormat; import 
org.apache.giraph.io.VertexReader; -import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; +import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; +import org.apache.giraph.vertex.Vertex; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -37,11 +40,6 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import com.yammer.metrics.core.Counter; - -import java.io.IOException; -import java.util.List; - /** * Load as many vertex input splits as possible. * Every thread will has its own instance of WorkerClientRequestProcessor @@ -131,6 +129,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable, vertexReader.initialize(inputSplit, context); long inputSplitVerticesLoaded = 0; long inputSplitEdgesLoaded = 0; + long nextPrintVertices = 0; + long nextPrintMsecs = System.currentTimeMillis() + 15000; while (vertexReader.nextVertex()) { Vertex<I, V, E, M> readerVertex = vertexReader.getCurrentVertex(); @@ -153,14 +153,20 @@ public class VertexInputSplitsCallable<I extends WritableComparable, ++inputSplitVerticesLoaded; inputSplitEdgesLoaded += readerVertex.getNumEdges(); - // Update status every 250k vertices - if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) { - LoggerUtils.setStatusAndLog(context, LOG, Level.INFO, + // Update status at most every 250k vertices or 15 seconds + if ((inputSplitVerticesLoaded + totalVerticesLoaded) > + nextPrintVertices && + System.currentTimeMillis() > nextPrintMsecs) { + LoggerUtils.setStatusAndLog( + context, LOG, Level.INFO, "readInputSplit: Loaded " + (inputSplitVerticesLoaded + totalVerticesLoaded) + " vertices " + (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " + 
MemoryUtils.getRuntimeMemoryStats()); + nextPrintMsecs = System.currentTimeMillis() + 15000; + nextPrintVertices = inputSplitVerticesLoaded + totalVerticesLoaded + + 250000; } // For sampling, or to limit outlier input splits, the number of
