This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 9229a8b HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands (#2299) 9229a8b is described below commit 9229a8b9867db467149630e25b14fbb150aa195b Author: Lokesh Khurana <khuranalokes...@gmail.com> AuthorDate: Mon Dec 21 15:33:36 2020 +0530 HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and its Agent which picks and execute those Commands (#2299) Signed-off-by: Aman Poonia <apoo...@salesforce.com> Signed-off-by: Viraj Jasani <vjas...@apache.org> --- bin/chaos-daemon.sh | 140 +++++ ...ent_Which_Submits_Command_Through_ZooKeeper.pdf | Bin 0 -> 270679 bytes .../org/apache/hadoop/hbase/chaos/ChaosAgent.java | 591 +++++++++++++++++++++ .../apache/hadoop/hbase/chaos/ChaosConstants.java | 77 +++ .../apache/hadoop/hbase/chaos/ChaosService.java | 138 +++++ .../org/apache/hadoop/hbase/chaos/ChaosUtils.java | 49 ++ .../org/apache/hadoop/hbase/ChaosZKClient.java | 332 ++++++++++++ .../apache/hadoop/hbase/ZNodeClusterManager.java | 120 +++++ 8 files changed, 1447 insertions(+) diff --git a/bin/chaos-daemon.sh b/bin/chaos-daemon.sh new file mode 100644 index 0000000..084e519 --- /dev/null +++ b/bin/chaos-daemon.sh @@ -0,0 +1,140 @@ +#!/usr/bin/env bash +# +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ +# + +usage="Usage: chaos-daemon.sh (start|stop) chaosagent" + +# if no args specified, show usage +if [ $# -le 1 ]; then + echo "$usage" + exit 1 +fi + +# get arguments +startStop=$1 +shift + +command=$1 +shift + +check_before_start(){ + #ckeck if the process is not running + mkdir -p "$HBASE_PID_DIR" + if [ -f "$CHAOS_PID" ]; then + if kill -0 "$(cat "$CHAOS_PID")" > /dev/null 2>&1; then + echo "$command" running as process "$(cat "$CHAOS_PID")". Stop it first. + exit 1 + fi + fi +} + +bin=`dirname "${BASH_SOURCE-$0}"` +bin=$(cd "$bin">/dev/null || exit; pwd) + +. "$bin"/hbase-config.sh +. "$bin"/hbase-common.sh + +CLASSPATH=$HBASE_CONF_DIR +for f in ../lib/*.jar; do + CLASSPATH=${CLASSPATH}:$f +done + +# get log directory +if [ "$HBASE_LOG_DIR" = "" ]; then + export HBASE_LOG_DIR="$HBASE_HOME/logs" +fi + +if [ "$HBASE_PID_DIR" = "" ]; then + HBASE_PID_DIR=/tmp +fi + +if [ "$HBASE_IDENT_STRING" = "" ]; then + export HBASE_IDENT_STRING="$USER" +fi + +if [ "$JAVA_HOME" != "" ]; then + #echo "run java in $JAVA_HOME" + JAVA_HOME=$JAVA_HOME +fi +if [ "$JAVA_HOME" = "" ]; then + echo "Error: JAVA_HOME is not set." + exit 1 +fi + +export HBASE_LOG_PREFIX=hbase-$HBASE_IDENT_STRING-$command-$HOSTNAME +export CHAOS_LOGFILE=$HBASE_LOG_PREFIX.log + +if [ -z "${HBASE_ROOT_LOGGER}" ]; then +export HBASE_ROOT_LOGGER=${HBASE_ROOT_LOGGER:-"INFO,RFA"} +fi + +if [ -z "${HBASE_SECURITY_LOGGER}" ]; then +export HBASE_SECURITY_LOGGER=${HBASE_SECURITY_LOGGER:-"INFO,RFAS"} +fi + +CHAOS_LOGLOG=${CHAOS_LOGLOG:-"${HBASE_LOG_DIR}/${CHAOS_LOGFILE}"} +CHAOS_PID=$HBASE_PID_DIR/hbase-$HBASE_IDENT_STRING-$command.pid + +if [ -z "$CHAOS_JAVA_OPTS" ]; then + CHAOS_JAVA_OPTS="-Xms1024m -Xmx4096m" +fi + +case $startStop in + +(start) + check_before_start + echo running $command + CMD="${JAVA_HOME}/bin/java -Dapp.home=${HBASE_CONF_DIR}/../ ${CHAOS_JAVA_OPTS} -cp ${CLASSPATH} org.apache.hadoop.hbase.chaos.ChaosService -$command start &>> ${CHAOS_LOGLOG} &" + + eval $CMD + PID=$(echo $!) + echo ${PID} >${CHAOS_PID} + + echo "Chaos ${1} process Started with ${PID} !" + now=$(date) + echo "${now} Chaos ${1} process Started with ${PID} !" >>${CHAOS_LOGLOG} + ;; + +(stop) + echo stopping $command + if [ -f $CHAOS_PID ]; then + pidToKill=`cat $CHAOS_PID` + # kill -0 == see if the PID exists + if kill -0 $pidToKill > /dev/null 2>&1; then + echo -n stopping $command + echo "`date` Terminating $command" >> $CHAOS_LOGLOG + kill $pidToKill > /dev/null 2>&1 + waitForProcessEnd $pidToKill $command + else + retval=$? + echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval + fi + else + echo no $command to stop because no pid file $CHAOS_PID + fi + rm -f $CHAOS_PID + ;; + +(*) + echo $usage + exit 1 + ;; + +esac diff --git a/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf new file mode 100644 index 0000000..fe35c04 Binary files /dev/null and b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf differ diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java new file mode 100644 index 0000000..430b46e --- /dev/null +++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java @@ -0,0 +1,591 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.util.Shell; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/*** + * An agent for executing destructive actions for ChaosMonkey. + * Uses ZooKeeper Watchers and LocalShell, to do the killing + * and getting status of service on targeted host without SSH. + * uses given ZNode Structure: + * /perfChaosTest (root) + * | + * | + * /chaosAgents (Used for registration has + * hostname ephemeral nodes as children) + * | + * | + * /chaosAgentTaskStatus (Used for task + * Execution, has hostname persistent + * nodes as child with tasks as their children) + * | + * | + * /hostname + * | + * | + * /task0000001 (command as data) + * (has two types of command : + * 1: starts with "exec" + * for executing a destructive action. + * 2: starts with "bool" for getting + * only status of service. + * + */ +@InterfaceAudience.Private +public class ChaosAgent implements Watcher, Closeable, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class); + static AtomicBoolean stopChaosAgent = new AtomicBoolean(); + private ZooKeeper zk; + private String quorum; + private String agentName; + private Configuration conf; + private RetryCounterFactory retryCounterFactory; + private volatile boolean connected = false; + + public ChaosAgent(Configuration conf, String quorum, String agentName) { + initChaosAgent(conf, quorum, agentName); + } + + /*** + * sets global params and initiates connection with ZooKeeper then does registration. + * @param conf initial configuration to use + * @param quorum ZK Quorum + * @param agentName AgentName to use + */ + private void initChaosAgent(Configuration conf, String quorum, String agentName) { + this.conf = conf; + this.quorum = quorum; + this.agentName = agentName; + this.retryCounterFactory = new RetryCounterFactory(new RetryCounter.RetryConfig() + .setMaxAttempts(conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY, + ChaosConstants.DEFAULT_RETRY_ATTEMPTS)).setSleepInterval( + conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY, + ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL))); + try { + this.createZKConnection(null); + this.register(); + } catch (IOException e) { + LOG.error("Error Creating Connection: " + e); + } + } + + /*** + * Creates Connection with ZooKeeper. + * @throws IOException if something goes wrong + */ + private void createZKConnection(Watcher watcher) throws IOException { + if(watcher == null) { + zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, this); + } else { + zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, watcher); + } + LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName); + } + + //WATCHERS: Below are the Watches used by ChaosAgent + + /*** + * Watcher for notifying if any task is assigned to agent or not, + * by seeking if any Node is being added to agent as Child. + */ + Watcher newTaskCreatedWatcher = new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { + if (!(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName).equals(watchedEvent.getPath())) { + throw new RuntimeException(KeeperException.create( + KeeperException.Code.DATAINCONSISTENCY)); + } + + LOG.info("Change in Tasks Node, checking for Tasks again."); + getTasks(); + } + + } + }; + + //CALLBACKS: Below are the Callbacks used by Chaos Agent + + /** + * Callback used while setting status of a given task, Logs given status. + */ + AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + // Connection to the server was lost while setting status setting again. + try { + recreateZKConnection(); + } catch (Exception e) { + break; + } + setStatusOfTaskZNode(path, (String) ctx); + break; + + case OK: + LOG.info("Status of Task has been set"); + break; + + case NONODE: + LOG.error("Chaos Agent status node does not exists: " + + "check for ZNode directory structure again."); + break; + + default: + LOG.error("Error while setting status of task ZNode: " + + path, KeeperException.create(KeeperException.Code.get(rc), path)); + } + }; + + /** + * Callback used while creating a Persistent ZNode tries to create + * ZNode again if Connection was lost in previous try. + */ + AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + try { + recreateZKConnection(); + } catch (Exception e) { + break; + } + createZNode(path, (byte[]) ctx); + break; + case OK: + LOG.info("ZNode created : " + path); + break; + case NODEEXISTS: + LOG.warn("ZNode already registered: " + path); + break; + default: + LOG.error("Error occurred while creating Persistent ZNode: " + path, + KeeperException.create(KeeperException.Code.get(rc), path)); + } + }; + + /** + * Callback used while creating a Ephemeral ZNode tries to create ZNode again + * if Connection was lost in previous try. + */ + AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + try { + recreateZKConnection(); + } catch (Exception e) { + break; + } + createEphemeralZNode(path, (byte[]) ctx); + break; + case OK: + LOG.info("ZNode created : " + path); + break; + case NODEEXISTS: + LOG.warn("ZNode already registered: " + path); + break; + default: + LOG.error("Error occurred while creating Ephemeral ZNode: ", + KeeperException.create(KeeperException.Code.get(rc), path)); + } + }; + + /** + * Callback used by getTasksForAgentCallback while getting command, + * after getting command successfully, it executes command and + * set its status with respect to the command type. + */ + AsyncCallback.DataCallback getTaskForExecutionCallback = new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + //Connection to the server has been lost while getting task, getting data again. + try { + recreateZKConnection(); + } catch (Exception e) { + break; + } + zk.getData(path, + false, + getTaskForExecutionCallback, + new String(data)); + break; + case OK: + String cmd = new String(data); + LOG.info("Executing command : " + cmd); + String status = ChaosConstants.TASK_COMPLETION_STRING; + try { + String user = conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER, + ChaosConstants.DEFAULT_SHELL_USER); + switch (cmd.substring(0, 4)) { + case "bool": + String ret = execWithRetries(user, cmd.substring(4)).getSecond(); + status = Boolean.toString(ret.length() > 0); + break; + + case "exec": + execWithRetries(user, cmd.substring(4)); + break; + + default: + LOG.error("Unknown Command Type"); + status = ChaosConstants.TASK_ERROR_STRING; + } + } catch (IOException e) { + LOG.error("Got error while executing command : " + cmd + + " On agent : " + agentName + " Error : " + e); + status = ChaosConstants.TASK_ERROR_STRING; + } + + try { + setStatusOfTaskZNode(path, status); + Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME); + } catch (InterruptedException e) { + LOG.error("Error occured after setting status: " + e); + } + + default: + LOG.error("Error occurred while getting data", + KeeperException.create(KeeperException.Code.get(rc), path)); + } + } + }; + + /*** + * Callback used while getting Tasks for agent if call executed without Exception, + * It creates a separate thread for each children to execute given Tasks parallely. + */ + AsyncCallback.ChildrenCallback getTasksForAgentCallback = new AsyncCallback.ChildrenCallback() { + @Override + public void processResult(int rc, String path, Object ctx, List<String> children) { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: { + // Connection to the server has been lost, getting tasks again. + try { + recreateZKConnection(); + } catch (Exception e) { + break; + } + getTasks(); + break; + } + + case OK: { + if (children != null) { + try { + + LOG.info("Executing each task as a separate thread"); + List<Thread> tasksList = new ArrayList<>(); + for (String task : children) { + String threadName = agentName + "_" + task; + Thread t = new Thread(() -> { + + LOG.info("Executing task : " + task + " of agent : " + agentName); + zk.getData(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName + + ChaosConstants.ZNODE_PATH_SEPARATOR + task, + false, + getTaskForExecutionCallback, + task); + + }); + t.setName(threadName); + t.start(); + tasksList.add(t); + + for (Thread thread : tasksList) { + thread.join(); + } + } + } catch (InterruptedException e) { + LOG.error("Error scheduling next task : " + + " for agent : " + agentName + " Error : " + e); + } + } + break; + } + + default: + LOG.error("Error occurred while getting task", + KeeperException.create(KeeperException.Code.get(rc), path)); + } + } + }; + + /*** + * Function to create PERSISTENT ZNODE with given path and data given as params + * @param path Path at which ZNode to create + * @param data Data to put under ZNode + */ + public void createZNode(String path, byte[] data) { + zk.create(path, + data, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + createZNodeCallback, + data); + } + + /*** + * Function to create EPHEMERAL ZNODE with given path and data as params. + * @param path Path at which Ephemeral ZNode to create + * @param data Data to put under ZNode + */ + public void createEphemeralZNode(String path, byte[] data) { + zk.create(path, + data, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL, + createEphemeralZNodeCallback, + data); + } + + /** + * Checks if given ZNode exists, if not creates a PERSISTENT ZNODE for same. + * + * @param path Path to check for ZNode + */ + private void createIfZNodeNotExists(String path) { + try { + if (zk.exists(path, + false) == null) { + createZNode(path, new byte[0]); + } + } catch (KeeperException | InterruptedException e) { + LOG.error("Error checking given node : " + path + " " + e); + } + } + + /** + * sets given Status for Task Znode + * + * @param taskZNode ZNode to set status + * @param status Status value + */ + public void setStatusOfTaskZNode(String taskZNode, String status) { + LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status); + zk.setData(taskZNode, + status.getBytes(), + -1, + setStatusOfTaskZNodeCallback, + null); + } + + /** + * registration of ChaosAgent by checking and creating necessary ZNodes. + */ + private void register() { + createIfZNodeNotExists(ChaosConstants.CHAOS_TEST_ROOT_ZNODE); + createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE); + createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE); + createIfZNodeNotExists(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName); + + createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); + } + + /*** + * Gets tasks for execution, basically sets Watch on it's respective host's Znode and + * waits for tasks to be assigned, also has a getTasksForAgentCallback + * which handles execution of task. + */ + private void getTasks() { + LOG.info("Getting Tasks for Agent: " + agentName + "and setting watch for new Tasks"); + zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, + newTaskCreatedWatcher, + getTasksForAgentCallback, + null); + } + + /** + * Below function executes command with retries with given user. + * Uses LocalShell to execute a command. + * + * @param user user name, default none + * @param cmd Command to execute + * @return A pair of Exit Code and Shell output + * @throws IOException Exception while executing shell command + */ + private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + return exec(user, cmd); + } catch (IOException e) { + retryOrThrow(retryCounter, e, user, cmd); + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException e) { + LOG.warn("Sleep Interrupted: " + e); + } + } + } + + private Pair<Integer, String> exec(String user, String cmd) throws IOException { + LOG.info("Executing Shell command: " + cmd + " , user: " + user); + + LocalShell shell = new LocalShell(user, cmd); + try { + shell.execute(); + } catch (Shell.ExitCodeException e) { + String output = shell.getOutput(); + throw new Shell.ExitCodeException(e.getExitCode(), "stderr: " + e.getMessage() + + ", stdout: " + output); + } + LOG.info("Executed Shell command, exit code: {}, output n{}", shell.getExitCode(), shell.getOutput()); + + return new Pair<>(shell.getExitCode(), shell.getOutput()); + } + + private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex, + String user, String cmd) throws E { + if (retryCounter.shouldRetry()) { + LOG.warn("Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}." + + "Exception {}", cmd, user,retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(), + ex.getMessage()); + return; + } + throw ex; + } + + private boolean isConnected() { + return connected; + } + + @Override + public void close() throws IOException { + LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName); + try { + zk.close(); + } catch (InterruptedException e) { + LOG.error("Error while closing ZooKeeper Connection."); + } + } + + @Override + public void run() { + try { + LOG.info("Running Chaos Agent on : " + agentName); + while (!this.isConnected()) { + Thread.sleep(100); + } + this.getTasks(); + while (!stopChaosAgent.get()) { + Thread.sleep(500); + } + } catch (InterruptedException e) { + LOG.error("Error while running Chaos Agent", e); + } + + } + + @Override + public void process(WatchedEvent watchedEvent) { + LOG.info("Processing event: " + watchedEvent.toString()); + if (watchedEvent.getType() == Event.EventType.None) { + switch (watchedEvent.getState()) { + case SyncConnected: + connected = true; + break; + case Disconnected: + connected = false; + break; + case Expired: + connected = false; + LOG.error("Session expired creating again"); + try { + createZKConnection(null); + } catch (IOException e) { + LOG.error("Error creating Zookeeper connection", e); + } + default: + LOG.error("Unknown State"); + break; + } + } + } + + private void recreateZKConnection() throws Exception{ + try { + zk.close(); + createZKConnection(newTaskCreatedWatcher); + createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE + + ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]); + } catch (IOException e) { + LOG.error("Error creating new ZK COnnection for agent: {}", agentName + e); + throw e; + } + } + + /** + * Executes Command locally. + */ + protected static class LocalShell extends Shell.ShellCommandExecutor { + + private String user; + private String execCommand; + + public LocalShell(String user, String execCommand) { + super(new String[]{execCommand}); + this.user = user; + this.execCommand = execCommand; + } + + @Override + public String[] getExecString() { + // TODO: Considering Agent is running with same user. + if(!user.equals(ChaosConstants.DEFAULT_SHELL_USER)){ + execCommand = String.format("su -u %1$s %2$s", user, execCommand); + } + return new String[]{"/usr/bin/env", "bash", "-c", execCommand}; + } + + @Override + public void execute() throws IOException { + super.execute(); + } + } +} diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java new file mode 100644 index 0000000..54fbe9b --- /dev/null +++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos; + +import org.apache.yetus.audience.InterfaceAudience; + +/*** + * ChaosConstant holds a bunch of Choas-related Constants + */ +@InterfaceAudience.Public +public final class ChaosConstants { + + /*Base ZNode for whole Chaos Testing*/ + public static final String CHAOS_TEST_ROOT_ZNODE = "/hbase"; + + /*Just a / used for path separator*/ + public static final String ZNODE_PATH_SEPARATOR = "/"; + + /*ZNode used for ChaosAgents registration.*/ + public static final String CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE = + CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgents"; + + /*ZNode used for getting status of tasks assigned*/ + public static final String CHAOS_AGENT_STATUS_PERSISTENT_ZNODE = + CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgentTaskStatus"; + + /*Config property for getting number of retries to execute a command*/ + public static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; + + /*Default value for number of retries*/ + public static final int DEFAULT_RETRY_ATTEMPTS = 5; + + /*Config property to sleep in between retries*/ + public static final String RETRY_SLEEP_INTERVAL_KEY = + "hbase.it.clustermanager.retry.sleep.interval"; + + /*Default Sleep time between each retry*/ + public static final int DEFAULT_RETRY_SLEEP_INTERVAL = 5000; + + /*Config property for executing command as specific user*/ + public static final String CHAOSAGENT_SHELL_USER = "hbase.it.clustermanager.ssh.user"; + + /*default user for executing local commands*/ + public static final String DEFAULT_SHELL_USER = ""; + + /*timeout used while creating ZooKeeper connection*/ + public static final int SESSION_TIMEOUT_ZK = 60000 * 10; + + /*Time given to ChaosAgent to set status*/ + public static final int SET_STATUS_SLEEP_TIME = 30 * 1000; + + /*Status String when you get an ERROR while executing task*/ + public static final String TASK_ERROR_STRING = "error"; + + /*Status String when your command gets executed correctly*/ + public static final String TASK_COMPLETION_STRING = "done"; + + /*Name of ChoreService to use*/ + public static final String CHORE_SERVICE_PREFIX = "ChaosService"; + +} diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java new file mode 100644 index 0000000..e2abe3d --- /dev/null +++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; + +/** + * Class used to start/stop Chaos related services (currently chaosagent) + */ +@InterfaceAudience.Private +public class ChaosService { + + private static final Logger LOG = LoggerFactory.getLogger(ChaosService.class.getName()); + + public static void execute(String[] args, Configuration conf) { + LOG.info("arguments : " + Arrays.toString(args)); + + try { + CommandLine cmdline = new GnuParser().parse(getOptions(), args); + if (cmdline.hasOption(ChaosServiceName.CHAOSAGENT.toString().toLowerCase())) { + String actionStr = cmdline.getOptionValue(ChaosServiceName.CHAOSAGENT.toString().toLowerCase()); + try { + ExecutorAction action = ExecutorAction.valueOf(actionStr.toUpperCase()); + if (action == ExecutorAction.START) { + ChaosServiceStart(conf, ChaosServiceName.CHAOSAGENT); + } else if (action == ExecutorAction.STOP) { + ChaosServiceStop(); + } + } catch (IllegalArgumentException e) { + LOG.error("action passed: {} Unexpected action. Please provide only start/stop.", + actionStr, e); + throw new RuntimeException(e); + } + } else { + LOG.error("Invalid Options"); + } + } catch (Exception e) { + LOG.error("Error while starting ChaosService : ", e); + } + } + + private static void ChaosServiceStart(Configuration conf, ChaosServiceName serviceName) { + switch (serviceName) { + case CHAOSAGENT: + ChaosAgent.stopChaosAgent.set(false); + try { + Thread t = new Thread(new ChaosAgent(conf, + ChaosUtils.getZKQuorum(conf), ChaosUtils.getHostName())); + t.start(); + t.join(); + } catch (InterruptedException | UnknownHostException e) { + LOG.error("Failed while executing next task execution of ChaosAgent on : {}", + serviceName, e); + } + break; + default: + LOG.error("Service Name not known : " + serviceName.toString()); + } + } + + private static void ChaosServiceStop() { + ChaosAgent.stopChaosAgent.set(true); + } + + private static Options getOptions() { + Options options = new Options(); + options.addOption(new Option("c", ChaosServiceName.CHAOSAGENT.toString().toLowerCase(), + true, "expecting a start/stop argument")); + options.addOption(new Option("D", ChaosServiceName.GENERIC.toString(), + true, "generic D param")); + LOG.info(Arrays.toString(new Collection[] { options.getOptions() })); + return options; + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + new GenericOptionsParser(conf, args); + + ChoreService choreChaosService = null; + ScheduledChore authChore = AuthUtil.getAuthChore(conf); + + try { + if (authChore != null) { + choreChaosService = new ChoreService(ChaosConstants.CHORE_SERVICE_PREFIX); + choreChaosService.scheduleChore(authChore); + } + + execute(args, conf); + } finally { + if (authChore != null) + choreChaosService.shutdown(); + } + } + + enum ChaosServiceName { + CHAOSAGENT, + GENERIC + } + + + enum ExecutorAction { + START, + STOP + } +} diff --git a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java new file mode 100644 index 0000000..da42021 --- /dev/null +++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * ChaosUtils holds a bunch of useful functions like getting hostname and getting ZooKeeper quorum. + */ +@InterfaceAudience.Private +public class ChaosUtils { + + public static String getHostName() throws UnknownHostException { + return InetAddress.getLocalHost().getHostName(); + } + + + public static String getZKQuorum(Configuration conf) { + String port = + Integer.toString(conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181)); + String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost"); + for (int i = 0; i < serverHosts.length; i++) { + serverHosts[i] = serverHosts[i] + ":" + port; + } + return String.join(",", serverHosts); + } + +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java new file mode 100644 index 0000000..31fb9e3 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class ChaosZKClient { + + private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class.getName()); + private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents"; + private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus"; + private static final String ZNODE_PATH_SEPARATOR = "/"; + private static final String TASK_PREFIX = "task_"; + private static final String TASK_ERROR_STRING = "error"; + private static final String TASK_COMPLETION_STRING = "done"; + private static final String TASK_BOOLEAN_TRUE = "true"; + private static final String TASK_BOOLEAN_FALSE = "false"; + private static final String CONNECTION_LOSS = "ConnectionLoss"; + private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000; + private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000; + private volatile String taskStatus = null; + + private final String quorum; + private ZooKeeper zk; + + public ChaosZKClient(String quorum) { + this.quorum = quorum; + try { + this.createNewZKConnection(); + } catch (IOException e) { + LOG.error("Error creating ZooKeeper Connection: ", e); + } + } + + /** + * Creates connection with ZooKeeper + * @throws IOException when not able to create connection properly + */ + public void createNewZKConnection() throws IOException { + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + LOG.info("Created ZooKeeper Connection For executing task"); + } + }; + + this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher); + } + + /** + * Checks if ChaosAgent is running or not on target host by checking its ZNode. + * @param hostname hostname to check for chaosagent + * @return true/false whether agent is running or not + */ + private boolean isChaosAgentRunning(String hostname) { + try { + return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, + false) != null; + } catch (KeeperException e) { + if (e.toString().contains(CONNECTION_LOSS)) { + recreateZKConnection(); + try { + return zk.exists(CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname, + false) != null; + } catch (KeeperException | InterruptedException ie) { + LOG.error("ERROR ", ie); + } + } + } catch (InterruptedException e) { + LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e); + } + return false; + } + + /** + * Creates tasks for target hosts by creating ZNodes. + * Waits for a limited amount of time to complete task to execute. + * @param taskObject Object data represents command + * @return returns status + */ + public String submitTask(final TaskObject taskObject) { + if (isChaosAgentRunning(taskObject.getTaskHostname())) { + LOG.info("Creating task node"); + zk.create(CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + + taskObject.getTaskHostname() + ZNODE_PATH_SEPARATOR + TASK_PREFIX, + taskObject.getCommand().getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, + submitTaskCallback, + taskObject); + long start = System.currentTimeMillis(); + + while ((System.currentTimeMillis() - start) < TASK_EXECUTION_TIMEOUT) { + if(taskStatus != null) { + return taskStatus; + } + Threads.sleep(500); + } + } else { + LOG.info("EHHHHH! ChaosAgent Not running"); + } + return TASK_ERROR_STRING; + } + + /** + * To get status of task submitted + * @param path path at which to get status + * @param ctx path context + */ + private void getStatus(String path , Object ctx) { + LOG.info("Getting Status of task: " + path); + zk.getData(path, + false, + getStatusCallback, + ctx); + } + + /** + * Set a watch on task submitted + * @param name ZNode name to set a watch + * @param taskObject context for ZNode name + */ + private void setStatusWatch(String name, TaskObject taskObject) { + LOG.info("Checking for ZNode and Setting watch for task : " + name); + zk.exists(name, + setStatusWatcher, + setStatusWatchCallback, + taskObject); + } + + /** + * Delete task after getting its status + * @param path path to delete ZNode + */ + private void deleteTask(String path) { + LOG.info("Deleting task: " + path); + zk.delete(path, + -1, + taskDeleteCallback, + null); + } + + //WATCHERS: + + /** + * Watcher to get notification whenever status of task changes. + */ + Watcher setStatusWatcher = new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + LOG.info("Setting status watch for task: " + watchedEvent.getPath()); + if(watchedEvent.getType() == Event.EventType.NodeDataChanged) { + if(!watchedEvent.getPath().contains(TASK_PREFIX)) { + throw new RuntimeException(KeeperException.create( + KeeperException.Code.DATAINCONSISTENCY)); + } + getStatus(watchedEvent.getPath(), (Object) watchedEvent.getPath()); + + } + } + }; + + //CALLBACKS + + AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + //Connectionloss while getting status of task, getting again + recreateZKConnection(); + getStatus(path, ctx); + break; + + case OK: + if (ctx!=null) { + + String status = new String(data); + taskStatus = status; + switch (status) { + case TASK_COMPLETION_STRING: + case TASK_BOOLEAN_TRUE: + case TASK_BOOLEAN_FALSE: + LOG.info("Task executed completely : Status --> " + status); + break; + + case TASK_ERROR_STRING: + LOG.info("There was error while executing task : Status --> " + status); + break; + + default: + LOG.warn("Status of task is undefined!! : Status --> " + status); + } + + deleteTask(path); + } + break; + + default: + LOG.error("ERROR while getting status of task: " + path + " ERROR: " + + KeeperException.create(KeeperException.Code.get(rc))); + } + }; + + AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + //ConnectionLoss while setting watch on status ZNode, setting again. + recreateZKConnection(); + setStatusWatch(path, (TaskObject) ctx); + break; + + case OK: + if(stat != null) { + getStatus(path, null); + } + break; + + default: + LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: " + + KeeperException.create(KeeperException.Code.get(rc))); + } + }; + + AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + // Connection to server was lost while submitting task, submitting again. + recreateZKConnection(); + submitTask((TaskObject) ctx); + break; + + case OK: + LOG.info("Task created : " + name); + setStatusWatch(name, (TaskObject) ctx); + break; + + default: + LOG.error("Error submitting task: " + name + " ERROR:" + + KeeperException.create(KeeperException.Code.get(rc))); + } + }; + + AsyncCallback.VoidCallback taskDeleteCallback = new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + //Connectionloss while deleting task, deleting again + recreateZKConnection(); + deleteTask(path); + break; + + case OK: + LOG.info("Task Deleted successfully!"); + LOG.info("Closing ZooKeeper Connection"); + try { + zk.close(); + } catch (InterruptedException e) { + LOG.error("Error while closing ZooKeeper Connection."); + } + break; + + default: + LOG.error("ERROR while deleting task: " + path + " ERROR: " + + KeeperException.create(KeeperException.Code.get(rc))); + } + } + }; + + + private void recreateZKConnection() { + try { + zk.close(); + } catch (InterruptedException e) { + LOG.error("Error closing ZK connection : ", e); + } finally { + try { + createNewZKConnection(); + } catch (IOException e) { + LOG.error("Error creating new ZK COnnection for agent: ", e); + } + } + } + + static class TaskObject { + private final String command; + private final String taskHostname; + + public TaskObject(String command, String taskHostname) { + this.command = command; + this.taskHostname = taskHostname; + } + + public String getCommand() { + return this.command; + } + + public String getTaskHostname() { + return taskHostname; + } + } + +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java new file mode 100644 index 0000000..88f14b0 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configured; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class ZNodeClusterManager extends Configured implements ClusterManager { + private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class.getName()); + private static final String SIGKILL = "SIGKILL"; + private static final String SIGSTOP = "SIGSTOP"; + private static final String SIGCONT = "SIGCONT"; + public ZNodeClusterManager() { + } + + private String getZKQuorumServersStringFromHbaseConfig() { + String port = + Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181)); + String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost"); + for (int i = 0; i < serverHosts.length; i++) { + serverHosts[i] = serverHosts[i] + ":" + port; + } + return Arrays.asList(serverHosts).stream().collect(Collectors.joining(",")); + } + + private String createZNode(String hostname, String cmd) throws IOException{ + LOG.info("Zookeeper Mode enabled sending command to zookeeper + " + + cmd + "hostname:" + hostname); + ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig()); + return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname)); + } + + protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service) + throws IOException { + switch (service) { + case HADOOP_DATANODE: + case HADOOP_NAMENODE: + return new HBaseClusterManager.HadoopShellCommandProvider(getConf()); + case ZOOKEEPER_SERVER: + return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf()); + default: + return new HBaseClusterManager.HBaseShellCommandProvider(getConf()); + } + } + + public void signal(ServiceType service, String signal, String hostname) throws IOException { + createZNode(hostname, CmdType.exec.toString() + + getCommandProvider(service).signalCommand(service, signal)); + } + + private void createOpCommand(String hostname, ServiceType service, + HBaseClusterManager.CommandProvider.Operation op) throws IOException{ + createZNode(hostname, CmdType.exec.toString() + + getCommandProvider(service).getCommand(service, op)); + } + + @Override + public void start(ServiceType service, String hostname, int port) throws IOException { + createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START); + } + + @Override + public void stop(ServiceType service, String hostname, int port) throws IOException { + createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP); + } + + @Override + public void restart(ServiceType service, String hostname, int port) throws IOException { + createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART); + } + + @Override + public void kill(ServiceType service, String hostname, int port) throws IOException { + signal(service, SIGKILL, hostname); + } + + @Override + public void suspend(ServiceType service, String hostname, int port) throws IOException { + signal(service, SIGSTOP, hostname); + } + + @Override + public void resume(ServiceType service, String hostname, int port) throws IOException { + signal(service, SIGCONT, hostname); + } + + @Override + public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { + return Boolean.parseBoolean(createZNode(hostname, CmdType.bool.toString() + + getCommandProvider(service).isRunningCommand(service))); + } + + enum CmdType { + exec, + bool + } +}