Updated Branches: refs/heads/trunk d1a061e1a -> 7cc54575d
GIRAPH-792: Print job progress to command line (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7cc54575 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7cc54575 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7cc54575 Branch: refs/heads/trunk Commit: 7cc54575d867e37a43020df309a78cd65c3fbdc0 Parents: d1a061e Author: Maja Kabiljo <[email protected]> Authored: Tue Jan 28 11:49:58 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jan 28 11:56:10 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/bsp/BspService.java | 13 +- .../apache/giraph/conf/GiraphConfiguration.java | 9 + .../org/apache/giraph/conf/GiraphConstants.java | 11 + .../apache/giraph/graph/ComputeCallable.java | 12 + .../apache/giraph/graph/GraphTaskManager.java | 23 +- .../giraph/job/CombinedWorkerProgress.java | 118 ++++++ .../java/org/apache/giraph/job/GiraphJob.java | 5 + .../apache/giraph/job/HaltApplicationUtils.java | 56 +-- .../apache/giraph/job/JobProgressTracker.java | 149 ++++++++ .../apache/giraph/master/BspServiceMaster.java | 8 +- .../org/apache/giraph/utils/CounterUtils.java | 57 +++ .../apache/giraph/worker/BspServiceWorker.java | 37 +- .../giraph/worker/EdgeInputSplitsCallable.java | 5 + .../worker/VertexInputSplitsCallable.java | 6 + .../apache/giraph/worker/WorkerProgress.java | 369 +++++++++++++++++++ .../giraph/worker/WorkerProgressWriter.java | 74 ++++ 17 files changed, 889 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 7435a92..971ab46 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-792: Print job progress to command line (majakabiljo) + GIRAPH-831: waitUntilAllTasksDone waits forever (without debug information) (aching) GIRAPH-830: directMemory used in netty message (pavanka via aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 86823ed..ec0ddbb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable, "/_partitionExchangeDir"; /** Denotes that the superstep is done */ public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; + /** Stores progress info for workers */ + public static final String WORKER_PROGRESSES = "/_workerProgresses"; /** Denotes that computation should be halted */ public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; /** Denotes which workers have been cleaned up */ @@ -202,6 +204,8 @@ public abstract class BspService<I extends WritableComparable, protected final String checkpointBasePath; /** Path to the master election path */ protected final String masterElectionPath; + /** Stores progress info of this worker */ + protected final String myProgressPath; /** If this path exists computation will be halted */ protected final String haltComputationPath; /** Private ZooKeeper instance that implements the service */ @@ -253,11 +257,10 @@ public abstract class BspService<I extends WritableComparable, /** * Constructor. * - * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node */ - public BspService(int sessionMsecTimeout, + public BspService( Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) { this.vertexInputSplitsEvents = new InputSplitEvents(context); @@ -307,6 +310,8 @@ public abstract class BspService<I extends WritableComparable, this.checkpointFrequency = conf.getCheckpointFrequency(); basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; + getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, + basePath); masterJobStatePath = basePath + MASTER_JOB_STATE_NODE; vertexInputSplitsPaths = new InputSplitPaths(basePath, VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR, @@ -320,6 +325,7 @@ public abstract class BspService<I extends WritableComparable, CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId()); masterElectionPath = basePath + MASTER_ELECTION_DIR; + myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition; String serverPortList = conf.getZookeeperList(); haltComputationPath = basePath + HALT_COMPUTATION_NODE; getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, @@ -333,7 +339,7 @@ public abstract class BspService<I extends WritableComparable, } try { this.zk = new ZooKeeperExt(serverPortList, - sessionMsecTimeout, + conf.getZooKeeperSessionTimeout(), conf.getZookeeperOpsMaxAttempts(), conf.getZookeeperOpsRetryWaitMsecs(), this, @@ -345,7 +351,6 @@ public abstract class BspService<I extends WritableComparable, } } - /** * Get the superstep from a ZooKeeper path * http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 8cf403a..abc81e8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -1197,4 +1197,13 @@ public class GiraphConfiguration extends Configuration public void setWaitTaskDoneTimeoutMs(int ms) { WAIT_TASK_DONE_TIMEOUT_MS.set(this, ms); } + + /** + * Check whether to track job progress on client or not + * + * @return True if job progress should be tracked on client + */ + public boolean trackJobProgressOnClient() { + return TRACK_JOB_PROGRESS_ON_CLIENT.get(this); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 4e68308..9271152 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -996,6 +996,12 @@ public interface GiraphConstants { String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node"; /** + * This counter group will contain one counter whose name is the ZooKeeper + * node path which contains all data about this job + */ + String ZOOKEEPER_BASE_PATH_COUNTER_GROUP = "Zookeeper base path"; + + /** * Which class to use to write instructions on how to halt the application */ ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter> @@ -1013,5 +1019,10 @@ public interface GiraphConstants { new IntConfOption("giraph.waitTaskDoneTimeoutMs", MINUTES.toMillis(15), "Maximum timeout (in ms) for waiting for all all tasks to " + "complete"); + + /** Whether to track job progress on client or not */ + BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT = + new BooleanConfOption("giraph.trackJobProgressOnClient", true, + "Whether to track job progress on client or not"); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 1fe1d10..0303530 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -35,6 +35,7 @@ import org.apache.giraph.time.Times; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerProgress; import org.apache.giraph.worker.WorkerThreadAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -74,6 +75,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, private static final Logger LOG = Logger.getLogger(ComputeCallable.class); /** Class time object */ private static final Time TIME = SystemTime.get(); + /** How often to update WorkerProgress */ + private static final long VERTICES_TO_UPDATE_PROGRESS = 100000; /** Context */ private final Mapper<?, ?, ?, ?>.Context context; /** Graph state */ @@ -229,6 +232,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, Partition<I, V, E> partition) throws IOException, InterruptedException { PartitionStats partitionStats = new PartitionStats(partition.getId(), 0, 0, 0, 0, 0); + long verticesComputedProgress = 0; // Make sure this is thread-safe across runs synchronized (partition) { for (Vertex<I, V, E> vertex : partition) { @@ -260,10 +264,18 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, // Add statistics for this vertex partitionStats.incrVertexCount(); partitionStats.addEdgeCount(vertex.getNumEdges()); + + verticesComputedProgress++; + if (verticesComputedProgress == VERTICES_TO_UPDATE_PROGRESS) { + WorkerProgress.get().addVerticesComputed(verticesComputedProgress); + verticesComputedProgress = 0; + } } messageStore.clearPartition(partition.getId()); } + WorkerProgress.get().addVerticesComputed(verticesComputedProgress); + WorkerProgress.get().incrementPartitionsComputed(); return partitionStats; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 0617973..a84ac66 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -45,6 +45,7 @@ import org.apache.giraph.worker.BspServiceWorker; import org.apache.giraph.worker.InputSplitsCallable; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerObserver; +import org.apache.giraph.worker.WorkerProgress; import org.apache.giraph.zk.ZooKeeperManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -215,9 +216,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, Thread.sleep(GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT * GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME); } - int sessionMsecTimeout = conf.getZooKeeperSessionTimeout(); try { - instantiateBspService(sessionMsecTimeout); + instantiateBspService(); } catch (IOException e) { LOG.error("setup: Caught exception just before end of setup", e); if (zkManager != null) { @@ -537,17 +537,15 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** * Instantiate the appropriate BspService object (Master or Worker) * for this compute node. - * @param sessionMsecTimeout configurable session timeout */ - private void instantiateBspService(int sessionMsecTimeout) + private void instantiateBspService() throws IOException, InterruptedException { if (graphFunctions.isMaster()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceMaster " + "(master thread)..."); } - serviceMaster = new BspServiceMaster<I, V, E>( - sessionMsecTimeout, context, this); + serviceMaster = new BspServiceMaster<I, V, E>(context, this); masterThread = new MasterThread<I, V, E>(serviceMaster, context); masterThread.start(); } @@ -555,8 +553,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceWorker..."); } - serviceWorker = new BspServiceWorker<I, V, E>( - sessionMsecTimeout, context, this); + serviceWorker = new BspServiceWorker<I, V, E>(context, this); if (LOG.isInfoEnabled()) { LOG.info("setup: Registering health of this worker..."); } @@ -711,10 +708,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, int numThreads) { final BlockingQueue<Integer> computePartitionIdQueue = new ArrayBlockingQueue<Integer>(numPartitions); + long verticesToCompute = 0; for (Integer partitionId : serviceWorker.getPartitionStore().getPartitionIds()) { computePartitionIdQueue.add(partitionId); - } + verticesToCompute += + serviceWorker.getPartitionStore().getOrCreatePartition( + partitionId).getVertexCount(); + } + WorkerProgress.get().startSuperstep( + serviceWorker.getSuperstep(), + verticesToCompute, + serviceWorker.getPartitionStore().getNumPartitions()); GiraphTimerContext computeAllTimerContext = computeAll.time(); timeToFirstMessageTimerContext = timeToFirstMessage.time(); http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java new file mode 100644 index 0000000..0810040 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.job; + +import org.apache.giraph.worker.WorkerProgress; + +/** + * Class which combines multiple workers' progresses to get overall + * application progress + */ +public class CombinedWorkerProgress extends WorkerProgress { + /** + * How many workers have reported that they are in highest reported + * superstep + */ + private int workersInSuperstep = 0; + /** + * How many workers reported that they finished application + */ + private int workersDone = 0; + + /** + * Constructor + * + * @param workerProgresses Worker progresses to combine + */ + public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses) { + for (WorkerProgress workerProgress : workerProgresses) { + if (workerProgress.getCurrentSuperstep() > currentSuperstep) { + verticesToCompute = 0; + verticesComputed = 0; + partitionsToCompute = 0; + partitionsComputed = 0; + currentSuperstep = workerProgress.getCurrentSuperstep(); + workersInSuperstep = 0; + } + + if (workerProgress.getCurrentSuperstep() == currentSuperstep) { + workersInSuperstep++; + if (isInputSuperstep()) { + verticesLoaded += workerProgress.getVerticesLoaded(); + vertexInputSplitsLoaded += + workerProgress.getVertexInputSplitsLoaded(); + edgesLoaded += workerProgress.getEdgesLoaded(); + edgeInputSplitsLoaded += workerProgress.getEdgeInputSplitsLoaded(); + } else if (isComputeSuperstep()) { + verticesToCompute += workerProgress.getVerticesToCompute(); + verticesComputed += workerProgress.getVerticesComputed(); + partitionsToCompute += workerProgress.getPartitionsToCompute(); + partitionsComputed += workerProgress.getPartitionsComputed(); + } else if (isOutputSuperstep()) { + verticesToStore += workerProgress.getVerticesToStore(); + verticesStored += workerProgress.getVerticesStored(); + partitionsToStore += workerProgress.getPartitionsToStore(); + partitionsStored += workerProgress.getPartitionsStored(); + } + } + + if (workerProgress.isStoringDone()) { + workersDone++; + } + } + } + + /** + * Is the application done + * + * @param expectedWorkersDone Number of workers which should be done in + * order for application to be done + * @return True if application is done + */ + public boolean isDone(int expectedWorkersDone) { + return workersDone == expectedWorkersDone; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Data from ").append(workersInSuperstep).append(" workers - "); + if (isInputSuperstep()) { + sb.append("Loading data: "); + sb.append(verticesLoaded).append(" vertices loaded, "); + sb.append(vertexInputSplitsLoaded).append( + " vertex input splits loaded; "); + sb.append(edgesLoaded).append(" edges loaded, "); + sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded"); + } else if (isComputeSuperstep()) { + sb.append("Compute superstep ").append(currentSuperstep).append(": "); + sb.append(verticesComputed).append(" out of ").append( + verticesToCompute).append(" vertices computed; "); + sb.append(partitionsComputed).append(" out of ").append( + partitionsToCompute).append(" partitions computed"); + } else if (isOutputSuperstep()) { + sb.append("Storing data: "); + sb.append(verticesStored).append(" out of ").append( + verticesToStore).append(" vertices stored; "); + sb.append(partitionsStored).append(" out of ").append( + partitionsToStore).append(" partitions stored"); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 40670bb..4a1f02e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -252,9 +252,14 @@ public class GiraphJob { LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL()); } HaltApplicationUtils.printHaltInfo(submittedJob, conf); + JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ? + new JobProgressTracker(submittedJob, conf) : null; jobObserver.jobRunning(submittedJob); boolean passed = submittedJob.waitForCompletion(verbose); + if (jobProgressTracker != null) { + jobProgressTracker.stop(); + } jobObserver.jobFinished(submittedJob, passed); if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) { return passed; http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java index 28b5781..8150de6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java @@ -20,69 +20,33 @@ package org.apache.giraph.job; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.utils.CounterUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; -import java.io.IOException; - /** * Utility methods for halting application while running */ public class HaltApplicationUtils { - /** Milliseconds to sleep for while waiting for halt info */ - private static final int SLEEP_MSECS = 100; - /** Do not instantiate */ private HaltApplicationUtils() { } /** - * Wait for halt info (zk server and node) to become available - * - * @param submittedJob Submitted job - * @return True if halt info became available, false if job completed - * before it became available - */ - private static boolean waitForHaltInfo(Job submittedJob) throws IOException { - try { - while (submittedJob.getCounters().getGroup( - GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).size() == 0) { - if (submittedJob.isComplete()) { - return false; - } - Thread.sleep(SLEEP_MSECS); - } - while (submittedJob.getCounters().getGroup( - GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).size() == 0) { - if (submittedJob.isComplete()) { - return false; - } - Thread.sleep(SLEEP_MSECS); - } - } catch (InterruptedException e) { - throw new IllegalStateException( - "waitForHaltInfo: InterruptedException occurred", e); - } - return true; - } - - /** * Wait for halt info to become available and print instructions on how to * halt * * @param submittedJob Submitted job - * @param conf Configuration + * @param conf Configuration */ public static void printHaltInfo(Job submittedJob, - GiraphConfiguration conf) throws IOException { - if (waitForHaltInfo(submittedJob)) { - String zkServer = submittedJob.getCounters().getGroup( - GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).iterator() - .next().getName(); - String haltNode = submittedJob.getCounters().getGroup( - GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).iterator() - .next().getName(); - GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf) - .writeHaltInstructions(zkServer, haltNode); + GiraphConfiguration conf) { + String zkServer = CounterUtils.waitAndGetCounterNameFromGroup( + submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP); + String haltNode = CounterUtils.waitAndGetCounterNameFromGroup( + submittedJob, GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP); + if (zkServer != null && haltNode != null) { + GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance( + conf).writeHaltInstructions(zkServer, haltNode); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java new file mode 100644 index 0000000..f685344 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.job; + +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.utils.CounterUtils; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerProgress; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Class which tracks job's progress on client + */ +public class JobProgressTracker implements Watcher { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(JobProgressTracker.class); + /** How often to print job's progress */ + private static final int UPDATE_MILLISECONDS = 5 * 1000; + /** Thread which periodically writes job's progress */ + private Thread writerThread; + /** ZooKeeperExt */ + private ZooKeeperExt zk; + /** Whether application is finished */ + private volatile boolean finished = false; + + /** + * Constructor + * + * @param submittedJob Job to track + * @param conf Configuration + */ + public JobProgressTracker(final Job submittedJob, + final GiraphConfiguration conf) throws IOException, InterruptedException { + String zkServer = CounterUtils.waitAndGetCounterNameFromGroup( + submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP); + final String basePath = CounterUtils.waitAndGetCounterNameFromGroup( + submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP); + // Connect to ZooKeeper + zk = new ZooKeeperExt( + zkServer, + conf.getZooKeeperSessionTimeout(), + conf.getZookeeperOpsMaxAttempts(), + conf.getZookeeperOpsRetryWaitMsecs(), + this, + new Progressable() { + @Override + public void progress() { + } + }); + writerThread = new Thread(new Runnable() { + @Override + public void run() { + String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES; + try { + while (!finished) { + if (zk.exists(workerProgressBasePath, false) != null) { + // Get locations of all worker progresses + List<String> workerProgressPaths = zk.getChildrenExt( + workerProgressBasePath, false, false, true); + List<WorkerProgress> workerProgresses = + new ArrayList<WorkerProgress>(workerProgressPaths.size()); + // Read all worker progresses + for (String workerProgressPath : workerProgressPaths) { + WorkerProgress workerProgress = new WorkerProgress(); + byte[] zkData = zk.getData(workerProgressPath, false, null); + WritableUtils.readFieldsFromByteArray(zkData, workerProgress); + workerProgresses.add(workerProgress); + } + // Combine and log + CombinedWorkerProgress combinedWorkerProgress = + new CombinedWorkerProgress(workerProgresses); + if (LOG.isInfoEnabled()) { + LOG.info(combinedWorkerProgress.toString()); + } + // Check if application is done + if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { + break; + } + } + Thread.sleep(UPDATE_MILLISECONDS); + } + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } + } finally { + try { + // Create a node so master knows we stopped communicating with + // ZooKeeper and it's safe to cleanup + zk.createExt( + basePath + BspService.CLEANED_UP_DIR + "/client", + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + zk.close(); + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } + } + } + } + }); + writerThread.start(); + } + + /** + * Stop the thread which logs application progress + */ + public void stop() { + finished = true; + } + + @Override + public void process(WatchedEvent event) { + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 78487ef..cfee4c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -187,15 +187,13 @@ public class BspServiceMaster<I extends WritableComparable, /** * Constructor for setting up the master. * - * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node */ public BspServiceMaster( - int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) { - super(sessionMsecTimeout, context, graphTaskManager); + super(context, graphTaskManager); workerWroteCheckpoint = new PredicateLock(context); registerBspEvent(workerWroteCheckpoint); superstepStateChanged = new PredicateLock(context); @@ -1725,6 +1723,10 @@ public class BspServiceMaster<I extends WritableComparable, GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) { maxTasks *= 2; } + if (getConfiguration().trackJobProgressOnClient()) { + // For job client + maxTasks++; + } List<String> cleanedUpChildrenList = null; while (true) { try { http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java new file mode 100644 index 0000000..afec660 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/CounterUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; + +/** Utility methods for dealing with counters */ +public class CounterUtils { + /** Milliseconds to sleep for while waiting for counter to appear */ + private static final int SLEEP_MSECS = 100; + + /** Do not instantiate */ + private CounterUtils() { + } + + /** + * Wait for a counter to appear in a group and then return the name of that + * counter. If job finishes before counter appears, return null. + * + * @param job Job + * @param group Name of the counter group + * @return Name of the counter inside of the group, or null if job finishes + * before counter appears + */ + public static String waitAndGetCounterNameFromGroup(Job job, String group) { + try { + while (job.getCounters().getGroup(group).size() == 0) { + if (job.isComplete()) { + return null; + } + Thread.sleep(SLEEP_MSECS); + } + return job.getCounters().getGroup(group).iterator().next().getName(); + } catch (IOException | InterruptedException e) { + throw new IllegalStateException( + "waitAndGetCounterNameFromGroup: Exception occurred", e); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index bc29b03..13de188 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -159,6 +159,8 @@ public class BspServiceWorker<I extends WritableComparable, /** array of observers to call back to */ private final WorkerObserver[] observers; + /** Writer for worker progress */ + private final WorkerProgressWriter workerProgressWriter; // Per-Superstep Metrics /** Timer for WorkerContext#postSuperstep */ @@ -169,18 +171,16 @@ public class BspServiceWorker<I extends WritableComparable, /** * Constructor for setting up the worker. * - * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node * @throws IOException * @throws InterruptedException */ public BspServiceWorker( - int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) throws IOException, InterruptedException { - super(sessionMsecTimeout, context, graphTaskManager); + super(context, graphTaskManager); ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration(); partitionExchangeChildrenChanged = new PredicateLock(context); registerBspEvent(partitionExchangeChildrenChanged); @@ -207,6 +207,9 @@ public class BspServiceWorker<I extends WritableComparable, } observers = conf.createWorkerObservers(); + workerProgressWriter = conf.trackJobProgressOnClient() ? + new WorkerProgressWriter(myProgressPath, getZkExt()) : null; + GiraphMetrics.get().addSuperstepResetObserver(this); } @@ -515,6 +518,7 @@ public class BspServiceWorker<I extends WritableComparable, } else { vertexEdgeCount = new VertexEdgeCount(); } + WorkerProgress.get().finishLoadingVertices(); if (getConfiguration().hasEdgeInputFormat()) { // Ensure the edge InputSplits are ready for processing @@ -531,6 +535,7 @@ public class BspServiceWorker<I extends WritableComparable, } getContext().progress(); } + WorkerProgress.get().finishLoadingEdges(); if (LOG.isInfoEnabled()) { LOG.info("setup: Finally loaded a total of " + vertexEdgeCount); @@ -951,10 +956,21 @@ public class BspServiceWorker<I extends WritableComparable, new ArrayBlockingQueue<Integer>(numPartitions); Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + long verticesToStore = 0; + for (int partitionId : getPartitionStore().getPartitionIds()) { + verticesToStore += getPartitionStore().getOrCreatePartition( + partitionId).getVertexCount(); + } + WorkerProgress.get().startStoring( + verticesToStore, getPartitionStore().getNumPartitions()); + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { @Override public Callable<Void> newCallable(int callableId) { return new Callable<Void>() { + /** How often to update WorkerProgress */ + private static final long VERTICES_TO_UPDATE_PROGRESS = 100000; + @Override public Void call() throws Exception { VertexWriter<I, V, E> vertexWriter = @@ -962,6 +978,7 @@ public class BspServiceWorker<I extends WritableComparable, vertexWriter.setConf(getConfiguration()); vertexWriter.initialize(getContext()); long nextPrintVertices = 0; + long nextUpdateProgressVertices = 0; long nextPrintMsecs = System.currentTimeMillis() + 15000; int partitionIndex = 0; int numPartitions = getPartitionStore().getNumPartitions(); @@ -989,9 +1006,18 @@ public class BspServiceWorker<I extends WritableComparable, nextPrintMsecs = System.currentTimeMillis() + 15000; nextPrintVertices = verticesWritten + 250000; } + + if (verticesWritten >= nextUpdateProgressVertices) { + WorkerProgress.get().addVerticesStored( + VERTICES_TO_UPDATE_PROGRESS); + nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS; + } } getPartitionStore().putPartition(partition); ++partitionIndex; + WorkerProgress.get().addVerticesStored( + verticesWritten % VERTICES_TO_UPDATE_PROGRESS); + WorkerProgress.get().incrementPartitionsStored(); } vertexWriter.close(getContext()); // the temp results are saved now return null; @@ -1147,6 +1173,11 @@ public class BspServiceWorker<I extends WritableComparable, setCachedSuperstep(getSuperstep() - 1); saveVertices(finishedSuperstepStats.getLocalVertexCount()); saveEdges(); + WorkerProgress.get().finishStoring(); + if (workerProgressWriter != null) { + WorkerProgress.writeToZnode(getZkExt(), myProgressPath); + workerProgressWriter.stop(); + } getPartitionStore().shutdown(); // All worker processes should denote they are done by adding special // znode. Once the number of znodes equals the number of partitions http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 8ec0453..828eac4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -174,6 +174,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, // Update status every EDGES_UPDATE_PERIOD edges if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) { totalEdgesMeter.mark(EDGES_UPDATE_PERIOD); + WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD); LoggerUtils.setStatusAndLog(context, LOG, Level.INFO, "readEdgeInputSplit: Loaded " + totalEdgesMeter.count() + " edges at " + @@ -198,6 +199,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, totalEdgesFiltered.inc(inputSplitEdgesFiltered); totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD); + WorkerProgress.get().addEdgesLoaded( + inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD); + WorkerProgress.get().incrementEdgeInputSplitsLoaded(); + return new VertexEdgeCount(0, inputSplitEdgesLoaded); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 01a6fc5..e3e04d6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -176,6 +176,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable, // Update status every VERTICES_UPDATE_PERIOD vertices if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) { totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD); + WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD); totalEdgesMeter.mark(edgesSinceLastUpdate); inputSplitEdgesLoaded += edgesSinceLastUpdate; edgesSinceLastUpdate = 0; @@ -208,6 +209,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable, totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered); vertexReader.close(); + + WorkerProgress.get().addVerticesLoaded( + inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD); + WorkerProgress.get().incrementVertexInputSplitsLoaded(); + return new VertexEdgeCount(inputSplitVerticesLoaded, inputSplitEdgesLoaded + edgesSinceLastUpdate); } http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java new file mode 100644 index 0000000..f7de88b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Stores information about a worker's progress that is periodically written to + * ZooKeeper with {@link WorkerProgressWriter}. + */ +@ThreadSafe +public class WorkerProgress implements Writable { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(WorkerProgress.class); + /** Singleton instance for everyone to use */ + private static final WorkerProgress INSTANCE = new WorkerProgress(); + + /** Superstep which worker is executing, Long.MAX_VALUE if it's output */ + protected long currentSuperstep = -1; + + /** How many vertices were loaded until now */ + protected long verticesLoaded = 0; + /** How many vertex input splits were loaded until now */ + protected int vertexInputSplitsLoaded = 0; + /** Whether worker finished loading vertices */ + protected boolean loadingVerticesDone = false; + /** How many edges were loaded */ + protected long edgesLoaded = 0; + /** How many edge input splits were loaded until now */ + protected int edgeInputSplitsLoaded = 0; + /** Whether worker finished loading edges until now */ + protected boolean loadingEdgesDone = false; + + /** How many vertices are there to compute in current superstep */ + protected long verticesToCompute = 0; + /** How many vertices were computed in current superstep until now */ + protected long verticesComputed = 0; + /** How many partitions are there to compute in current superstep */ + protected int partitionsToCompute = 0; + /** How many partitions were computed in current superstep until now */ + protected int partitionsComputed = 0; + + /** Whether all compute supersteps are done */ + protected boolean computationDone = false; + + /** How many vertices are there to store */ + protected long verticesToStore = 0; + /** How many vertices were stored until now */ + protected long verticesStored = 0; + /** How many partitions are there to store */ + protected int partitionsToStore = 0; + /** How many partitions were stored until now */ + protected int partitionsStored = 0; + /** Whether worker finished storing data */ + protected boolean storingDone = false; + + /** + * Get singleton instance of WorkerProgress. + * + * @return WorkerProgress singleton instance + */ + public static WorkerProgress get() { + return INSTANCE; + } + + /** + * Write worker's progress to znode + * + * @param zk ZooKeeperExt + * @param myProgressPath Path to write the progress to + */ + public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) { + byte[] byteArray = WritableUtils.writeToByteArray(get()); + try { + zk.createOrSetExt(myProgressPath, + byteArray, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true, + -1); + } catch (KeeperException | InterruptedException e) { + if (LOG.isInfoEnabled()) { + LOG.info("writeToZnode: " + e.getClass().getName() + + " exception occurred", e); + } + } + } + + public synchronized boolean isLoadingVerticesDone() { + return loadingVerticesDone; + } + + public synchronized boolean isLoadingEdgesDone() { + return loadingEdgesDone; + } + + public synchronized boolean isComputationDone() { + return computationDone; + } + + public synchronized boolean isStoringDone() { + return storingDone; + } + + /** + * Add number of vertices loaded + * + * @param verticesLoaded How many vertices were loaded since the last + * time this function was called + */ + public synchronized void addVerticesLoaded(long verticesLoaded) { + this.verticesLoaded += verticesLoaded; + } + + /** + * Increment number of vertex input splits which were loaded + */ + public synchronized void incrementVertexInputSplitsLoaded() { + vertexInputSplitsLoaded++; + } + + /** + * Notify this class that worker finished loading vertices + */ + public synchronized void finishLoadingVertices() { + loadingVerticesDone = true; + } + + /** + * Add number of edges loaded + * + * @param edgesLoaded How many edges were loaded since the last + * time this function was called + */ + public synchronized void addEdgesLoaded(long edgesLoaded) { + this.edgesLoaded += edgesLoaded; + } + + /** + * Increment number of edge input splits which were loaded + */ + public synchronized void incrementEdgeInputSplitsLoaded() { + edgeInputSplitsLoaded++; + } + + /** + * Notify this class that worker finished loading edges + */ + public synchronized void finishLoadingEdges() { + loadingEdgesDone = true; + } + + /** + * Notify this class that next computation superstep is starting + * + * @param superstep Superstep which is starting + * @param verticesToCompute How many vertices are there to compute + * @param partitionsToCompute How many partitions are there to compute + */ + public synchronized void startSuperstep(long superstep, + long verticesToCompute, int partitionsToCompute) { + this.currentSuperstep = superstep; + this.verticesToCompute = verticesToCompute; + this.partitionsToCompute = partitionsToCompute; + verticesComputed = 0; + partitionsComputed = 0; + } + + /** + * Add number of vertices computed + * + * @param verticesComputed How many vertices were computed since the last + * time this function was called + */ + public synchronized void addVerticesComputed(long verticesComputed) { + this.verticesComputed += verticesComputed; + } + + /** + * Increment number of partitions which were computed + */ + public synchronized void incrementPartitionsComputed() { + partitionsComputed++; + } + + /** + * Notify this class that worker is starting to store data + * + * @param verticesToStore How many vertices should be stored + * @param partitionsToStore How many partitions should be stored + */ + public synchronized void startStoring(long verticesToStore, + int partitionsToStore) { + computationDone = true; + verticesToCompute = 0; + verticesComputed = 0; + partitionsToCompute = 0; + partitionsComputed = 0; + currentSuperstep = Long.MAX_VALUE; + this.verticesToStore = verticesToStore; + this.partitionsToStore = partitionsToStore; + } + + /** + * Add number of vertices stored + * + * @param verticesStored How many vertices were stored since the last time + * this function was called + */ + public synchronized void addVerticesStored(long verticesStored) { + this.verticesStored += verticesStored; + } + + /** + * Increment number of partitions which were stored + */ + public synchronized void incrementPartitionsStored() { + partitionsStored++; + } + + /** + * Notify this class that storing data is done + */ + public synchronized void finishStoring() { + storingDone = true; + } + + public synchronized long getCurrentSuperstep() { + return currentSuperstep; + } + + public synchronized long getVerticesLoaded() { + return verticesLoaded; + } + + public synchronized int getVertexInputSplitsLoaded() { + return vertexInputSplitsLoaded; + } + + public synchronized long getEdgesLoaded() { + return edgesLoaded; + } + + public synchronized int getEdgeInputSplitsLoaded() { + return edgeInputSplitsLoaded; + } + + public synchronized long getVerticesToCompute() { + return verticesToCompute; + } + + public synchronized long getVerticesComputed() { + return verticesComputed; + } + + public synchronized int getPartitionsToCompute() { + return partitionsToCompute; + } + + public synchronized int getPartitionsComputed() { + return partitionsComputed; + } + + public synchronized long getVerticesToStore() { + return verticesToStore; + } + + public synchronized long getVerticesStored() { + return verticesStored; + } + + public synchronized int getPartitionsToStore() { + return partitionsToStore; + } + + public synchronized int getPartitionsStored() { + return partitionsStored; + } + + public synchronized boolean isInputSuperstep() { + return currentSuperstep == -1; + } + + public synchronized boolean isComputeSuperstep() { + return currentSuperstep >= 0 && currentSuperstep < Long.MAX_VALUE; + } + + public synchronized boolean isOutputSuperstep() { + return currentSuperstep == Long.MAX_VALUE; + } + + @Override + public synchronized void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(currentSuperstep); + + dataOutput.writeLong(verticesLoaded); + dataOutput.writeInt(vertexInputSplitsLoaded); + dataOutput.writeBoolean(loadingVerticesDone); + dataOutput.writeLong(edgesLoaded); + dataOutput.writeInt(edgeInputSplitsLoaded); + dataOutput.writeBoolean(loadingEdgesDone); + + dataOutput.writeLong(verticesToCompute); + dataOutput.writeLong(verticesComputed); + dataOutput.writeInt(partitionsToCompute); + dataOutput.writeInt(partitionsComputed); + + dataOutput.writeBoolean(computationDone); + + dataOutput.writeLong(verticesToStore); + dataOutput.writeLong(verticesStored); + dataOutput.writeInt(partitionsToStore); + dataOutput.writeInt(partitionsStored); + dataOutput.writeBoolean(storingDone); + } + + @Override + public synchronized void readFields(DataInput dataInput) throws IOException { + currentSuperstep = dataInput.readLong(); + + verticesLoaded = dataInput.readLong(); + vertexInputSplitsLoaded = dataInput.readInt(); + loadingVerticesDone = dataInput.readBoolean(); + edgesLoaded = dataInput.readLong(); + edgeInputSplitsLoaded = dataInput.readInt(); + loadingEdgesDone = dataInput.readBoolean(); + + verticesToCompute = dataInput.readLong(); + verticesComputed = dataInput.readLong(); + partitionsToCompute = dataInput.readInt(); + partitionsComputed = dataInput.readInt(); + + computationDone = dataInput.readBoolean(); + + verticesToStore = dataInput.readLong(); + verticesStored = dataInput.readLong(); + partitionsToStore = dataInput.readInt(); + partitionsStored = dataInput.readInt(); + storingDone = dataInput.readBoolean(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/7cc54575/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java new file mode 100644 index 0000000..f8c7571 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.log4j.Logger; + +/** + * Class which periodically writes worker's progress to zookeeper + */ +public class WorkerProgressWriter { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(WorkerProgressWriter.class); + /** How often to update worker's progress */ + private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000; + + /** Thread which writes worker's progress */ + private final Thread writerThread; + /** Whether worker finished application */ + private volatile boolean finished = false; + + /** + * Constructor, starts separate thread to periodically update worker's + * progress + * + * @param myProgressPath Path where this worker's progress should be stored + * @param zk ZooKeeperExt + */ + public WorkerProgressWriter(final String myProgressPath, + final ZooKeeperExt zk) { + writerThread = new Thread(new Runnable() { + @Override + public void run() { + try { + while (!finished) { + WorkerProgress.writeToZnode(zk, myProgressPath); + double factor = 1 + Math.random(); + Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor)); + } + } catch (InterruptedException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: WorkerProgressWriter interrupted", e); + } + } + } + }); + writerThread.start(); + } + + /** + * Stop the thread which writes worker's progress + */ + public void stop() { + finished = true; + writerThread.interrupt(); + } +}
