Updated Branches: refs/heads/trunk af21be3b7 -> 7d4b72561
GIRAPH-756: Provide a way to halt running application (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7d4b7256 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7d4b7256 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7d4b7256 Branch: refs/heads/trunk Commit: 7d4b725615916d26610afb2bc2ec71a774ed8cfb Parents: af21be3 Author: Maja Kabiljo <[email protected]> Authored: Mon Sep 16 11:11:28 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Sep 16 11:12:12 2013 -0700 ---------------------------------------------------------------------- bin/giraph-env | 13 +- bin/halt-application | 22 ++++ .../java/org/apache/giraph/bsp/BspService.java | 10 ++ .../org/apache/giraph/conf/GiraphConstants.java | 23 ++++ .../apache/giraph/graph/GraphTaskManager.java | 2 + .../apache/giraph/job/DefaultJobObserver.java | 5 + .../java/org/apache/giraph/job/GiraphJob.java | 7 ++ .../apache/giraph/job/GiraphJobObserver.java | 7 ++ .../apache/giraph/job/HaltApplicationUtils.java | 122 +++++++++++++++++++ .../apache/giraph/master/BspServiceMaster.java | 5 + .../apache/giraph/zk/ZooKeeperNodeCreator.java | 87 +++++++++++++ 11 files changed, 296 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/giraph-env ---------------------------------------------------------------------- diff --git a/bin/giraph-env b/bin/giraph-env index 9aa78bb..34ff91f 100644 --- a/bin/giraph-env +++ b/bin/giraph-env @@ -41,13 +41,12 @@ USER_JAR=$1 shift if [ ! -e "$USER_JAR" ]; then - echo "Can't find user jar (${USER_JAR}) to execute." - exit 1 + echo "No user jar found at $USER_JAR" +else + # add user jar to classpath + CLASSPATH=${USER_JAR} fi -# add user jar to classpath -CLASSPATH=${USER_JAR} - # add Giraph conf dir to classpath CLASSPATH=$CLASSPATH:$GIRAPH_HOME/conf @@ -71,7 +70,7 @@ if [ -d "$GIRAPH_HOME/lib" ]; then done else echo "No lib directory, assuming dev environment" - if [ ! -d "$GIRAPH_HOME/target" ]; then + if [ ! -d "$GIRAPH_HOME/giraph-core/target" ]; then echo "No target directory. Build Giraph jar before proceeding." exit 1 fi @@ -79,7 +78,7 @@ else CLASSPATH2=`mvn dependency:build-classpath | grep -v "[INFO]"` CLASSPATH=$CLASSPATH:$CLASSPATH2 - for f in $GIRAPH_HOME/giraph/target/giraph*.jar; do + for f in $GIRAPH_HOME/giraph-core/target/giraph*.jar; do if [ -e "$f" ]; then JAR=$f break http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/bin/halt-application ---------------------------------------------------------------------- diff --git a/bin/halt-application b/bin/halt-application new file mode 100644 index 0000000..0526661 --- /dev/null +++ b/bin/halt-application @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +GIRAPH_ENV_DIR="$( cd -P "$( dirname "$0" )" && pwd )" +source $GIRAPH_ENV_DIR/giraph-env + +CLASS=org.apache.giraph.zk.ZooKeeperNodeCreator +exec "$HADOOP_HOME/bin/hadoop" --config $HADOOP_CONF_DIR jar $JAR $CLASS $HADOOP_PROPERTIES -libjars $GIRAPH_JARS "$@" http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index aae01da..34f4b51 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -137,6 +137,8 @@ public abstract class BspService<I extends WritableComparable, "/_partitionExchangeDir"; /** Denotes that the superstep is done */ public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; + /** Denotes that computation should be halted */ + public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; /** Denotes which workers have been cleaned up */ public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; /** JSON partition stats key */ @@ -200,6 +202,8 @@ public abstract class BspService<I extends WritableComparable, protected final String checkpointBasePath; /** Path to the master election path */ protected final String masterElectionPath; + /** If this path exists computation will be halted */ + protected final String haltComputationPath; /** Private ZooKeeper instance that implements the service */ private final ZooKeeperExt zk; /** Has the Connection occurred? */ @@ -318,6 +322,12 @@ public abstract class BspService<I extends WritableComparable, CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId()); masterElectionPath = basePath + MASTER_ELECTION_DIR; + haltComputationPath = basePath + HALT_COMPUTATION_NODE; + getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, + haltComputationPath); + if (LOG.isInfoEnabled()) { + LOG.info("BspService: Path to create to halt is " + haltComputationPath); + } if (LOG.isInfoEnabled()) { LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + ", " + getTaskPartition() + " on " + serverPortList); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 7c9b19a..4dadd29 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -47,6 +47,7 @@ import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.job.DefaultJobObserver; import org.apache.giraph.job.GiraphJobObserver; +import org.apache.giraph.job.HaltApplicationUtils; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; @@ -923,5 +924,27 @@ public interface GiraphConstants { BooleanConfOption ONE_TO_ALL_MSG_SENDING = new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " + "one-to-all message sending strategy"); + + /** + * This counter group will contain one counter whose name is the ZooKeeper + * server:port which this job is using + */ + String ZOOKEEPER_SERVER_PORT_COUNTER_GROUP = "Zookeeper server:port"; + + /** + * This counter group will contain one counter whose name is the ZooKeeper + * node path which should be created to trigger computation halt + */ + String ZOOKEEPER_HALT_NODE_COUNTER_GROUP = "Zookeeper halt node"; + + /** + * Which class to use to write instructions on how to halt the application + */ + ClassConfOption<HaltApplicationUtils.HaltInstructionsWriter> + HALT_INSTRUCTIONS_WRITER_CLASS = ClassConfOption.create( + "giraph.haltInstructionsWriter", + HaltApplicationUtils.DefaultHaltInstructionsWriter.class, + HaltApplicationUtils.HaltInstructionsWriter.class, + "Class used to write instructions on how to halt the application"); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 704fb9e..3939d49 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -370,6 +370,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } zkManager.onlineZooKeeperServers(); serverPortList = zkManager.getZooKeeperServerPortString(); + context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP, + serverPortList); return false; } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java index 2e703ca..ca331b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java @@ -46,6 +46,11 @@ public class DefaultJobObserver implements GiraphJobObserver, } @Override + public void jobRunning(Job submittedJob) { + // do nothing + } + + @Override public void jobFinished(Job jobToSubmit, boolean passed) { // do nothing } http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 05b07a5..fca14ac 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -242,6 +242,13 @@ public class GiraphJob { GiraphJobObserver jobObserver = conf.getJobObserver(); jobObserver.launchingJob(submittedJob); + submittedJob.submit(); + if (LOG.isInfoEnabled()) { + LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL()); + } + HaltApplicationUtils.printHaltInfo(submittedJob, conf); + jobObserver.jobRunning(submittedJob); + boolean passed = submittedJob.waitForCompletion(verbose); jobObserver.jobFinished(submittedJob, passed); http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java index fbcc4f1..3905f77 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java @@ -31,6 +31,13 @@ public interface GiraphJobObserver { void launchingJob(Job jobToSubmit); /** + * Callback after job was submitted. + * For example, you can track its progress here. + * @param submittedJob Job which was submitted. + */ + void jobRunning(Job submittedJob); + + /** * Callback when job finishes. * @param submittedJob Job that ran in hadoop. * @param passed true if job succeeded. http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java new file mode 100644 index 0000000..28b5781 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/HaltApplicationUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.job; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.Logger; + +import java.io.IOException; + +/** + * Utility methods for halting application while running + */ +public class HaltApplicationUtils { + /** Milliseconds to sleep for while waiting for halt info */ + private static final int SLEEP_MSECS = 100; + + /** Do not instantiate */ + private HaltApplicationUtils() { } + + /** + * Wait for halt info (zk server and node) to become available + * + * @param submittedJob Submitted job + * @return True if halt info became available, false if job completed + * before it became available + */ + private static boolean waitForHaltInfo(Job submittedJob) throws IOException { + try { + while (submittedJob.getCounters().getGroup( + GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).size() == 0) { + if (submittedJob.isComplete()) { + return false; + } + Thread.sleep(SLEEP_MSECS); + } + while (submittedJob.getCounters().getGroup( + GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).size() == 0) { + if (submittedJob.isComplete()) { + return false; + } + Thread.sleep(SLEEP_MSECS); + } + } catch (InterruptedException e) { + throw new IllegalStateException( + "waitForHaltInfo: InterruptedException occurred", e); + } + return true; + } + + /** + * Wait for halt info to become available and print instructions on how to + * halt + * + * @param submittedJob Submitted job + * @param conf Configuration + */ + public static void printHaltInfo(Job submittedJob, + GiraphConfiguration conf) throws IOException { + if (waitForHaltInfo(submittedJob)) { + String zkServer = submittedJob.getCounters().getGroup( + GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP).iterator() + .next().getName(); + String haltNode = submittedJob.getCounters().getGroup( + GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP).iterator() + .next().getName(); + GiraphConstants.HALT_INSTRUCTIONS_WRITER_CLASS.newInstance(conf) + .writeHaltInstructions(zkServer, haltNode); + } + } + + /** + * Writer of instructions about how to halt + */ + public interface HaltInstructionsWriter { + /** + * Write instructions about how to halt + * + * @param zkServer ZooKeeper server + * @param haltNode ZooKeeper node which should be created in order to halt + */ + void writeHaltInstructions(String zkServer, String haltNode); + } + + /** + * Default implementation of {@link HaltInstructionsWriter} - points to how + * to use {@link org.apache.giraph.zk.ZooKeeperNodeCreator} to halt + */ + public static class DefaultHaltInstructionsWriter implements + HaltInstructionsWriter { + /** Class logger */ + private static final Logger LOG = Logger.getLogger( + DefaultHaltInstructionsWriter.class); + + @Override + public void writeHaltInstructions(String zkServer, String haltNode) { + if (LOG.isInfoEnabled()) { + LOG.info("writeHaltInstructions: " + + "To halt after next superstep execute: " + + "'bin/halt-application --zkServer " + zkServer + + " --zkNode " + haltNode + "'"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 454c934..f043c61 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1634,6 +1634,11 @@ public class BspServiceMaster<I extends WritableComparable, globalStats.getVertexCount() && globalStats.getMessageCount() == 0)) { globalStats.setHaltComputation(true); + } else if (getZkExt().exists(haltComputationPath, false) != null) { + if (LOG.isInfoEnabled()) { + LOG.info("Halting computation because halt zookeeper node was created"); + } + globalStats.setHaltComputation(true); } // If we have completed the maximum number of supersteps, stop http://git-wip-us.apache.org/repos/asf/giraph/blob/7d4b7256/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java new file mode 100644 index 0000000..2ffa80a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperNodeCreator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.zk; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; + +import static java.lang.System.out; + +/** A utility class to be used to create a ZooKeeper node */ +public class ZooKeeperNodeCreator implements Tool, Watcher { + /** The configuration */ + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + options.addOption("zk", "zkServer", true, + "List of host:port ZooKeeper servers"); + options.addOption("n", "zkNode", true, + "ZooKeeper node to create"); + + HelpFormatter formatter = new HelpFormatter(); + if (args.length == 0) { + formatter.printHelp(getClass().getName(), options, true); + return 0; + } + + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(options, args); + + ZooKeeperExt zkExt = new ZooKeeperExt(cmd.getOptionValue("zkServer"), + 30 * 1000, 5, 1000, this); + zkExt.createExt(cmd.getOptionValue("zkNode"), new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true); + return 0; + } + + @Override + public void process(WatchedEvent event) { + out.println("process: ZK event received: " + event); + } + + /** + * Entry point from shell script + * @param args the command line arguments + */ + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(new ZooKeeperNodeCreator(), args)); + } +}
