This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9229a8b  HBASE-24620 : Add a ClusterManager which submits command to 
ZooKeeper and its Agent which picks and execute those Commands (#2299)
9229a8b is described below

commit 9229a8b9867db467149630e25b14fbb150aa195b
Author: Lokesh Khurana <khuranalokes...@gmail.com>
AuthorDate: Mon Dec 21 15:33:36 2020 +0530

    HBASE-24620 : Add a ClusterManager which submits command to ZooKeeper and 
its Agent which picks and execute those Commands (#2299)
    
    Signed-off-by: Aman Poonia <apoo...@salesforce.com>
    Signed-off-by: Viraj Jasani <vjas...@apache.org>
---
 bin/chaos-daemon.sh                                | 140 +++++
 ...ent_Which_Submits_Command_Through_ZooKeeper.pdf | Bin 0 -> 270679 bytes
 .../org/apache/hadoop/hbase/chaos/ChaosAgent.java  | 591 +++++++++++++++++++++
 .../apache/hadoop/hbase/chaos/ChaosConstants.java  |  77 +++
 .../apache/hadoop/hbase/chaos/ChaosService.java    | 138 +++++
 .../org/apache/hadoop/hbase/chaos/ChaosUtils.java  |  49 ++
 .../org/apache/hadoop/hbase/ChaosZKClient.java     | 332 ++++++++++++
 .../apache/hadoop/hbase/ZNodeClusterManager.java   | 120 +++++
 8 files changed, 1447 insertions(+)

diff --git a/bin/chaos-daemon.sh b/bin/chaos-daemon.sh
new file mode 100644
index 0000000..084e519
--- /dev/null
+++ b/bin/chaos-daemon.sh
@@ -0,0 +1,140 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+
+usage="Usage: chaos-daemon.sh (start|stop) chaosagent"
+
+# Both the action and the service name are required; otherwise print usage and bail out.
+if [ "$#" -lt 2 ]; then
+  echo "$usage"
+  exit 1
+fi
+
+# First positional argument: action (start|stop); second: the service to manage.
+# Both are consumed so that no positional arguments remain afterwards.
+startStop=$1
+command=$2
+shift 2
+
+check_before_start(){
+    # Refuse to start when the pid file points at a process that is still alive.
+    mkdir -p "$HBASE_PID_DIR"
+    if [ -f "$CHAOS_PID" ]; then
+      # Read the pid once so the liveness probe and the message agree on the value.
+      local running_pid
+      running_pid=$(cat "$CHAOS_PID")
+      # kill -0 only probes for the process; no signal is delivered.
+      if kill -0 "$running_pid" > /dev/null 2>&1; then
+        echo "$command running as process $running_pid. Stop it first."
+        exit 1
+      fi
+    fi
+}
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=$(cd "$bin">/dev/null || exit; pwd)
+
+. "$bin"/hbase-config.sh
+. "$bin"/hbase-common.sh
+
+CLASSPATH=$HBASE_CONF_DIR
+for f in ../lib/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f
+done
+
+# get log directory
+if [ "$HBASE_LOG_DIR" = "" ]; then
+  export HBASE_LOG_DIR="$HBASE_HOME/logs"
+fi
+
+if [ "$HBASE_PID_DIR" = "" ]; then
+  HBASE_PID_DIR=/tmp
+fi
+
+if [ "$HBASE_IDENT_STRING" = "" ]; then
+  export HBASE_IDENT_STRING="$USER"
+fi
+
+if [ "$JAVA_HOME" != "" ]; then
+  #echo "run java in $JAVA_HOME"
+  JAVA_HOME=$JAVA_HOME
+fi
+if [ "$JAVA_HOME" = "" ]; then
+  echo "Error: JAVA_HOME is not set."
+  exit 1
+fi
+
+export HBASE_LOG_PREFIX=hbase-$HBASE_IDENT_STRING-$command-$HOSTNAME
+export CHAOS_LOGFILE=$HBASE_LOG_PREFIX.log
+
+if [ -z "${HBASE_ROOT_LOGGER}" ]; then
+export HBASE_ROOT_LOGGER=${HBASE_ROOT_LOGGER:-"INFO,RFA"}
+fi
+
+if [ -z "${HBASE_SECURITY_LOGGER}" ]; then
+export HBASE_SECURITY_LOGGER=${HBASE_SECURITY_LOGGER:-"INFO,RFAS"}
+fi
+
+CHAOS_LOGLOG=${CHAOS_LOGLOG:-"${HBASE_LOG_DIR}/${CHAOS_LOGFILE}"}
+CHAOS_PID=$HBASE_PID_DIR/hbase-$HBASE_IDENT_STRING-$command.pid
+
+if [ -z "$CHAOS_JAVA_OPTS" ]; then
+  CHAOS_JAVA_OPTS="-Xms1024m -Xmx4096m"
+fi
+
+case $startStop in
+
+(start)
+    check_before_start
+    echo running $command
+    # The command line is assembled as a string and run through eval so that the
+    # redirection and the trailing '&' take effect in this shell; $! below is
+    # then the pid of the launched JVM.
+    CMD="${JAVA_HOME}/bin/java -Dapp.home=${HBASE_CONF_DIR}/../  ${CHAOS_JAVA_OPTS} -cp ${CLASSPATH} org.apache.hadoop.hbase.chaos.ChaosService -$command start &>> ${CHAOS_LOGLOG} &"
+
+    eval $CMD
+    PID=$!
+    echo ${PID} >${CHAOS_PID}
+
+    # The positional arguments were already shifted away, so \$1 is empty here;
+    # report the service through $command instead.
+    echo "Chaos ${command} process Started with ${PID} !"
+    now=$(date)
+    echo "${now} Chaos ${command} process Started with ${PID} !" >>${CHAOS_LOGLOG}
+    ;;
+
+(stop)
+    echo stopping $command
+    if [ -f $CHAOS_PID ]; then
+      pidToKill=`cat $CHAOS_PID`
+      # kill -0 == see if the PID exists
+      if kill -0 $pidToKill > /dev/null 2>&1; then
+        echo -n stopping $command
+        echo "`date` Terminating $command" >> $CHAOS_LOGLOG
+        kill $pidToKill > /dev/null 2>&1
+        waitForProcessEnd $pidToKill $command
+      else
+        # $? still holds the exit status of the failed kill -0 probe.
+        retval=$?
+        echo no $command to stop because kill -0 of pid $pidToKill failed with status $retval
+      fi
+    else
+      echo no $command to stop because no pid file $CHAOS_PID
+    fi
+    # The pid file is removed in every case, including when nothing was running.
+    rm -f $CHAOS_PID
+  ;;
+
+(*)
+  echo $usage
+  exit 1
+  ;;
+
+esac
diff --git 
a/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf
 
b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf
new file mode 100644
index 0000000..fe35c04
Binary files /dev/null and 
b/dev-support/design-docs/HBASE-24620_New_ClusterManager_And_Agent_Which_Submits_Command_Through_ZooKeeper.pdf
 differ
diff --git 
a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java 
b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
new file mode 100644
index 0000000..430b46e
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosAgent.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.util.Shell;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/***
+ * An agent for executing destructive actions for ChaosMonkey.
+ * Uses ZooKeeper Watchers and LocalShell, to do the killing
+ * and getting status of service on targeted host without SSH.
+ * uses given ZNode Structure:
+ *  /hbase (root, see ChaosConstants.CHAOS_TEST_ROOT_ZNODE)
+ *              |
+ *              |
+ *              /chaosAgents (Used for registration has
+ *              hostname ephemeral nodes as children)
+ *              |
+ *              |
+ *              /chaosAgentTaskStatus (Used for task
+ *              Execution, has hostname persistent
+ *              nodes as child with tasks as their children)
+ *                          |
+ *                          |
+ *                          /hostname
+ *                                |
+ *                                |
+ *                                /task0000001 (command as data)
+ *                                (has two types of command :
+ *                                     1: starts with "exec"
+ *                                       for executing a destructive action.
+ *                                     2: starts with "bool" for getting
+ *                                       only status of service.
+ *
+ */
+@InterfaceAudience.Private
+public class ChaosAgent implements Watcher, Closeable, Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosAgent.class);
+  static AtomicBoolean stopChaosAgent = new AtomicBoolean();
+  private ZooKeeper zk;
+  private String quorum;
+  private String agentName;
+  private Configuration conf;
+  private RetryCounterFactory retryCounterFactory;
+  private volatile boolean connected = false;
+
+  /**
+   * Creates an agent and immediately initialises it through {@code initChaosAgent}.
+   * @param conf configuration passed on to the initialisation step
+   * @param quorum ZooKeeper quorum passed on to the initialisation step
+   * @param agentName name of this agent
+   */
+  public ChaosAgent(Configuration conf, String quorum, String agentName) {
+    initChaosAgent(conf, quorum, agentName);
+  }
+
+  /***
+   * Stores the supplied parameters, builds the retry factory from configuration,
+   * then opens the ZooKeeper connection and registers this agent.
+   * An {@link IOException} raised while connecting is logged and not propagated.
+   * @param conf initial configuration to use
+   * @param quorum ZK Quorum
+   * @param agentName AgentName to use
+   */
+  private void initChaosAgent(Configuration conf, String quorum, String agentName) {
+    this.conf = conf;
+    this.quorum = quorum;
+    this.agentName = agentName;
+
+    // Retry policy for shell commands: attempts and sleep interval are configurable.
+    int maxAttempts = conf.getInt(ChaosConstants.RETRY_ATTEMPTS_KEY,
+      ChaosConstants.DEFAULT_RETRY_ATTEMPTS);
+    long sleepInterval = conf.getLong(ChaosConstants.RETRY_SLEEP_INTERVAL_KEY,
+      ChaosConstants.DEFAULT_RETRY_SLEEP_INTERVAL);
+    RetryCounter.RetryConfig retryConfig = new RetryCounter.RetryConfig()
+      .setMaxAttempts(maxAttempts)
+      .setSleepInterval(sleepInterval);
+    this.retryCounterFactory = new RetryCounterFactory(retryConfig);
+
+    try {
+      this.createZKConnection(null);
+      this.register();
+    } catch (IOException e) {
+      LOG.error("Error Creating Connection: " + e);
+    }
+  }
+
+  /***
+   * Opens a new ZooKeeper session against the configured quorum.
+   * @param watcher default watcher for the session; when {@code null} this agent
+   *   itself is used as the watcher
+   * @throws IOException if something goes wrong
+   */
+  private void createZKConnection(Watcher watcher) throws IOException {
+    Watcher sessionWatcher = (watcher == null) ? this : watcher;
+    zk = new ZooKeeper(quorum, ChaosConstants.SESSION_TIMEOUT_ZK, sessionWatcher);
+    LOG.info("ZooKeeper Connection created for ChaosAgent: " + agentName);
+  }
+
+  //WATCHERS: Below are the Watches used by ChaosAgent
+
+  /***
+   * Watcher for notifying if any task is assigned to agent or not,
+   * by seeking if any Node is being added to agent as Child.
+   */
+  Watcher newTaskCreatedWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
+        if (!(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+          ChaosConstants.ZNODE_PATH_SEPARATOR + 
agentName).equals(watchedEvent.getPath())) {
+          throw new RuntimeException(KeeperException.create(
+            KeeperException.Code.DATAINCONSISTENCY));
+        }
+
+        LOG.info("Change in Tasks Node, checking for Tasks again.");
+        getTasks();
+      }
+
+    }
+  };
+
+  //CALLBACKS: Below are the Callbacks used by Chaos Agent
+
+  /**
+   * Callback used while setting status of a given task, Logs given status.
+   * On connection loss the ZooKeeper connection is recreated and the status is
+   * written again, using the status string carried in the callback context.
+   */
+  AsyncCallback.StatCallback setStatusOfTaskZNodeCallback = (rc, path, ctx, stat) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        // Connection to the server was lost while setting status, setting again.
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          LOG.error("Could not recreate ZooKeeper connection, status of task ZNode "
+            + path + " was not set", e);
+          break;
+        }
+        if (ctx == null) {
+          // Without the status string there is nothing that could be written again.
+          LOG.error("No status available to retry for task ZNode: " + path);
+          break;
+        }
+        setStatusOfTaskZNode(path, (String) ctx);
+        break;
+
+      case OK:
+        LOG.info("Status of Task has been set");
+        break;
+
+      case NONODE:
+        LOG.error("Chaos Agent status node does not exists: "
+          + "check for ZNode directory structure again.");
+        break;
+
+      default:
+        LOG.error("Error while setting status of task ZNode: " +
+          path, KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used while creating a Persistent ZNode tries to create
+   * ZNode again if Connection was lost in previous try.
+   * The callback context carries the node data so that it can be resent.
+   */
+  AsyncCallback.StringCallback createZNodeCallback = (rc, path, ctx, name) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          LOG.error("Could not recreate ZooKeeper connection, Persistent ZNode "
+            + path + " was not created", e);
+          break;
+        }
+        createZNode(path, (byte[]) ctx);
+        break;
+      case OK:
+        LOG.info("ZNode created : " + path);
+        break;
+      case NODEEXISTS:
+        LOG.warn("ZNode already registered: " + path);
+        break;
+      default:
+        LOG.error("Error occurred while creating Persistent ZNode: " + path,
+          KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used while creating a Ephemeral ZNode tries to create ZNode again
+   * if Connection was lost in previous try.
+   * The callback context carries the node data so that it can be resent.
+   */
+  AsyncCallback.StringCallback createEphemeralZNodeCallback = (rc, path, ctx, name) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        try {
+          recreateZKConnection();
+        } catch (Exception e) {
+          LOG.error("Could not recreate ZooKeeper connection, Ephemeral ZNode "
+            + path + " was not created", e);
+          break;
+        }
+        createEphemeralZNode(path, (byte[]) ctx);
+        break;
+      case OK:
+        LOG.info("ZNode created : " + path);
+        break;
+      case NODEEXISTS:
+        LOG.warn("ZNode already registered: " + path);
+        break;
+      default:
+        // Include the path, in line with the message logged for persistent nodes.
+        LOG.error("Error occurred while creating Ephemeral ZNode: " + path,
+          KeeperException.create(KeeperException.Code.get(rc), path));
+    }
+  };
+
+  /**
+   * Callback used by getTasksForAgentCallback while getting command,
+   * after getting command successfully, it executes command and
+   * set its status with respect to the command type.
+   */
+  AsyncCallback.DataCallback getTaskForExecutionCallback = new 
AsyncCallback.DataCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, byte[] data, 
Stat stat) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS:
+          //Connection to the server has been lost while getting task, getting 
data again.
+          try {
+            recreateZKConnection();
+          } catch (Exception e) {
+            break;
+          }
+          zk.getData(path,
+            false,
+            getTaskForExecutionCallback,
+            new String(data));
+          break;
+        case OK:
+          String cmd = new String(data);
+          LOG.info("Executing command : " + cmd);
+          String status = ChaosConstants.TASK_COMPLETION_STRING;
+          try {
+            String user = conf.get(ChaosConstants.CHAOSAGENT_SHELL_USER,
+              ChaosConstants.DEFAULT_SHELL_USER);
+            switch (cmd.substring(0, 4)) {
+              case "bool":
+                String ret = execWithRetries(user, 
cmd.substring(4)).getSecond();
+                status = Boolean.toString(ret.length() > 0);
+                break;
+
+              case "exec":
+                execWithRetries(user, cmd.substring(4));
+                break;
+
+              default:
+                LOG.error("Unknown Command Type");
+                status = ChaosConstants.TASK_ERROR_STRING;
+            }
+          } catch (IOException e) {
+            LOG.error("Got error while executing command : " + cmd +
+              " On agent : " + agentName + " Error : " + e);
+            status = ChaosConstants.TASK_ERROR_STRING;
+          }
+
+          try {
+            setStatusOfTaskZNode(path, status);
+            Thread.sleep(ChaosConstants.SET_STATUS_SLEEP_TIME);
+          } catch (InterruptedException e) {
+            LOG.error("Error occured after setting status: " + e);
+          }
+
+        default:
+          LOG.error("Error occurred while getting data",
+            KeeperException.create(KeeperException.Code.get(rc), path));
+      }
+    }
+  };
+
+  /***
+   * Callback used while getting Tasks for agent if call executed without 
Exception,
+   * It creates a separate thread for each children to execute given Tasks 
parallely.
+   */
+  AsyncCallback.ChildrenCallback getTasksForAgentCallback = new 
AsyncCallback.ChildrenCallback() {
+    @Override
+    public void processResult(int rc, String path, Object ctx, List<String> 
children) {
+      switch (KeeperException.Code.get(rc)) {
+        case CONNECTIONLOSS: {
+          // Connection to the server has been lost, getting tasks again.
+          try {
+            recreateZKConnection();
+          } catch (Exception e) {
+            break;
+          }
+          getTasks();
+          break;
+        }
+
+        case OK: {
+          if (children != null) {
+            try {
+
+              LOG.info("Executing each task as a separate thread");
+              List<Thread> tasksList = new ArrayList<>();
+              for (String task : children) {
+                String threadName = agentName + "_" + task;
+                Thread t = new Thread(() -> {
+
+                  LOG.info("Executing task : " + task + " of agent : " + 
agentName);
+                  
zk.getData(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+                      ChaosConstants.ZNODE_PATH_SEPARATOR + agentName +
+                      ChaosConstants.ZNODE_PATH_SEPARATOR + task,
+                    false,
+                    getTaskForExecutionCallback,
+                    task);
+
+                });
+                t.setName(threadName);
+                t.start();
+                tasksList.add(t);
+
+                for (Thread thread : tasksList) {
+                  thread.join();
+                }
+              }
+            } catch (InterruptedException e) {
+              LOG.error("Error scheduling next task : " +
+                " for agent : " + agentName + " Error : " + e);
+            }
+          }
+          break;
+        }
+
+        default:
+          LOG.error("Error occurred while getting task",
+            KeeperException.create(KeeperException.Code.get(rc), path));
+      }
+    }
+  };
+
+  /***
+   * Asynchronously creates a PERSISTENT ZNode; the outcome is handled by
+   * {@code createZNodeCallback}.
+   * @param path Path at which ZNode to create
+   * @param data Data to put under ZNode
+   */
+  public void createZNode(String path, byte[] data) {
+    // The payload doubles as the callback context so the callback can resend it.
+    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+      createZNodeCallback, data);
+  }
+
+  /***
+   * Asynchronously creates an EPHEMERAL ZNode; the outcome is handled by
+   * {@code createEphemeralZNodeCallback}.
+   * @param path Path at which Ephemeral ZNode to create
+   * @param data Data to put under ZNode
+   */
+  public void createEphemeralZNode(String path, byte[] data) {
+    // The payload doubles as the callback context so the callback can resend it.
+    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
+      createEphemeralZNodeCallback, data);
+  }
+
+  /**
+   * Creates a PERSISTENT ZNode with empty data at the given path unless a node
+   * already exists there. Failures of the existence check are logged only.
+   *
+   * @param path Path to check for ZNode
+   */
+  private void createIfZNodeNotExists(String path) {
+    try {
+      Stat existing = zk.exists(path, false);
+      if (existing != null) {
+        return;
+      }
+      createZNode(path, new byte[0]);
+    } catch (KeeperException | InterruptedException e) {
+      LOG.error("Error checking given node : " + path + " " + e);
+    }
+  }
+
+  /**
+   * Asynchronously writes the given status as data of the task ZNode, regardless
+   * of the node's current version.
+   *
+   * @param taskZNode ZNode to set status
+   * @param status Status value
+   */
+  public void setStatusOfTaskZNode(String taskZNode, String status) {
+    LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
+    // The status travels along as callback context: setStatusOfTaskZNodeCallback
+    // reads it back from there when it has to repeat the write after a connection loss.
+    zk.setData(taskZNode,
+      status.getBytes(),
+      -1,
+      setStatusOfTaskZNodeCallback,
+      status);
+  }
+
+  /**
+   * Registers this agent: makes sure the persistent nodes it relies on exist and
+   * then creates the agent's ephemeral registration node.
+   */
+  private void register() {
+    String agentSuffix = ChaosConstants.ZNODE_PATH_SEPARATOR + agentName;
+
+    // Parents come first so that each child is requested after its parent.
+    String[] persistentNodes = {
+      ChaosConstants.CHAOS_TEST_ROOT_ZNODE,
+      ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE,
+      ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE,
+      ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE + agentSuffix
+    };
+    for (String node : persistentNodes) {
+      createIfZNodeNotExists(node);
+    }
+
+    createEphemeralZNode(
+      ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE + agentSuffix, new byte[0]);
+  }
+
+  /***
+   * Gets tasks for execution, basically sets Watch on it's respective host's Znode and
+   * waits for tasks to be assigned, also has a getTasksForAgentCallback
+   * which handles execution of task.
+   */
+  private void getTasks() {
+    // A space now separates the agent name from the rest of the message.
+    LOG.info("Getting Tasks for Agent: " + agentName + " and setting watch for new Tasks");
+    zk.getChildren(ChaosConstants.CHAOS_AGENT_STATUS_PERSISTENT_ZNODE +
+        ChaosConstants.ZNODE_PATH_SEPARATOR + agentName,
+      newTaskCreatedWatcher,
+      getTasksForAgentCallback,
+      null);
+  }
+
+  /**
+   * Runs the command through {@code exec}, trying again after an
+   * {@link IOException} for as long as the retry counter allows it and
+   * sleeping between attempts.
+   *
+   * @param user user name, default none
+   * @param cmd Command to execute
+   * @return A pair of Exit Code and Shell output
+   * @throws IOException Exception while executing shell command
+   */
+  private Pair<Integer, String> execWithRetries(String user, String cmd) throws IOException {
+    RetryCounter attempts = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return exec(user, cmd);
+      } catch (IOException failure) {
+        // Rethrows the failure once no retry is left, otherwise just logs it.
+        retryOrThrow(attempts, failure, user, cmd);
+      }
+
+      try {
+        attempts.sleepUntilNextRetry();
+      } catch (InterruptedException interrupted) {
+        LOG.warn("Sleep Interrupted: " + interrupted);
+      }
+    }
+  }
+
+  /**
+   * Runs the command once in a {@code LocalShell} and returns its exit code and output.
+   * A failing exit code is rethrown with the captured output added to the message.
+   */
+  private Pair<Integer, String> exec(String user, String cmd) throws IOException {
+    LOG.info("Executing Shell command: " + cmd + " , user: " + user);
+
+    LocalShell localShell = new LocalShell(user, cmd);
+    try {
+      localShell.execute();
+    } catch (Shell.ExitCodeException failure) {
+      String stdout = localShell.getOutput();
+      String message = "stderr: " + failure.getMessage() + ", stdout: " + stdout;
+      throw new Shell.ExitCodeException(failure.getExitCode(), message);
+    }
+
+    int exitCode = localShell.getExitCode();
+    String output = localShell.getOutput();
+    // NOTE(review): "n{}" looks like a line break escape that lost its backslash - confirm.
+    LOG.info("Executed Shell command, exit code: {}, output n{}", exitCode, output);
+
+    return new Pair<>(exitCode, output);
+  }
+
+  /**
+   * Rethrows the given exception when the retry counter is exhausted; otherwise
+   * logs a warning and returns so that the caller can try again.
+   */
+  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
+    String user, String cmd) throws E {
+    if (!retryCounter.shouldRetry()) {
+      throw ex;
+    }
+    LOG.warn("Local command: {}, user: {}, failed at attempt {}. Retrying until maxAttempts: {}."
+      + "Exception {}", cmd, user,retryCounter.getAttemptTimes(), retryCounter.getMaxAttempts(),
+      ex.getMessage());
+  }
+
+  /**
+   * @return current value of the {@code connected} flag, which is maintained by
+   *   {@code process(WatchedEvent)} from session state events
+   */
+  private boolean isConnected() {
+    return connected;
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("Closing ZooKeeper Connection for Chaos Agent : " + agentName);
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      LOG.error("Error while closing ZooKeeper Connection.");
+    }
+  }
+
+  /**
+   * Waits until the agent is connected, starts fetching tasks and then idles
+   * until {@code stopChaosAgent} is set. An interruption ends the loop; the
+   * interrupt status is restored for the code running this agent.
+   */
+  @Override
+  public void run() {
+    try {
+      LOG.info("Running Chaos Agent on : " + agentName);
+      while (!this.isConnected()) {
+        Thread.sleep(100);
+      }
+      this.getTasks();
+      while (!stopChaosAgent.get()) {
+        Thread.sleep(500);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Error while running Chaos Agent", e);
+      Thread.currentThread().interrupt();
+    }
+
+  }
+
+  /**
+   * Handles session state events (events of type {@code None}): keeps the
+   * {@code connected} flag up to date and opens a new connection once the
+   * session has expired. Events of any other type are only logged.
+   */
+  @Override
+  public void process(WatchedEvent watchedEvent) {
+    LOG.info("Processing event: " + watchedEvent.toString());
+    if (watchedEvent.getType() == Event.EventType.None) {
+      switch (watchedEvent.getState()) {
+        case SyncConnected:
+          connected = true;
+          break;
+        case Disconnected:
+          connected = false;
+          break;
+        case Expired:
+          connected = false;
+          LOG.error("Session expired creating again");
+          try {
+            createZKConnection(null);
+          } catch (IOException e) {
+            LOG.error("Error creating Zookeeper connection", e);
+          }
+          // Expiry is a known state; do not fall through to the "Unknown State" branch.
+          break;
+        default:
+          LOG.error("Unknown State");
+          break;
+      }
+    }
+  }
+
+  /**
+   * Closes the current ZooKeeper connection, opens a new one with
+   * {@code newTaskCreatedWatcher} as its watcher and requests the agent's
+   * ephemeral registration node again.
+   * @throws Exception if closing the old or creating the new connection fails
+   */
+  private void recreateZKConnection() throws Exception{
+    try {
+      zk.close();
+      createZKConnection(newTaskCreatedWatcher);
+      createEphemeralZNode(ChaosConstants.CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE +
+        ChaosConstants.ZNODE_PATH_SEPARATOR + agentName, new byte[0]);
+    } catch (IOException e) {
+      // Pass the exception as its own argument so the placeholder receives only
+      // the agent name and the stack trace is kept.
+      LOG.error("Error creating new ZK COnnection for agent: {}", agentName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Executes Command locally.
+   */
+  protected static class LocalShell extends Shell.ShellCommandExecutor {
+
+    private String user;
+    private String execCommand;
+
+    public LocalShell(String user, String execCommand) {
+      super(new String[]{execCommand});
+      this.user = user;
+      this.execCommand = execCommand;
+    }
+
+    @Override
+    public String[] getExecString() {
+      // TODO: Considering Agent is running with same user.
+      if(!user.equals(ChaosConstants.DEFAULT_SHELL_USER)){
+        execCommand = String.format("su -u %1$s %2$s", user, execCommand);
+      }
+      return new String[]{"/usr/bin/env", "bash", "-c", execCommand};
+    }
+
+    @Override
+    public void execute() throws IOException {
+      super.execute();
+    }
+  }
+}
diff --git 
a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java 
b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java
new file mode 100644
index 0000000..54fbe9b
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosConstants.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/***
+ * ChaosConstant holds a bunch of Choas-related Constants
+ */
+@InterfaceAudience.Public
+public final class ChaosConstants {
+
+  /*Base ZNode for whole Chaos Testing*/
+  public static final String CHAOS_TEST_ROOT_ZNODE = "/hbase";
+
+  /*Just a / used for path separator*/
+  public static final String ZNODE_PATH_SEPARATOR = "/";
+
+  /*ZNode used for ChaosAgents registration.*/
+  public static final String CHAOS_AGENT_REGISTRATION_EPIMERAL_ZNODE =
+    CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgents";
+
+  /*ZNode used for getting status of tasks assigned*/
+  public static final String CHAOS_AGENT_STATUS_PERSISTENT_ZNODE =
+    CHAOS_TEST_ROOT_ZNODE + ZNODE_PATH_SEPARATOR + "chaosAgentTaskStatus";
+
+  /*Config property for getting number of retries to execute a command*/
+  public static final String RETRY_ATTEMPTS_KEY = 
"hbase.it.clustermanager.retry.attempts";
+
+  /*Default value for number of retries*/
+  public static final int DEFAULT_RETRY_ATTEMPTS = 5;
+
+  /*Config property to sleep in between retries*/
+  public static final String RETRY_SLEEP_INTERVAL_KEY =
+    "hbase.it.clustermanager.retry.sleep.interval";
+
+  /*Default Sleep time between each retry*/
+  public static final int DEFAULT_RETRY_SLEEP_INTERVAL = 5000;
+
+  /*Config property for executing command as specific user*/
+  public static final String CHAOSAGENT_SHELL_USER = 
"hbase.it.clustermanager.ssh.user";
+
+  /*default user for executing local commands*/
+  public static final String DEFAULT_SHELL_USER = "";
+
+  /*timeout used while creating ZooKeeper connection*/
+  public static final int SESSION_TIMEOUT_ZK = 60000 * 10;
+
+  /*Time given to ChaosAgent to set status*/
+  public static final int SET_STATUS_SLEEP_TIME = 30 * 1000;
+
+  /*Status String when you get an ERROR while executing task*/
+  public static final String TASK_ERROR_STRING = "error";
+
+  /*Status String when your command gets executed correctly*/
+  public static final String TASK_COMPLETION_STRING = "done";
+
+  /*Name of ChoreService to use*/
+  public static final String CHORE_SERVICE_PREFIX = "ChaosService";
+
+}
diff --git 
a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java 
b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java
new file mode 100644
index 0000000..e2abe3d
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosService.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+
+/**
+ * Command-line entry point used to start/stop Chaos related services (currently only the
+ * chaosagent).
+ * <p>
+ * The service is selected with the {@code -c}/{@code --chaosagent} option, whose value must be
+ * {@code start} or {@code stop} (case-insensitive).
+ */
+@InterfaceAudience.Private
+public class ChaosService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosService.class);
+
+  /** Long name of the command-line option carrying the start/stop action for the ChaosAgent. */
+  private static final String CHAOSAGENT_OPTION =
+    ChaosServiceName.CHAOSAGENT.toString().toLowerCase(java.util.Locale.ROOT);
+
+  /**
+   * Parses the command line and starts or stops the requested chaos service.
+   * <p>
+   * Nothing is propagated to the caller: invalid options, unknown actions and any exception
+   * raised while running the service are logged and the method returns normally.
+   * @param args raw command-line arguments
+   * @param conf configuration handed to the service being started
+   */
+  public static void execute(String[] args, Configuration conf) {
+    LOG.info("arguments : {}", Arrays.toString(args));
+
+    try {
+      CommandLine cmdline = new GnuParser().parse(getOptions(), args);
+      if (!cmdline.hasOption(CHAOSAGENT_OPTION)) {
+        LOG.error("Invalid Options");
+        return;
+      }
+
+      String actionStr = cmdline.getOptionValue(CHAOSAGENT_OPTION);
+      ExecutorAction action;
+      try {
+        // Only the enum lookup is guarded, so that an IllegalArgumentException raised while the
+        // service itself runs is not misreported as a bad action.
+        action = ExecutorAction.valueOf(actionStr.toUpperCase(java.util.Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        LOG.error("action passed: {} Unexpected action. Please provide only start/stop.",
+          actionStr, e);
+        return;
+      }
+
+      if (action == ExecutorAction.START) {
+        startChaosService(conf, ChaosServiceName.CHAOSAGENT);
+      } else if (action == ExecutorAction.STOP) {
+        stopChaosService();
+      }
+    } catch (Exception e) {
+      LOG.error("Error while starting ChaosService : ", e);
+    }
+  }
+
+  /**
+   * Runs the given service on a new thread and blocks until that thread terminates.
+   * @param conf configuration passed to the service
+   * @param serviceName service to start; only {@link ChaosServiceName#CHAOSAGENT} is handled
+   */
+  private static void startChaosService(Configuration conf, ChaosServiceName serviceName) {
+    switch (serviceName) {
+      case CHAOSAGENT:
+        ChaosAgent.stopChaosAgent.set(false);
+        try {
+          Thread t = new Thread(new ChaosAgent(conf,
+            ChaosUtils.getZKQuorum(conf), ChaosUtils.getHostName()));
+          t.start();
+          t.join();
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to whoever owns this thread.
+          Thread.currentThread().interrupt();
+          LOG.error("Failed while executing next task execution of ChaosAgent on : {}",
+            serviceName, e);
+        } catch (UnknownHostException e) {
+          LOG.error("Failed while executing next task execution of ChaosAgent on : {}",
+            serviceName, e);
+        }
+        break;
+      default:
+        LOG.error("Service Name not known : {}", serviceName);
+    }
+  }
+
+  /**
+   * Raises the ChaosAgent stop flag.
+   * NOTE(review): the flag is a static field of this JVM, so this presumably only affects an
+   * agent running in the same process - confirm against ChaosAgent and the daemon script.
+   */
+  private static void stopChaosService() {
+    ChaosAgent.stopChaosAgent.set(true);
+  }
+
+  /**
+   * Builds the command-line options understood by {@link #execute(String[], Configuration)}.
+   * @return options containing {@code -c/--chaosagent <start|stop>} and a {@code -D} option
+   */
+  private static Options getOptions() {
+    Options options = new Options();
+    options.addOption(new Option("c", CHAOSAGENT_OPTION,
+      true, "expecting a start/stop argument"));
+    // main() passes the unmodified argument array to execute(); presumably "-D" is declared here
+    // so that generic -D arguments do not make the parser reject the command line.
+    options.addOption(new Option("D", ChaosServiceName.GENERIC.toString(),
+      true, "generic D param"));
+    LOG.info(Arrays.toString(new Collection[] { options.getOptions() }));
+    return options;
+  }
+
+  /**
+   * Creates the configuration, schedules the auth chore when one is returned for it, and runs
+   * {@link #execute(String[], Configuration)}.
+   * @param args command-line arguments
+   * @throws Exception if the configuration or chore set-up fails
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    new GenericOptionsParser(conf, args);
+
+    ChoreService choreChaosService = null;
+    ScheduledChore authChore = AuthUtil.getAuthChore(conf);
+
+    try {
+      if (authChore != null) {
+        choreChaosService = new ChoreService(ChaosConstants.CHORE_SERVICE_PREFIX);
+        choreChaosService.scheduleChore(authChore);
+      }
+
+      execute(args, conf);
+    } finally {
+      // Test the service itself: it is still null if its construction failed, and an NPE here
+      // would hide the original exception.
+      if (choreChaosService != null) {
+        choreChaosService.shutdown();
+      }
+    }
+  }
+
+  /** Services known to this launcher; the lower-cased name doubles as the option name. */
+  enum ChaosServiceName {
+    CHAOSAGENT,
+    GENERIC
+  }
+
+
+  /** Actions accepted as the value of the service option. */
+  enum ExecutorAction {
+    START,
+    STOP
+  }
+}
diff --git 
a/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java 
b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java
new file mode 100644
index 0000000..da42021
--- /dev/null
+++ b/hbase-it/src/main/java/org/apache/hadoop/hbase/chaos/ChaosUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * ChaosUtils holds a bunch of useful functions like getting hostname and getting ZooKeeper
+ * quorum.
+ */
+@InterfaceAudience.Private
+public class ChaosUtils {
+
+  /**
+   * Returns the host name of the local host.
+   * @return host name reported by {@link InetAddress#getLocalHost()}
+   * @throws UnknownHostException if the local host name cannot be resolved
+   */
+  public static String getHostName() throws UnknownHostException {
+    return InetAddress.getLocalHost().getHostName();
+  }
+
+
+  /**
+   * Builds a ZooKeeper connect string ({@code host:port,host:port,...}) from the configuration.
+   * <p>
+   * Every entry of {@link HConstants#ZOOKEEPER_QUORUM} is trimmed and blank entries are skipped.
+   * An entry that already names a port (contains {@code ':'}) is used as is; otherwise the
+   * value of {@link HConstants#ZOOKEEPER_CLIENT_PORT} (default 2181) is appended. When no usable
+   * entry is configured the result is {@code localhost:<port>}.
+   * @param conf configuration to read the quorum hosts and client port from
+   * @return comma separated list of {@code host:port} pairs, never empty
+   */
+  public static String getZKQuorum(Configuration conf) {
+    String port =
+      Integer.toString(conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
+    // getStrings() returns null (not the default) when the property is set to an empty value.
+    String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
+    StringBuilder quorum = new StringBuilder();
+    if (serverHosts != null) {
+      for (String serverHost : serverHosts) {
+        String host = serverHost.trim();
+        if (host.isEmpty()) {
+          continue;
+        }
+        if (quorum.length() > 0) {
+          quorum.append(',');
+        }
+        quorum.append(host);
+        if (!host.contains(":")) {
+          quorum.append(':').append(port);
+        }
+      }
+    }
+    if (quorum.length() == 0) {
+      quorum.append("localhost:").append(port);
+    }
+    return quorum.toString();
+  }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java
new file mode 100644
index 0000000..31fb9e3
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ChaosZKClient.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper client that submits a command as a task ZNode for a target host and waits for the
+ * task's status.
+ * <p>
+ * Flow implemented here: {@link #submitTask(TaskObject)} creates an ephemeral sequential ZNode
+ * holding the command, a watch is set on that ZNode, and when its data changes the new data is
+ * read as the task status, the ZNode is deleted and the ZooKeeper session is closed. An instance
+ * is therefore meant for a single task: the session is closed once the task finished, failed or
+ * timed out.
+ */
+@InterfaceAudience.Private
+public class ChaosZKClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChaosZKClient.class);
+  private static final String CHAOS_AGENT_PARENT_ZNODE = "/hbase/chaosAgents";
+  private static final String CHAOS_AGENT_STATUS_ZNODE = "/hbase/chaosAgentTaskStatus";
+  private static final String ZNODE_PATH_SEPARATOR = "/";
+  private static final String TASK_PREFIX = "task_";
+  private static final String TASK_ERROR_STRING = "error";
+  private static final String TASK_COMPLETION_STRING = "done";
+  private static final String TASK_BOOLEAN_TRUE = "true";
+  private static final String TASK_BOOLEAN_FALSE = "false";
+  private static final int SESSION_TIMEOUT_ZK = 10 * 60 * 1000;
+  private static final int TASK_EXECUTION_TIMEOUT = 5 * 60 * 1000;
+
+  /** Status of the submitted task; stays null until a callback has produced a result. */
+  private volatile String taskStatus = null;
+
+  private final String quorum;
+
+  /** Current session; replaced from ZooKeeper event threads after a connection loss. */
+  private volatile ZooKeeper zk;
+
+  /**
+   * Creates the client and opens a ZooKeeper session. A failure to open the session is only
+   * logged; a later {@link #submitTask(TaskObject)} then reports an error status.
+   * @param quorum ZooKeeper connect string
+   */
+  public ChaosZKClient(String quorum) {
+    this.quorum = quorum;
+    try {
+      this.createNewZKConnection();
+    } catch (IOException e) {
+      LOG.error("Error creating ZooKeeper Connection: ", e);
+    }
+  }
+
+  /**
+   * Creates connection with ZooKeeper
+   * @throws IOException when not able to create connection properly
+   */
+  public void createNewZKConnection() throws IOException {
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent watchedEvent) {
+        // Invoked for every session event, not only for the initial connect.
+        LOG.info("ZooKeeper connection event for executing task: {}", watchedEvent);
+      }
+    };
+
+    this.zk = new ZooKeeper(quorum, SESSION_TIMEOUT_ZK, watcher);
+  }
+
+  /**
+   * Checks if ChaosAgent is running or not on target host by checking its ZNode.
+   * On a connection loss the session is recreated and the check is retried once.
+   * @param hostname hostname to check for chaosagent
+   * @return true/false whether agent is running or not
+   */
+  private boolean isChaosAgentRunning(String hostname) {
+    String agentZNode = CHAOS_AGENT_PARENT_ZNODE + ZNODE_PATH_SEPARATOR + hostname;
+    try {
+      return zk.exists(agentZNode, false) != null;
+    } catch (KeeperException e) {
+      // Compare the error code; matching on the exception text is fragile.
+      if (e.code() != KeeperException.Code.CONNECTIONLOSS) {
+        LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
+        return false;
+      }
+      recreateZKConnection();
+      try {
+        return zk.exists(agentZNode, false) != null;
+      } catch (KeeperException retryFailure) {
+        LOG.error("ERROR ", retryFailure);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        LOG.error("ERROR ", ie);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Error checking for given hostname: {} ERROR: ", hostname, e);
+    }
+    return false;
+  }
+
+  /**
+   * Creates tasks for target hosts by creating ZNodes.
+   * Waits for a limited amount of time to complete task to execute.
+   * @param taskObject Object data represents command
+   * @return the status read from the task ZNode, or the error status when no session exists,
+   *   the agent is not running, the task could not be processed or the wait timed out
+   */
+  public String submitTask(final TaskObject taskObject) {
+    if (zk == null) {
+      LOG.error("No ZooKeeper connection available, cannot submit task");
+      return TASK_ERROR_STRING;
+    }
+    if (!isChaosAgentRunning(taskObject.getTaskHostname())) {
+      LOG.info("EHHHHH!  ChaosAgent Not running");
+      // Nothing will ever close the session on this path, so release it here.
+      closeZKConnection();
+      return TASK_ERROR_STRING;
+    }
+
+    LOG.info("Creating task node");
+    zk.create(CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR +
+        taskObject.getTaskHostname() + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
+      taskObject.getCommand().getBytes(StandardCharsets.UTF_8),
+      ZooDefs.Ids.OPEN_ACL_UNSAFE,
+      CreateMode.EPHEMERAL_SEQUENTIAL,
+      submitTaskCallback,
+      taskObject);
+    long start = System.currentTimeMillis();
+
+    while ((System.currentTimeMillis() - start) < TASK_EXECUTION_TIMEOUT) {
+      if (taskStatus != null) {
+        return taskStatus;
+      }
+      Threads.sleep(500);
+    }
+
+    LOG.error("Timed out waiting for status of task on host: {}", taskObject.getTaskHostname());
+    // Closing the session also removes the ephemeral task ZNode.
+    closeZKConnection();
+    return TASK_ERROR_STRING;
+  }
+
+  /**
+   * To get status of task submitted
+   * @param path path at which to get status
+   * @param ctx path context; the data is only interpreted as a status when this is non-null
+   */
+  private void getStatus(String path, Object ctx) {
+    LOG.info("Getting Status of task: {}", path);
+    zk.getData(path,
+      false,
+      getStatusCallback,
+      ctx);
+  }
+
+  /**
+   * Set a watch on task submitted
+   * @param name ZNode name to set a watch
+   * @param taskObject context for ZNode name
+   */
+  private void setStatusWatch(String name, TaskObject taskObject) {
+    LOG.info("Checking for ZNode and Setting watch for task : {}", name);
+    zk.exists(name,
+      setStatusWatcher,
+      setStatusWatchCallback,
+      taskObject);
+  }
+
+  /**
+   * Delete task after getting its status
+   * @param path path to delete ZNode
+   */
+  private void deleteTask(String path) {
+    LOG.info("Deleting task: {}", path);
+    zk.delete(path,
+      -1,
+      taskDeleteCallback,
+      null);
+  }
+
+  /**
+   * Marks the task as failed, unless a status was already recorded, so that a waiting
+   * {@link #submitTask(TaskObject)} stops polling instead of running into its timeout, and
+   * releases the session.
+   */
+  private void failTask() {
+    if (taskStatus == null) {
+      taskStatus = TASK_ERROR_STRING;
+      closeZKConnection();
+    }
+  }
+
+  /** Closes the current session, if any, keeping the interrupt status of the calling thread. */
+  private void closeZKConnection() {
+    ZooKeeper current = zk;
+    if (current == null) {
+      return;
+    }
+    try {
+      current.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Error while closing ZooKeeper Connection.", e);
+    }
+  }
+
+  //WATCHERS:
+
+  /**
+   * Watcher to get notification whenever status of task changes.
+   */
+  final Watcher setStatusWatcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      LOG.info("Setting status watch for task: " + watchedEvent.getPath());
+      if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
+        if (!watchedEvent.getPath().contains(TASK_PREFIX)) {
+          throw new RuntimeException(KeeperException.create(
+            KeeperException.Code.DATAINCONSISTENCY));
+        }
+        // A non-null context tells the callback to treat the data as the task status.
+        getStatus(watchedEvent.getPath(), watchedEvent.getPath());
+      }
+    }
+  };
+
+  //CALLBACKS
+
+  /** Reads the task status from the ZNode data, then deletes the task ZNode. */
+  final AsyncCallback.DataCallback getStatusCallback = (rc, path, ctx, data, stat) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        //Connectionloss while getting status of task, getting again
+        recreateZKConnection();
+        getStatus(path, ctx);
+        break;
+
+      case OK:
+        if (ctx != null) {
+          String status;
+          if (data == null) {
+            LOG.warn("Task ZNode {} holds no data, treating task as failed", path);
+            status = TASK_ERROR_STRING;
+          } else {
+            status = new String(data, StandardCharsets.UTF_8);
+          }
+          taskStatus = status;
+          switch (status) {
+            case TASK_COMPLETION_STRING:
+            case TASK_BOOLEAN_TRUE:
+            case TASK_BOOLEAN_FALSE:
+              LOG.info("Task executed completely : Status --> {}", status);
+              break;
+
+            case TASK_ERROR_STRING:
+              LOG.info("There was error while executing task : Status --> {}", status);
+              break;
+
+            default:
+              LOG.warn("Status of task is undefined!! : Status --> {}", status);
+          }
+
+          deleteTask(path);
+        }
+        break;
+
+      default:
+        LOG.error("ERROR while getting status of task: " + path + " ERROR: " +
+          KeeperException.create(KeeperException.Code.get(rc)));
+        failTask();
+    }
+  };
+
+  /** Completes the watch registration on the task ZNode. */
+  final AsyncCallback.StatCallback setStatusWatchCallback = (rc, path, ctx, stat) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        //ConnectionLoss while setting watch on status ZNode, setting again.
+        recreateZKConnection();
+        setStatusWatch(path, (TaskObject) ctx);
+        break;
+
+      case OK:
+        if (stat != null) {
+          // Null context: the data is still the submitted command and must not be taken as
+          // the status.
+          getStatus(path, null);
+        }
+        break;
+
+      default:
+        LOG.error("ERROR while setting watch on task ZNode: " + path + " ERROR: " +
+          KeeperException.create(KeeperException.Code.get(rc)));
+        failTask();
+    }
+  };
+
+  /** Completes the creation of the task ZNode and registers the status watch on it. */
+  final AsyncCallback.StringCallback submitTaskCallback = (rc, path, ctx, name) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        // Connection to server was lost while submitting task, submitting again.
+        recreateZKConnection();
+        submitTask((TaskObject) ctx);
+        break;
+
+      case OK:
+        LOG.info("Task created : {}", name);
+        setStatusWatch(name, (TaskObject) ctx);
+        break;
+
+      default:
+        LOG.error("Error submitting task: " + name + " ERROR:" +
+          KeeperException.create(KeeperException.Code.get(rc)));
+        failTask();
+    }
+  };
+
+  /** Completes the deletion of the task ZNode and closes the session. */
+  final AsyncCallback.VoidCallback taskDeleteCallback = (rc, path, ctx) -> {
+    switch (KeeperException.Code.get(rc)) {
+      case CONNECTIONLOSS:
+        //Connectionloss while deleting task, deleting again
+        recreateZKConnection();
+        deleteTask(path);
+        break;
+
+      case OK:
+        LOG.info("Task Deleted successfully!");
+        LOG.info("Closing ZooKeeper Connection");
+        closeZKConnection();
+        break;
+
+      default:
+        LOG.error("ERROR while deleting task: " + path + " ERROR: " +
+          KeeperException.create(KeeperException.Code.get(rc)));
+    }
+  };
+
+
+  /** Closes the current session and opens a new one; a failure to reconnect is only logged. */
+  private void recreateZKConnection() {
+    try {
+      closeZKConnection();
+    } finally {
+      try {
+        createNewZKConnection();
+      } catch (IOException e) {
+        LOG.error("Error creating new ZK COnnection for agent: ", e);
+      }
+    }
+  }
+
+  /** Immutable pair of the command to run and the host it is meant for. */
+  static class TaskObject {
+    private final String command;
+    private final String taskHostname;
+
+    public TaskObject(String command, String taskHostname) {
+      this.command = command;
+      this.taskHostname = taskHostname;
+    }
+
+    public String getCommand() {
+      return this.command;
+    }
+
+    public String getTaskHostname() {
+      return taskHostname;
+    }
+  }
+
+}
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java
new file mode 100644
index 0000000..88f14b0
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ZNodeClusterManager.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterManager} that does not run commands itself: every operation is turned into a
+ * command string and submitted through {@link ChaosZKClient} as a task for the target host.
+ * <p>
+ * The status returned for a submitted task is only inspected by
+ * {@link #isRunning(ServiceType, String, int)}; the other operations ignore it.
+ */
+@InterfaceAudience.Private
+public class ZNodeClusterManager extends Configured implements ClusterManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ZNodeClusterManager.class);
+  private static final String SIGKILL = "SIGKILL";
+  private static final String SIGSTOP = "SIGSTOP";
+  private static final String SIGCONT = "SIGCONT";
+  public ZNodeClusterManager() {
+  }
+
+  /**
+   * Builds the ZooKeeper connect string from the configuration. Entries are trimmed, blank
+   * entries are skipped, and the configured client port (default 2181) is appended to every
+   * entry that does not already name a port. Falls back to {@code localhost:<port>} when no
+   * usable entry is configured.
+   * @return comma separated list of {@code host:port} pairs
+   */
+  private String getZKQuorumServersStringFromHbaseConfig() {
+    String port =
+      Integer.toString(getConf().getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
+    // getStrings() returns null (not the default) when the property is set to an empty value.
+    String[] serverHosts = getConf().getStrings(HConstants.ZOOKEEPER_QUORUM, "localhost");
+    if (serverHosts == null) {
+      serverHosts = new String[0];
+    }
+    String quorum = Arrays.stream(serverHosts)
+      .map(String::trim)
+      .filter(host -> !host.isEmpty())
+      .map(host -> host.contains(":") ? host : host + ":" + port)
+      .collect(Collectors.joining(","));
+    return quorum.isEmpty() ? "localhost:" + port : quorum;
+  }
+
+  /**
+   * Submits the command as a task for the given host, using a new {@link ChaosZKClient}.
+   * @param hostname host the task is meant for
+   * @param cmd command, already prefixed with its {@link CmdType}
+   * @return status string returned by {@link ChaosZKClient#submitTask}
+   */
+  private String createZNode(String hostname, String cmd) throws IOException{
+    LOG.info("Zookeeper Mode enabled sending command to zookeeper: {} hostname: {}",
+      cmd, hostname);
+    ChaosZKClient chaosZKClient = new ChaosZKClient(getZKQuorumServersStringFromHbaseConfig());
+    return chaosZKClient.submitTask(new ChaosZKClient.TaskObject(cmd, hostname));
+  }
+
+  /**
+   * Selects the command provider for the given service type.
+   * @param service service the command is for
+   * @return Hadoop provider for datanode/namenode, ZooKeeper provider for the ZooKeeper server,
+   *   HBase provider for everything else
+   */
+  protected HBaseClusterManager.CommandProvider getCommandProvider(ServiceType service)
+    throws IOException {
+    switch (service) {
+      case HADOOP_DATANODE:
+      case HADOOP_NAMENODE:
+        return new HBaseClusterManager.HadoopShellCommandProvider(getConf());
+      case ZOOKEEPER_SERVER:
+        return new HBaseClusterManager.ZookeeperShellCommandProvider(getConf());
+      default:
+        return new HBaseClusterManager.HBaseShellCommandProvider(getConf());
+    }
+  }
+
+  /**
+   * Submits the provider's signal command for the service as an {@code exec} task.
+   * @param service service to signal
+   * @param signal signal name, e.g. {@code SIGKILL}
+   * @param hostname host the task is meant for
+   */
+  public void signal(ServiceType service, String signal, String hostname) throws IOException {
+    createZNode(hostname, CmdType.exec.toString() +
+      getCommandProvider(service).signalCommand(service, signal));
+  }
+
+  /** Submits the provider's command for the given operation as an {@code exec} task. */
+  private void createOpCommand(String hostname, ServiceType service,
+    HBaseClusterManager.CommandProvider.Operation op) throws IOException{
+    createZNode(hostname, CmdType.exec.toString() +
+      getCommandProvider(service).getCommand(service, op));
+  }
+
+  @Override
+  public void start(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.START);
+  }
+
+  @Override
+  public void stop(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.STOP);
+  }
+
+  @Override
+  public void restart(ServiceType service, String hostname, int port) throws IOException {
+    createOpCommand(hostname, service, HBaseClusterManager.CommandProvider.Operation.RESTART);
+  }
+
+  @Override
+  public void kill(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGKILL, hostname);
+  }
+
+  @Override
+  public void suspend(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGSTOP, hostname);
+  }
+
+  @Override
+  public void resume(ServiceType service, String hostname, int port) throws IOException {
+    signal(service, SIGCONT, hostname);
+  }
+
+  /**
+   * Submits the provider's is-running command as a {@code bool} task.
+   * @return true only when the returned task status parses as boolean {@code true}
+   */
+  @Override
+  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
+    return Boolean.parseBoolean(createZNode(hostname, CmdType.bool.toString() +
+      getCommandProvider(service).isRunningCommand(service)));
+  }
+
+  /** Prefix put in front of a submitted command; the enum name is used verbatim. */
+  enum CmdType {
+    exec,
+    bool
+  }
+}

Reply via email to