SLIDER-285. Slider Agents to bind and work with restarted AM (Gour Saha via 
smohanty)


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/58014d80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/58014d80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/58014d80

Branch: refs/heads/feature/SLIDER-285_Restart_AM
Commit: 58014d805e6f819e24b44c6394001df99c3dccb6
Parents: cebde98
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Mon Aug 11 11:17:46 2014 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Mon Aug 11 20:15:38 2014 -0700

----------------------------------------------------------------------
 slider-agent/pom.xml                            |    4 +-
 .../src/main/python/agent/AgentConfig.py        |    2 +
 slider-agent/src/main/python/agent/Constants.py |    4 +-
 .../src/main/python/agent/Controller.py         |   52 +-
 slider-agent/src/main/python/agent/Heartbeat.py |    6 +-
 slider-agent/src/main/python/agent/Registry.py  |   54 +
 slider-agent/src/main/python/agent/main.py      |   29 +-
 slider-agent/src/main/python/kazoo/__init__.py  |    1 +
 slider-agent/src/main/python/kazoo/client.py    | 1412 ++++++++++++++++++
 .../src/main/python/kazoo/exceptions.py         |  199 +++
 .../src/main/python/kazoo/handlers/__init__.py  |    1 +
 .../src/main/python/kazoo/handlers/gevent.py    |  161 ++
 .../src/main/python/kazoo/handlers/threading.py |  287 ++++
 .../src/main/python/kazoo/handlers/utils.py     |   93 ++
 slider-agent/src/main/python/kazoo/hosts.py     |   26 +
 .../src/main/python/kazoo/interfaces.py         |  203 +++
 .../src/main/python/kazoo/loggingsupport.py     |    2 +
 .../src/main/python/kazoo/protocol/__init__.py  |    1 +
 .../main/python/kazoo/protocol/connection.py    |  623 ++++++++
 .../src/main/python/kazoo/protocol/paths.py     |   54 +
 .../main/python/kazoo/protocol/serialization.py |  396 +++++
 .../src/main/python/kazoo/protocol/states.py    |  237 +++
 .../src/main/python/kazoo/recipe/__init__.py    |    1 +
 .../src/main/python/kazoo/recipe/barrier.py     |  214 +++
 .../src/main/python/kazoo/recipe/counter.py     |   94 ++
 .../src/main/python/kazoo/recipe/election.py    |   79 +
 .../src/main/python/kazoo/recipe/lock.py        |  520 +++++++
 .../src/main/python/kazoo/recipe/partitioner.py |  377 +++++
 .../src/main/python/kazoo/recipe/party.py       |  118 ++
 .../src/main/python/kazoo/recipe/queue.py       |  321 ++++
 .../src/main/python/kazoo/recipe/watchers.py    |  419 ++++++
 slider-agent/src/main/python/kazoo/retry.py     |  150 ++
 slider-agent/src/main/python/kazoo/security.py  |  138 ++
 .../src/main/python/kazoo/testing/__init__.py   |    5 +
 .../src/main/python/kazoo/testing/common.py     |  283 ++++
 .../src/main/python/kazoo/testing/harness.py    |  180 +++
 .../src/main/python/kazoo/tests/__init__.py     |    0
 .../src/main/python/kazoo/tests/test_barrier.py |  157 ++
 .../src/main/python/kazoo/tests/test_build.py   |   29 +
 .../src/main/python/kazoo/tests/test_client.py  | 1098 ++++++++++++++
 .../main/python/kazoo/tests/test_connection.py  |  319 ++++
 .../src/main/python/kazoo/tests/test_counter.py |   35 +
 .../main/python/kazoo/tests/test_election.py    |  139 ++
 .../main/python/kazoo/tests/test_exceptions.py  |   22 +
 .../python/kazoo/tests/test_gevent_handler.py   |  160 ++
 .../src/main/python/kazoo/tests/test_lock.py    |  517 +++++++
 .../main/python/kazoo/tests/test_partitioner.py |   92 ++
 .../src/main/python/kazoo/tests/test_party.py   |   84 ++
 .../src/main/python/kazoo/tests/test_paths.py   |   98 ++
 .../src/main/python/kazoo/tests/test_queue.py   |  179 +++
 .../src/main/python/kazoo/tests/test_retry.py   |   77 +
 .../main/python/kazoo/tests/test_security.py    |   40 +
 .../kazoo/tests/test_threading_handler.py       |  326 ++++
 .../main/python/kazoo/tests/test_watchers.py    |  489 ++++++
 .../src/main/python/kazoo/tests/util.py         |  126 ++
 slider-agent/src/test/python/agent/TestMain.py  |    6 +-
 .../org/apache/slider/common/SliderKeys.java    |    5 +
 .../providers/AbstractProviderService.java      |   18 +
 .../slider/providers/ProviderService.java       |   22 +
 .../slider/providers/agent/AgentKeys.java       |    2 +
 .../providers/agent/AgentProviderService.java   |  126 +-
 .../server/appmaster/SliderAppMaster.java       |    7 +
 .../slider/server/appmaster/state/AppState.java |    8 +
 .../appmaster/web/rest/agent/HeartBeat.java     |   11 +
 .../model/mock/MockProviderService.groovy       |   12 +
 .../agent/TestAgentProviderService.java         |   26 +-
 slider-core/src/test/python/agent/main.py       |    5 +-
 67 files changed, 10918 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/pom.xml
----------------------------------------------------------------------
diff --git a/slider-agent/pom.xml b/slider-agent/pom.xml
index 4e67836..e230ec1 100644
--- a/slider-agent/pom.xml
+++ b/slider-agent/pom.xml
@@ -69,7 +69,7 @@
                 <argument>unitTests.py</argument>
               </arguments>
               <environmentVariables>
-                
<PYTHONPATH>${project.basedir}/src/main/python/jinja2:${project.basedir}/src/test/python:${project.basedir}/src/main/python:${project.basedir}/src/main/python/agent:${project.basedir}/src/main/python/resource_management:${project.basedir}/src/test/python/agent:${project.basedir}/src/test/python/resource_management</PYTHONPATH>
+                
<PYTHONPATH>${project.basedir}/src/main/python/jinja2:${project.basedir}/src/test/python:${project.basedir}/src/main/python:${project.basedir}/src/main/python/agent:${project.basedir}/src/main/python/resource_management:${project.basedir}/src/test/python/agent:${project.basedir}/src/test/python/resource_management:${project.basedir}/src/main/python/kazoo</PYTHONPATH>
               </environmentVariables>
               <skip>${skipTests}</skip>
             </configuration>
@@ -102,6 +102,8 @@
             <exclude>src/main/python/jinja2/**</exclude>
             <!-- mock files (BSD license) -->
             <exclude>src/test/python/mock/**</exclude>
+            <!-- kazoo files (Apache License, Version 2.0) -->
+            <exclude>src/main/python/kazoo/**</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/AgentConfig.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/AgentConfig.py 
b/slider-agent/src/main/python/agent/AgentConfig.py
index 57b1542..9adace1 100644
--- a/slider-agent/src/main/python/agent/AgentConfig.py
+++ b/slider-agent/src/main/python/agent/AgentConfig.py
@@ -33,6 +33,8 @@ content = """
 hostname=localhost
 port=8440
 secured_port=8441
+zk_quorum=localhost:2181
+zk_reg_path=/register/org-apache-slider/cl1
 check_path=/ws/v1/slider/agents/
 register_path=/ws/v1/slider/agents/{name}/register
 heartbeat_path=/ws/v1/slider/agents/{name}/heartbeat

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Constants.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Constants.py 
b/slider-agent/src/main/python/agent/Constants.py
index 88cd564..a120999 100644
--- a/slider-agent/src/main/python/agent/Constants.py
+++ b/slider-agent/src/main/python/agent/Constants.py
@@ -29,4 +29,6 @@ AGENT_WORK_ROOT = "AGENT_WORK_ROOT"
 AGENT_LOG_ROOT = "AGENT_LOG_ROOT"
 DO_NOT_REGISTER = "DO_NOT_REGISTER"
 DO_NOT_HEARTBEAT = "DO_NOT_HEARTBEAT"
-DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_"
\ No newline at end of file
+DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_"
+ZK_QUORUM="zk_quorum"
+ZK_REG_PATH="zk_reg_path"

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py 
b/slider-agent/src/main/python/agent/Controller.py
index 92e9086..4a103cb 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -34,6 +34,7 @@ from Heartbeat import Heartbeat
 from Register import Register
 from ActionQueue import ActionQueue
 from NetUtil import NetUtil
+from Registry import Registry
 import ssl
 import ProcessHelper
 import Constants
@@ -43,7 +44,12 @@ import security
 logger = logging.getLogger()
 
 AGENT_AUTO_RESTART_EXIT_CODE = 77
+HEART_BEAT_RETRY_THRESHOLD = 2
 
+WS_AGENT_CONTEXT_ROOT = '/ws'
+SLIDER_PATH_AGENTS = WS_AGENT_CONTEXT_ROOT + '/v1/slider/agents/'
+SLIDER_REL_PATH_REGISTER = '/register'
+SLIDER_REL_PATH_HEARTBEAT = '/heartbeat'
 
 class State:
   INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED = range(6)
@@ -57,13 +63,12 @@ class Controller(threading.Thread):
     self.safeMode = True
     self.credential = None
     self.config = config
-    self.hostname = config.getLabel()
-    server_url = 'https://' + config.get(AgentConfig.SERVER_SECTION,
-                                        'hostname') + \
-                 ':' + config.get(AgentConfig.SERVER_SECTION,
-                                  'secured_port')
-    self.registerUrl = server_url + '/ws/v1/slider/agents/' + self.hostname + 
'/register'
-    self.heartbeatUrl = server_url + '/ws/v1/slider/agents/' + self.hostname + 
'/heartbeat'
+    self.label = config.getLabel()
+    self.hostname = config.get(AgentConfig.SERVER_SECTION, 'hostname')
+    self.secured_port = config.get(AgentConfig.SERVER_SECTION, 'secured_port')
+    self.server_url = 'https://' + self.hostname + ':' + self.secured_port
+    self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + 
SLIDER_REL_PATH_REGISTER
+    self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + 
SLIDER_REL_PATH_HEARTBEAT
     self.netutil = NetUtil()
     self.responseId = -1
     self.repeatRegistration = False
@@ -80,6 +85,7 @@ class Controller(threading.Thread):
     self.componentActualState = State.INIT
     self.statusCommand = None
     self.failureCount = 0
+    self.heartBeatRetryCount = 0
 
 
   def __del__(self):
@@ -204,8 +210,8 @@ class Controller(threading.Thread):
       try:
         if not retry:
           data = json.dumps(
-            self.heartbeat.build(commandResult, self.responseId,
-                                 self.hasMappedComponents))
+            self.heartbeat.build(commandResult, self.componentActualState,
+                                 self.responseId, self.hasMappedComponents))
           self.updateStateBasedOnResult(commandResult)
           logger.debug("Sending request: " + data)
           pass
@@ -285,9 +291,33 @@ class Controller(threading.Thread):
             print(
               "Server certificate verify failed. Did you regenerate server 
certificate?")
             certVerifFailed = True
+        self.heartBeatRetryCount += 1
+        logger.error(
+          "Heartbeat retry count = %d" % (self.heartBeatRetryCount))
+        # Re-read zk registry in case AM was restarted and came up with new 
+        # host/port, but do this only after heartbeat retry attempts crosses
+        # threshold
+        if self.heartBeatRetryCount > HEART_BEAT_RETRY_THRESHOLD:
+          self.isRegistered = False
+          self.repeatRegistration = True
+          self.heartBeatRetryCount = 0
+          self.cachedconnect = None # Previous connection is broken now
+          zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, 
Constants.ZK_QUORUM)
+          zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, 
Constants.ZK_REG_PATH)
+          registry = Registry(zk_quorum, zk_reg_path)
+          amHost, amSecuredPort = registry.readAMHostPort()
+          logger.info("Read from ZK registry: AM host = %s, AM secured port = 
%s" % (amHost, amSecuredPort))
+          self.hostname = amHost
+          self.secured_port = amSecuredPort
+          self.config.set(AgentConfig.SERVER_SECTION, "hostname", 
self.hostname)
+          self.config.set(AgentConfig.SERVER_SECTION, "secured_port", 
self.secured_port)
+          self.server_url = 'https://' + self.hostname + ':' + 
self.secured_port
+          self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label 
+ SLIDER_REL_PATH_REGISTER
+          self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + 
self.label + SLIDER_REL_PATH_HEARTBEAT
+          return
         self.cachedconnect = None # Previous connection is broken now
         retry = True
-        # Sleep for some time
+      # Sleep for some time
       timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
                 - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
       self.heartbeat_wait_event.wait(timeout=timeout)
@@ -358,8 +388,8 @@ class Controller(threading.Thread):
     statusCommand["serviceName"] = command["serviceName"]
     statusCommand["taskId"] = "status"
     statusCommand['auto_generated'] = True
-    return statusCommand
     logger.info("Status command: " + pprint.pformat(statusCommand))
+    return statusCommand
     pass
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Heartbeat.py 
b/slider-agent/src/main/python/agent/Heartbeat.py
index 8192348..4f2207c 100644
--- a/slider-agent/src/main/python/agent/Heartbeat.py
+++ b/slider-agent/src/main/python/agent/Heartbeat.py
@@ -36,7 +36,8 @@ class Heartbeat:
     self.config = config
     self.reports = []
 
-  def build(self, commandResult, id='-1', componentsMapped=False):
+  def build(self, commandResult, componentActualState, id='-1',
+            componentsMapped=False):
     timestamp = int(time.time() * 1000)
     queueResult = self.actionQueue.result()
     logger.info("Queue result: " + pformat(queueResult))
@@ -48,7 +49,8 @@ class Heartbeat:
                  'timestamp': timestamp,
                  'hostname': self.config.getLabel(),
                  'nodeStatus': nodeStatus,
-                 'fqdn': hostname.public_hostname()
+                 'fqdn': hostname.public_hostname(),
+                 'agentState': componentActualState
     }
 
     commandsInProgress = False

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Registry.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Registry.py 
b/slider-agent/src/main/python/agent/Registry.py
new file mode 100644
index 0000000..256509f
--- /dev/null
+++ b/slider-agent/src/main/python/agent/Registry.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+'''
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+import logging
+from kazoo.client import KazooClient
+
+logger = logging.getLogger()
+
+class Registry:
+  """Looks up the AM agents endpoint in the Slider ZooKeeper registry."""
+
+  def __init__(self, zk_quorum, zk_reg_path):
+    # zk_quorum is handed to KazooClient as its hosts string;
+    # zk_reg_path is the znode whose data readAMHostPort() parses as JSON.
+    self.zk_quorum = zk_quorum
+    self.zk_reg_path = zk_reg_path
+
+  def readAMHostPort(self):
+    """Return (amHost, amSecuredPort) for the AM agents endpoint.
+
+    The endpoint address is parsed as <scheme>://<host>:<port>/...; the
+    port is returned utf-8 encoded. On any failure the error is logged and
+    ("", "") is returned, so callers must check for empty values.
+    """
+    amHost = ""
+    amSecuredPort = ""
+    zk = None
+    try:
+      zk = KazooClient(hosts=self.zk_quorum, read_only=True)
+      zk.start()
+      data, _stat = zk.get(self.zk_reg_path)
+      logger.debug("Registry Data: %s" % (data.decode("utf-8")))
+      sliderRegistry = json.loads(data)
+      amUrl = sliderRegistry["payload"]["internalView"]["endpoints"]["org.apache.slider.agents"]["address"]
+      amHost = amUrl.split("/")[2].split(":")[0]
+      # the port needs to be utf-8 encoded
+      amSecuredPort = amUrl.split(":")[2].split("/")[0].encode('utf8', 'ignore')
+    except Exception:
+      # Log and return empty strings; reset both so a partially parsed
+      # address (host without port) is never handed back to the caller.
+      amHost = ""
+      amSecuredPort = ""
+      logger.exception("Could not connect to zk registry at %s in quorum %s"
+                       % (self.zk_reg_path, self.zk_quorum))
+    finally:
+      # zk stays None if KazooClient() itself raised. stop() must precede
+      # close() so the client's connection is shut down, not leaked.
+      if zk is not None:
+        zk.stop()
+        zk.close()
+    logger.info("AM Host = %s, AM Secured Port = %s" % (amHost, amSecuredPort))
+    return amHost, amSecuredPort

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/main.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/main.py 
b/slider-agent/src/main/python/agent/main.py
index 105df5a..f68db04 100644
--- a/slider-agent/src/main/python/agent/main.py
+++ b/slider-agent/src/main/python/agent/main.py
@@ -34,6 +34,8 @@ import posixpath
 from Controller import Controller
 from AgentConfig import AgentConfig
 from NetUtil import NetUtil
+from Registry import Registry
+import Constants
 
 logger = logging.getLogger()
 IS_WINDOWS = platform.system() == "Windows"
@@ -178,9 +180,8 @@ def main():
   parser = OptionParser()
   parser.add_option("-v", "--verbose", dest="verbose", help="verbose log 
output", default=False)
   parser.add_option("-l", "--label", dest="label", help="label of the agent", 
default=None)
-  parser.add_option("--host", dest="host", help="AppMaster host", default=None)
-  parser.add_option("--port", dest="port", help="AppMaster port", default=None)
-  parser.add_option("--secured_port", dest="secured_port", help="AppMaster 2 
Way port", default=None)
+  parser.add_option("--zk-quorum", dest=Constants.ZK_QUORUM, help="Zookeeper 
Quorum", default=None)
+  parser.add_option("--zk-reg-path", dest=Constants.ZK_REG_PATH, 
help="Zookeeper Registry Path", default=None)
   parser.add_option("--debug", dest="debug", help="Agent debug hint", 
default="")
   (options, args) = parser.parse_args()
 
@@ -207,18 +208,24 @@ def main():
   update_config_from_file(agentConfig)
 
   # update configurations if needed
-  if options.host:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", options.host)
+  if options.zk_quorum:
+      agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM, 
options.zk_quorum)
 
-  if options.port:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "port", options.port)
-
-  if options.secured_port:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", 
options.secured_port)
+  if options.zk_reg_path:
+      agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH, 
options.zk_reg_path)
 
   if options.debug:
     agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, 
options.debug)
 
+  # Extract the AM hostname and secured port from the ZK registry configured in agentConfig
+  registry = Registry(agentConfig.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM), agentConfig.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH))
+  amHost, amSecuredPort = registry.readAMHostPort()
+  if amHost:
+      agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost)
+
+  if amSecuredPort:
+      agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", amSecuredPort)
+
   # set the security directory to a subdirectory of the run dir
   secDir = posixpath.join(agentConfig.getResolvedPath(AgentConfig.RUN_DIR), 
"security")
   logger.info("Security/Keys directory: " + secDir)
@@ -243,7 +250,7 @@ def main():
 
   server_url = SERVER_STATUS_URL.format(
     agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'),
-    agentConfig.get(AgentConfig.SERVER_SECTION, 'port'),
+    agentConfig.get(AgentConfig.SERVER_SECTION, 'secured_port'),
     agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path'))
   print("Connecting to the server at " + server_url + "...")
   logger.info('Connecting to the server at: ' + server_url)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/__init__.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/__init__.py 
b/slider-agent/src/main/python/kazoo/__init__.py
new file mode 100644
index 0000000..792d600
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/__init__.py
@@ -0,0 +1 @@
+#

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/client.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/client.py 
b/slider-agent/src/main/python/kazoo/client.py
new file mode 100644
index 0000000..6e3a219
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/client.py
@@ -0,0 +1,1412 @@
+"""Kazoo Zookeeper Client"""
+import inspect
+import logging
+import os
+import re
+import warnings
+from collections import defaultdict, deque
+from functools import partial
+from os.path import split
+
+from kazoo.exceptions import (
+    AuthFailedError,
+    ConfigurationError,
+    ConnectionClosedError,
+    ConnectionLoss,
+    NoNodeError,
+    NodeExistsError,
+    SessionExpiredError,
+    WriterNotClosedException,
+)
+from kazoo.handlers.threading import SequentialThreadingHandler
+from kazoo.handlers.utils import capture_exceptions, wrap
+from kazoo.hosts import collect_hosts
+from kazoo.loggingsupport import BLATHER
+from kazoo.protocol.connection import ConnectionHandler
+from kazoo.protocol.paths import normpath
+from kazoo.protocol.paths import _prefix_root
+from kazoo.protocol.serialization import (
+    Auth,
+    CheckVersion,
+    CloseInstance,
+    Create,
+    Delete,
+    Exists,
+    GetChildren,
+    GetChildren2,
+    GetACL,
+    SetACL,
+    GetData,
+    SetData,
+    Sync,
+    Transaction
+)
+from kazoo.protocol.states import KazooState
+from kazoo.protocol.states import KeeperState
+from kazoo.retry import KazooRetry
+from kazoo.security import ACL
+from kazoo.security import OPEN_ACL_UNSAFE
+
+# convenience API
+from kazoo.recipe.barrier import Barrier
+from kazoo.recipe.barrier import DoubleBarrier
+from kazoo.recipe.counter import Counter
+from kazoo.recipe.election import Election
+from kazoo.recipe.lock import Lock
+from kazoo.recipe.lock import Semaphore
+from kazoo.recipe.partitioner import SetPartitioner
+from kazoo.recipe.party import Party
+from kazoo.recipe.party import ShallowParty
+from kazoo.recipe.queue import Queue
+from kazoo.recipe.queue import LockingQueue
+from kazoo.recipe.watchers import ChildrenWatch
+from kazoo.recipe.watchers import DataWatch
+
+try:  # pragma: nocover
+    basestring
+except NameError:  # pragma: nocover
+    basestring = str
+
+LOST_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED,
+               KeeperState.CLOSED)
+ENVI_VERSION = re.compile('[\w\s:.]*=([\d\.]*).*', re.DOTALL)
+log = logging.getLogger(__name__)
+
+
+_RETRY_COMPAT_DEFAULTS = dict(
+    max_retries=None,
+    retry_delay=0.1,
+    retry_backoff=2,
+    retry_jitter=0.8,
+    retry_max_delay=3600,
+)
+
+_RETRY_COMPAT_MAPPING = dict(
+    max_retries='max_tries',
+    retry_delay='delay',
+    retry_backoff='backoff',
+    retry_jitter='max_jitter',
+    retry_max_delay='max_delay',
+)
+
+
+class KazooClient(object):
+    """An Apache Zookeeper Python client supporting alternate callback
+    handlers and high-level functionality.
+
+    Watch functions registered with this class will not get session
+    events, unlike the default Zookeeper watches. They will also be
+    called with a single argument, a
+    :class:`~kazoo.protocol.states.WatchedEvent` instance.
+
+    """
+    def __init__(self, hosts='127.0.0.1:2181',
+                 timeout=10.0, client_id=None, handler=None,
+                 default_acl=None, auth_data=None, read_only=None,
+                 randomize_hosts=True, connection_retry=None,
+                 command_retry=None, logger=None, **kwargs):
+        """Create a :class:`KazooClient` instance. All time arguments
+        are in seconds.
+
+        :param hosts: Comma-separated list of hosts to connect to
+                      (e.g. 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
+        :param timeout: The longest to wait for a Zookeeper connection.
+        :param client_id: A Zookeeper client id, used when
+                          re-establishing a prior session connection.
+        :param handler: An instance of a class implementing the
+                        :class:`~kazoo.interfaces.IHandler` interface
+                        for callback handling.
+        :param default_acl: A default ACL used on node creation.
+        :param auth_data:
+            A list of authentication credentials to use for the
+            connection. Should be a list of (scheme, credential)
+            tuples as :meth:`add_auth` takes.
+        :param read_only: Allow connections to read only servers.
+        :param randomize_hosts: By default randomize host selection.
+        :param connection_retry:
+            A :class:`kazoo.retry.KazooRetry` object to use for
+            retrying the connection to Zookeeper. Also can be a dict of
+            options which will be used for creating one.
+        :param command_retry:
+            A :class:`kazoo.retry.KazooRetry` object to use for
+            the :meth:`KazooClient.retry` method. Also can be a dict of
+            options which will be used for creating one.
+        :param logger: A custom logger to use instead of the module
+            global `log` instance.
+
+        Basic Example:
+
+        .. code-block:: python
+
+            zk = KazooClient()
+            zk.start()
+            children = zk.get_children('/')
+            zk.stop()
+
+        As a convenience all recipe classes are available as attributes
+        and get automatically bound to the client. For example::
+
+            zk = KazooClient()
+            zk.start()
+            lock = zk.Lock('/lock_path')
+
+        .. versionadded:: 0.6
+            The read_only option. Requires Zookeeper 3.4+
+
+        .. versionadded:: 0.6
+            The retry_max_delay option.
+
+        .. versionadded:: 0.6
+            The randomize_hosts option.
+
+        .. versionchanged:: 0.8
+            Removed the unused watcher argument (was second argument).
+
+        .. versionadded:: 1.2
+            The connection_retry, command_retry and logger options.
+
+        """
+        self.logger = logger or log
+
+        # Record the handler strategy used
+        self.handler = handler if handler else SequentialThreadingHandler()
+        if inspect.isclass(self.handler):
+            raise ConfigurationError("Handler must be an instance of a class, "
+                                     "not the class: %s" % self.handler)
+
+        self.auth_data = auth_data if auth_data else set([])
+        self.default_acl = default_acl
+        self.randomize_hosts = randomize_hosts
+        self.hosts = None
+        self.chroot = None
+        self.set_hosts(hosts)
+
+        # Curator like simplified state tracking, and listeners for
+        # state transitions
+        self._state = KeeperState.CLOSED
+        self.state = KazooState.LOST
+        self.state_listeners = set()
+
+        self._reset()
+        self.read_only = read_only
+
+        if client_id:
+            self._session_id = client_id[0]
+            self._session_passwd = client_id[1]
+        else:
+            self._reset_session()
+
+        # ZK uses milliseconds
+        self._session_timeout = int(timeout * 1000)
+
+        # We use events like twitter's client to track current and
+        # desired state (connected, and whether to shutdown)
+        self._live = self.handler.event_object()
+        self._writer_stopped = self.handler.event_object()
+        self._stopped = self.handler.event_object()
+        self._stopped.set()
+        self._writer_stopped.set()
+
+        self.retry = self._conn_retry = None
+
+        if type(connection_retry) is dict:
+            self._conn_retry = KazooRetry(**connection_retry)
+        elif type(connection_retry) is KazooRetry:
+            self._conn_retry = connection_retry
+
+        if type(command_retry) is dict:
+            self.retry = KazooRetry(**command_retry)
+        elif type(command_retry) is KazooRetry:
+            self.retry = command_retry
+
+
+        if type(self._conn_retry) is KazooRetry:
+            if self.handler.sleep_func != self._conn_retry.sleep_func:
+                raise ConfigurationError("Retry handler and event handler "
+                                         " must use the same sleep func")
+
+        if type(self.retry) is KazooRetry:
+            if self.handler.sleep_func != self.retry.sleep_func:
+                raise ConfigurationError("Command retry handler and event "
+                                         "handler must use the same sleep 
func")
+
+        if self.retry is None or self._conn_retry is None:
+            old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS)
+            for key in old_retry_keys:
+                try:
+                    old_retry_keys[key] = kwargs.pop(key)
+                    warnings.warn('Passing retry configuration param %s to the'
+                            ' client directly is deprecated, please pass a'
+                            ' configured retry object (using param %s)' % (
+                                key, _RETRY_COMPAT_MAPPING[key]),
+                            DeprecationWarning, stacklevel=2)
+                except KeyError:
+                    pass
+
+            retry_keys = {}
+            for oldname, value in old_retry_keys.items():
+                retry_keys[_RETRY_COMPAT_MAPPING[oldname]] = value
+
+            if self._conn_retry is None:
+                self._conn_retry = KazooRetry(
+                    sleep_func=self.handler.sleep_func,
+                    **retry_keys)
+            if self.retry is None:
+                self.retry = KazooRetry(
+                    sleep_func=self.handler.sleep_func,
+                    **retry_keys)
+
+        self._conn_retry.interrupt = lambda: self._stopped.is_set()
+        self._connection = ConnectionHandler(self, self._conn_retry.copy(),
+            logger=self.logger)
+
+        # Every retry call should have its own copy of the retry helper
+        # to avoid shared retry counts
+        self._retry = self.retry
+        def _retry(*args, **kwargs):
+            return self._retry.copy()(*args, **kwargs)
+        self.retry = _retry
+
+        self.Barrier = partial(Barrier, self)
+        self.Counter = partial(Counter, self)
+        self.DoubleBarrier = partial(DoubleBarrier, self)
+        self.ChildrenWatch = partial(ChildrenWatch, self)
+        self.DataWatch = partial(DataWatch, self)
+        self.Election = partial(Election, self)
+        self.Lock = partial(Lock, self)
+        self.Party = partial(Party, self)
+        self.Queue = partial(Queue, self)
+        self.LockingQueue = partial(LockingQueue, self)
+        self.SetPartitioner = partial(SetPartitioner, self)
+        self.Semaphore = partial(Semaphore, self)
+        self.ShallowParty = partial(ShallowParty, self)
+
+         # If we got any unhandled keywords, complain like python would
+        if kwargs:
+            raise TypeError('__init__() got unexpected keyword arguments: %s'
+                            % (kwargs.keys(),))
+
+    def _reset(self):
+        """Return the client to its pre-connection state.
+
+        Replaces the request queue and the pending-response queue with
+        fresh, empty deques, clears the watcher registries and session
+        credentials, and zeroes the last seen zxid and the negotiated
+        protocol version.
+        """
+        self._queue, self._pending = deque(), deque()
+        self._reset_watchers()
+        self._reset_session()
+        self.last_zxid, self._protocol_version = 0, None
+
+    def _reset_watchers(self):
+        """Forget every registered child and data watcher.
+
+        Both registries are ``defaultdict(set)``, so looking up an
+        unknown key yields an empty set.
+        """
+        self._child_watchers, self._data_watchers = (defaultdict(set),
+                                                     defaultdict(set))
+
+    def _reset_session(self):
+        """Drop the session id and blank out the session password.
+
+        The password becomes 16 zero bytes rather than ``None``.
+        """
+        self._session_passwd = b'\x00' * 16
+        self._session_id = None
+
+    @property
+    def client_state(self):
+        """The most recent raw :class:`KeeperState` of this client.
+
+        This is the detailed protocol-level state. The simplified
+        :class:`KazooState` kept in ``state`` is usually more convenient.
+        """
+        current = self._state
+        return current
+
+    @property
+    def client_id(self):
+        """The identity of the current Zookeeper session.
+
+        :returns: ``(session_id, session_password)`` while connected,
+                  otherwise ``None``.
+        :rtype: tuple or None
+        """
+        if not self._live.is_set():
+            return None
+        return self._session_id, self._session_passwd
+
+    @property
+    def connected(self):
+        """Whether a Zookeeper connection is currently established.
+
+        :rtype: bool
+        """
+        live = self._live
+        return live.is_set()
+
+    def set_hosts(self, hosts, randomize_hosts=None):
+        """Set the list of hosts used by this client.
+
+        This function accepts the same format hosts parameter as the init
+        function and sets the client to use the new hosts the next time it
+        needs to look up a set of hosts. This function does not affect the
+        current connected status.
+
+        It is not currently possible to change the chroot with this function,
+        setting a host list with a new chroot will raise a ConfigurationError.
+        When that happens the client's host list is left unchanged.
+
+        :param hosts: see description in :meth:`KazooClient.__init__`
+        :param randomize_hosts: override client default for host randomization
+        :raises:
+            :exc:`ConfigurationError` if the hosts argument changes the chroot
+
+        .. versionadded:: 1.4
+
+        .. warning::
+
+            Using this function to point a client to a completely disparate
+            zookeeper server cluster has undefined behavior.
+
+        """
+        if randomize_hosts is None:
+            randomize_hosts = self.randomize_hosts
+
+        new_hosts, chroot = collect_hosts(hosts, randomize_hosts)
+        new_chroot = normpath(chroot) if chroot else ''
+
+        # Validate before mutating anything, so a rejected call does not
+        # leave the client with new hosts but the old chroot.
+        if self.chroot is not None and new_chroot != self.chroot:
+            raise ConfigurationError("Changing chroot at runtime is not "
+                                     "currently supported")
+
+        self.hosts = new_hosts
+        self.chroot = new_chroot
+
+    def add_listener(self, listener):
+        """Register a callable to be told about connection state changes.
+
+        On each state transition the listener is called with the new
+        :class:`~kazoo.protocol.states.KazooState`. A listener that
+        returns ``True`` is unregistered afterwards.
+
+        .. warning::
+
+            Listeners must not block. If a listener might need data or a
+            value that could block, it should hand that work off with
+            :meth:`~kazoo.interfaces.IHandler.spawn` so that it can
+            return immediately.
+
+        :raises: :exc:`ConfigurationError` if ``listener`` is not callable.
+        """
+        if not listener or not callable(listener):
+            raise ConfigurationError("listener must be callable")
+        self.state_listeners.add(listener)
+
+    def remove_listener(self, listener):
+        """Unregister a state listener.
+
+        Removing a listener that was never added is a silent no-op.
+        """
+        listeners = self.state_listeners
+        listeners.discard(listener)
+
+    def _make_state_change(self, state):
+        """Record a new simplified state and broadcast it to listeners.
+
+        Does nothing if ``state`` equals the current ``self.state``.
+        Listeners returning ``True`` are removed, and exceptions raised
+        by a listener are logged rather than propagated.
+        """
+        if self.state == state:
+            return
+        self.state = state
+
+        # Walk a snapshot: a listener asking to be removed mutates the set.
+        for callback in list(self.state_listeners):
+            try:
+                if callback(state) is True:
+                    self.remove_listener(callback)
+            except Exception:
+                self.logger.exception("Error in connection state listener")
+
+    def _session_callback(self, state):
+        """React to a raw :class:`KeeperState` change from the connection.
+
+        Stores the new raw state in ``_state``, toggles the ``_live``
+        event, and translates the change into the simplified
+        :class:`KazooState` that is broadcast to state listeners.
+
+        :param state: the new :class:`KeeperState`.
+        """
+        if state == self._state:
+            return
+
+        # Note that we don't check self.state == LOST since that's also
+        # the client's initial state
+        dead_state = self._state in LOST_STATES
+        self._state = state
+
+        # If we were previously closed or had an expired session, and
+        # are now connecting, don't bother with the rest of the
+        # transitions since they only apply after
+        # we've established a connection
+        if dead_state and state == KeeperState.CONNECTING:
+            self.logger.log(BLATHER, "Skipping state change")
+            return
+
+        if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO):
+            self.logger.info("Zookeeper connection established, state: %s",
+                             state)
+            self._live.set()
+            self._make_state_change(KazooState.CONNECTED)
+        elif state in LOST_STATES:
+            # Session is gone: fail outstanding requests with the matching
+            # error, then wipe all session-scoped client state.
+            self.logger.info("Zookeeper session lost, state: %s", state)
+            self._live.clear()
+            self._make_state_change(KazooState.LOST)
+            self._notify_pending(state)
+            self._reset()
+        else:
+            # Connection dropped: fail outstanding requests and drop the
+            # watchers, but unlike the LOST branch keep the session id and
+            # password (presumably so the session can be resumed).
+            self.logger.info("Zookeeper connection lost")
+            self._live.clear()
+            self._notify_pending(state)
+            self._make_state_change(KazooState.SUSPENDED)
+            self._reset_watchers()
+
+    def _notify_pending(self, state):
+        """Fail every outstanding request after a connection drop.
+
+        Drains the pending-response queue and then the request queue.
+        Every entry with a result object gets the same exception, chosen
+        from ``state``:
+
+        * AuthFailedError for AUTH_FAILED;
+        * SessionExpiredError for EXPIRED_SESSION;
+        * ConnectionLoss otherwise.
+        """
+        if state == KeeperState.AUTH_FAILED:
+            error_cls = AuthFailedError
+        elif state == KeeperState.EXPIRED_SESSION:
+            error_cls = SessionExpiredError
+        else:
+            error_cls = ConnectionLoss
+        error = error_cls()
+
+        # Pending entries are (request, async_object, xid) triples.
+        while True:
+            try:
+                _request, pending_result, _xid = self._pending.popleft()
+                if pending_result:
+                    pending_result.set_exception(error)
+            except IndexError:
+                break
+
+        # Queued entries are (request, async_object) pairs.
+        while True:
+            try:
+                _request, queued_result = self._queue.popleft()
+                if queued_result:
+                    queued_result.set_exception(error)
+            except IndexError:
+                break
+
+    def _safe_close(self):
+        """Stop the handler and wait for the connection to shut down.
+
+        The wait is ``_session_timeout`` converted from milliseconds to
+        whole seconds, but never less than 10 seconds.
+
+        :raises: :exc:`WriterNotClosedException` if the connection has not
+                 stopped within that time.
+        """
+        self.handler.stop()
+        timeout = max(self._session_timeout // 1000, 10)
+        if not self._connection.stop(timeout):
+            raise WriterNotClosedException(
+                "Writer still open from prior connection "
+                "and wouldn't close after %s seconds" % timeout)
+
+    def _call(self, request, async_object):
+        """Ensure there's an active connection and put the request in
+        the queue if there is.
+
+        Returns False if the call short circuits due to AUTH_FAILED,
+        CLOSED, EXPIRED_SESSION or CONNECTING state. In that case the
+        matching exception has already been set on ``async_object`` and
+        the request was not queued.
+
+        Otherwise the request is queued, and the connection is woken by
+        writing one byte to its pipe. If that pipe is missing or the
+        write fails (a race with close()), ``async_object`` receives a
+        single ConnectionClosedError.
+
+        :param request: the protocol request object to send.
+        :param async_object: result object that receives the outcome.
+        """
+        if self._state == KeeperState.AUTH_FAILED:
+            async_object.set_exception(AuthFailedError())
+            return False
+        elif self._state == KeeperState.CLOSED:
+            async_object.set_exception(ConnectionClosedError(
+                "Connection has been closed"))
+            return False
+        elif self._state in (KeeperState.EXPIRED_SESSION,
+                             KeeperState.CONNECTING):
+            async_object.set_exception(SessionExpiredError())
+            return False
+
+        self._queue.append((request, async_object))
+
+        # wake the connection, guarding against a race with close()
+        write_pipe = self._connection._write_pipe
+        if write_pipe is None:
+            async_object.set_exception(ConnectionClosedError(
+                "Connection has been closed"))
+            # Nothing to wake. Falling through would call os.write(None)
+            # and set the exception on async_object a second time.
+            return
+        try:
+            os.write(write_pipe, b'\0')
+        except Exception:
+            # e.g. OSError when the pipe was closed concurrently.
+            async_object.set_exception(ConnectionClosedError(
+                "Connection has been closed"))
+
+    def start(self, timeout=15):
+        """Connect to Zookeeper and block until the connection is up.
+
+        :param timeout: Seconds to wait for the connection to succeed.
+        :raises: :attr:`~kazoo.interfaces.IHandler.timeout_exception`
+                 when no connection is established within ``timeout``
+                 seconds. The client is stopped before raising.
+
+        Emits a warning when a chroot is configured but its root node
+        does not exist yet.
+        """
+        self.start_async().wait(timeout=timeout)
+        if not self.connected:
+            # Timed out: make sure no half-open connection lingers.
+            self.stop()
+            raise self.handler.timeout_exception("Connection time-out")
+
+        chroot_missing = self.chroot and not self.exists("/")
+        if chroot_missing:
+            warnings.warn("No chroot path exists, the chroot path "
+                          "should be created before normal use.")
+
+    def start_async(self):
+        """Begin connecting to ZK without waiting for the result.
+
+        If the client is already connected, nothing happens. Otherwise
+        any previous connection is torn down, the stop flags are
+        cleared, and the handler and the connection are started, in that
+        order.
+
+        :returns: The ``_live`` event, which is set once the connection
+                  is alive.
+        :rtype: :class:`~threading.Event` compatible object.
+        """
+        if self._live.is_set():
+            return self._live
+
+        # Make sure any prior connection is fully closed first.
+        self._safe_close()
+
+        # Clear the stop request and the writer-stopped indicator.
+        for flag in (self._stopped, self._writer_stopped):
+            flag.clear()
+
+        self.handler.start()
+        self._connection.start()
+        return self._live
+
+    def stop(self):
+        """Gracefully stop this Zookeeper session.
+
+        This method can be called while a reconnection attempt is in
+        progress, which will then be halted.
+
+        Once the connection is closed, its session becomes invalid. All
+        the ephemeral nodes in the ZooKeeper server associated with the
+        session will be removed. The watches left on those nodes (and
+        on their parents) will be triggered.
+
+        Calling stop() on an already stopped client does nothing. If
+        waking the connection fails, the handler and connection are still
+        shut down before the error propagates.
+
+        """
+        if self._stopped.is_set():
+            return
+
+        self._stopped.set()
+        self._queue.append((CloseInstance, None))
+        try:
+            # Wake the connection so it picks up the close request.
+            os.write(self._connection._write_pipe, b'\0')
+        finally:
+            # Always tear down, even if the wake-up write failed (e.g. the
+            # pipe is already gone); otherwise threads are left running.
+            self._safe_close()
+
+    def restart(self):
+        """Stop the current Zookeeper session, then start a new one.
+
+        :meth:`start` is called with its default timeout.
+        """
+        self.stop()
+        self.start()
+
+    def close(self):
+        """Release resources held by the client's connection.
+
+        Call this on a stopped client before discarding it; skipping it
+        may leak file handles.
+
+        .. versionadded:: 1.0
+        """
+        connection = self._connection
+        connection.close()
+
+    def command(self, cmd=b'ruok'):
+        """Send a management command to the current ZK server.
+
+        Examples are `ruok`, `envi` or `stat`.
+
+        The command is sent over a separate, short-lived connection to
+        the server the session is connected to. Only the first 8192
+        bytes of the reply are read.
+
+        :param cmd: the management command, as bytes.
+        :returns: An unstructured textual response.
+        :rtype: str
+
+        :raises:
+            :exc:`ConnectionLoss` if there is no connection open, or
+            possibly a :exc:`socket.error` if there's a problem with
+            the connection used just for this command.
+
+        .. versionadded:: 0.5
+
+        """
+        if not self._live.is_set():
+            raise ConnectionLoss("No connection to server")
+
+        # getpeername() returns a 4-tuple for IPv6 sockets, but a new
+        # connection needs a plain (host, port) address.
+        peer = self._connection._socket.getpeername()[:2]
+        sock = self.handler.create_connection(
+            peer, timeout=self._session_timeout / 1000.0)
+        try:
+            sock.sendall(cmd)
+            result = sock.recv(8192)
+        finally:
+            # Close the side-channel socket even if send/recv fails.
+            sock.close()
+        return result.decode('utf-8', 'replace')
+
+    def server_version(self):
+        """Ask the connected ZK server for its version.
+
+        Sends the ``envi`` management command through :meth:`command`
+        and extracts the version number from the reply with
+        ``ENVI_VERSION``.
+
+        :returns: The version as a tuple of ints, for example (3, 4, 3).
+        :rtype: tuple
+
+        .. versionadded:: 0.5
+
+        """
+        reply = self.command(b'envi')
+        version = ENVI_VERSION.match(reply).group(1)
+        return tuple(int(piece) for piece in version.split('.'))
+
+    def add_auth(self, scheme, credential):
+        """Send credentials to the server and wait for the reply.
+
+        :param scheme: authentication scheme (default supported:
+                       "digest").
+        :param credential: the credential; its format depends on the
+                           scheme.
+
+        :returns: True if it was successful.
+        :rtype: bool
+
+        :raises:
+            :exc:`~kazoo.exceptions.AuthFailedError` if it failed. The
+            session state is also set to AUTH_FAILED.
+
+        """
+        pending = self.add_auth_async(scheme, credential)
+        return pending.get()
+
+    def add_auth_async(self, scheme, credential):
+        """Asynchronous version of :meth:`add_auth`; same arguments.
+
+        The (scheme, credential) pair is also recorded in ``auth_data``.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError if ``scheme`` or ``credential`` is not a string.
+        """
+        for candidate, label in ((scheme, "scheme"),
+                                 (credential, "credential")):
+            if not isinstance(candidate, basestring):
+                raise TypeError("Invalid type for %s" % label)
+
+        # Kept so the credentials can be re-sent after a reconnect.
+        self.auth_data.add((scheme, credential))
+
+        result = self.handler.async_result()
+        self._call(Auth(0, scheme, credential), result)
+        return result
+
+    def unchroot(self, path):
+        """Remove the client's chroot prefix from ``path``, if present.
+
+        Paths that do not start with the chroot, or any path when no
+        chroot is set, are returned unchanged.
+        """
+        chroot = self.chroot
+        if chroot and path.startswith(chroot):
+            return path[len(chroot):]
+        return path
+
+    def sync_async(self, path):
+        """Asynchronous version of :meth:`sync`.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        """
+        result = self.handler.async_result()
+        request = Sync(_prefix_root(self.chroot, path))
+        self._call(request, result)
+        return result
+
+    def sync(self, path):
+        """Flush the channel between this process and the leader.
+
+        Blocks until the server acknowledges the sync.
+
+        :param path: path of node.
+        :returns: The node path that was synced.
+        :raises:
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        .. versionadded:: 0.5
+
+        """
+        pending = self.sync_async(path)
+        return pending.get()
+
+    def create(self, path, value=b"", acl=None, ephemeral=False,
+               sequence=False, makepath=False):
+        """Create a node holding ``value``, optionally with an ACL.
+
+        ``ephemeral`` and ``sequence`` select the node type:
+
+        * An ephemeral node is removed automatically by ZooKeeper when
+          the session that created it expires. Ephemeral nodes cannot
+          have children; creating under an ephemeral parent raises
+          NoChildrenForEphemeralsError.
+        * A sequential node gets the given path plus a suffix `i`, where
+          i is the current sequential number: always 10 digits,
+          0-padded, and incremented by one after each such create.
+          Because every call yields a different actual path, sequential
+          creates never raise NodeExistsError.
+
+        Otherwise, creating a path that already exists raises
+        NodeExistsError. A missing parent raises NoNodeError unless
+        ``makepath`` is True, in which case all missing parents are
+        created first.
+
+        On success this triggers the watches left on the node by
+        :meth:`exists` and :meth:`get`, and the watches left on its
+        parent by :meth:`get_children`.
+
+        Node values are limited to 1 MB; larger values cause a
+        ZookeeperError. When ``acl`` is empty, ``default_acl`` is used.
+
+        :param path: Path of node.
+        :param value: Initial bytes value of node.
+        :param acl: :class:`~kazoo.security.ACL` list.
+        :param ephemeral: Boolean indicating whether node is ephemeral
+                          (tied to this session).
+        :param sequence: Boolean indicating whether path is suffixed
+                         with a unique index.
+        :param makepath: Whether the path should be created if it
+                         doesn't exist.
+        :returns: Real path of the new node.
+        :rtype: str
+
+        :raises:
+            :exc:`~kazoo.exceptions.NodeExistsError` if the node
+            already exists.
+
+            :exc:`~kazoo.exceptions.NoNodeError` if parent nodes are
+            missing.
+
+            :exc:`~kazoo.exceptions.NoChildrenForEphemeralsError` if
+            the parent node is an ephemeral node.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the provided
+            value is too large.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        """
+        if not acl:
+            acl = self.default_acl
+        pending = self.create_async(path, value, acl=acl,
+                                    ephemeral=ephemeral, sequence=sequence,
+                                    makepath=makepath)
+        return pending.get()
+
+    def create_async(self, path, value=b"", acl=None, ephemeral=False,
+                     sequence=False, makepath=False):
+        """Asynchronously create a ZNode. Takes the same arguments as
+        :meth:`create`.
+
+        With ``makepath``, a NoNodeError from the first attempt causes
+        the missing parent path to be ensured, and then the create is
+        retried.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError if an argument has the wrong type.
+
+        .. versionadded:: 1.1
+            The makepath option.
+
+        """
+        if acl is None and self.default_acl:
+            acl = self.default_acl
+
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if acl and (isinstance(acl, ACL) or
+                    not isinstance(acl, (tuple, list))):
+            raise TypeError("acl must be a tuple/list of ACL's")
+        if value is not None and not isinstance(value, bytes):
+            raise TypeError("value must be a byte string")
+        if not isinstance(ephemeral, bool):
+            raise TypeError("ephemeral must be a bool")
+        if not isinstance(sequence, bool):
+            raise TypeError("sequence must be a bool")
+        if not isinstance(makepath, bool):
+            raise TypeError("makepath must be a bool")
+
+        # Create-mode bit flags passed to the Create request:
+        # 1 = ephemeral, 2 = sequential.
+        flags = 0
+        if ephemeral:
+            flags |= 1
+        if sequence:
+            flags |= 2
+        if acl is None:
+            acl = OPEN_ACL_UNSAFE
+
+        async_result = self.handler.async_result()
+
+        @capture_exceptions(async_result)
+        def do_create():
+            result = self._create_async_inner(path, value, acl, flags,
+                                              trailing=sequence)
+            result.rawlink(create_completion)
+
+        @capture_exceptions(async_result)
+        def retry_completion(result):
+            # Raises if ensuring the parent failed; otherwise try again.
+            result.get()
+            do_create()
+
+        @wrap(async_result)
+        def create_completion(result):
+            try:
+                return self.unchroot(result.get())
+            except NoNodeError:
+                if not makepath:
+                    raise
+                # Missing parent: ensure it exists, then retry the create.
+                if sequence and path.endswith('/'):
+                    parent = path.rstrip('/')
+                else:
+                    parent, _ = split(path)
+                self.ensure_path_async(parent, acl).rawlink(retry_completion)
+
+        do_create()
+        return async_result
+
+    def _create_async_inner(self, path, value, acl, flags, trailing=False):
+        """Queue a single Create request and return its own result object.
+
+        :raises: the exception that ``_call`` set on the inner result when
+                 ``_call`` short-circuits (returns False). Because this
+                 inner result is not the caller's, re-raising lets
+                 ``do_create`` in :meth:`create_async` put the error on
+                 the correct result object.
+        """
+        inner_result = self.handler.async_result()
+        request = Create(_prefix_root(self.chroot, path, trailing=trailing),
+                         value, acl, flags)
+        if self._call(request, inner_result) is False:
+            raise inner_result.exception
+        return inner_result
+
+    def ensure_path(self, path, acl=None):
+        """Create ``path`` and any missing ancestors, blocking until done.
+
+        :param path: Path of node.
+        :param acl: Permissions for node.
+
+        """
+        pending = self.ensure_path_async(path, acl)
+        return pending.get()
+
+    def ensure_path_async(self, path, acl=None):
+        """Recursively create a path asynchronously if it doesn't
+        exist. Takes the same arguments as :meth:`ensure_path`.
+
+        Flow: check whether ``path`` exists. If it does not, first ensure
+        its parent (recursively), then create ``path`` itself. A
+        NodeExistsError on the final create is treated as success.
+
+        NOTE(review): ``wrap`` and ``capture_exceptions`` are defined
+        elsewhere; presumably ``wrap`` sets a non-None return value on
+        ``async_result`` and both route exceptions to it. Confirm before
+        relying on the exact resolved value.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+
+        .. versionadded:: 1.1
+
+        """
+        acl = acl or self.default_acl
+        async_result = self.handler.async_result()
+
+        # Last step: NodeExistsError means the node is there anyway.
+        @wrap(async_result)
+        def create_completion(result):
+            try:
+                return result.get()
+            except NodeExistsError:
+                return True
+
+        # Runs after the parent chain was ensured; now create next_path.
+        @capture_exceptions(async_result)
+        def prepare_completion(next_path, result):
+            result.get()
+            self.create_async(next_path, acl=acl).rawlink(create_completion)
+
+        # If the node is missing, ensure its parent first (when split()
+        # yields a non-empty last component), otherwise create it directly.
+        @wrap(async_result)
+        def exists_completion(path, result):
+            if result.get():
+                return True
+            parent, node = split(path)
+            if node:
+                self.ensure_path_async(parent, acl=acl).rawlink(
+                    partial(prepare_completion, path))
+            else:
+                self.create_async(path, acl=acl).rawlink(create_completion)
+
+        self.exists_async(path).rawlink(partial(exists_completion, path))
+
+        return async_result
+
+    def exists(self, path, watch=None):
+        """Return the stat of a node, or ``None`` if it does not exist.
+
+        When ``watch`` is given, it is left on the node at ``path``. It
+        fires on the next successful operation that creates or deletes
+        the node or sets its data.
+
+        :param path: Path of node.
+        :param watch: Optional watch callback to set for future changes
+                      to this path.
+        :returns: ZnodeStat of the node if it exists, else None.
+        :rtype: :class:`~kazoo.protocol.states.ZnodeStat` or `None`.
+
+        :raises:
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        """
+        pending = self.exists_async(path, watch)
+        return pending.get()
+
+    def exists_async(self, path, watch=None):
+        """Asynchronous version of :meth:`exists`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError if ``path`` is not a string or ``watch`` is
+                 given but not callable.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if watch and not callable(watch):
+            raise TypeError("watch must be a callable")
+
+        result = self.handler.async_result()
+        request = Exists(_prefix_root(self.chroot, path), watch)
+        self._call(request, result)
+        return result
+
+    def get(self, path, watch=None):
+        """Return the value of a node together with its stat.
+
+        When ``watch`` is given, it is left on the node at ``path``. It
+        fires on the next successful operation that sets the node's data
+        or deletes the node.
+
+        :param path: Path of node.
+        :param watch: Optional watch callback to set for future changes
+                      to this path.
+        :returns:
+            Tuple (value, :class:`~kazoo.protocol.states.ZnodeStat`) of
+            node.
+        :rtype: tuple
+
+        :raises:
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code
+
+        """
+        pending = self.get_async(path, watch)
+        return pending.get()
+
+    def get_async(self, path, watch=None):
+        """Asynchronous version of :meth:`get`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError if ``path`` is not a string or ``watch`` is
+                 given but not callable.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if watch and not callable(watch):
+            raise TypeError("watch must be a callable")
+
+        result = self.handler.async_result()
+        request = GetData(_prefix_root(self.chroot, path), watch)
+        self._call(request, result)
+        return result
+
+    def get_children(self, path, watch=None, include_data=False):
+        """List the child node names under ``path``.
+
+        When ``watch`` is given, it is left on the node at ``path``. It
+        fires on the next successful operation that deletes that node or
+        creates or deletes one of its children.
+
+        The returned list is unsorted, with no guaranteed natural or
+        lexical order.
+
+        :param path: Path of node to list.
+        :param watch: Optional watch callback to set for future changes
+                      to this path.
+        :param include_data:
+            Also return the node's
+            :class:`~kazoo.protocol.states.ZnodeStat`; the result then
+            becomes a tuple of (children, stat).
+
+        :returns: List of child node names, or tuple if `include_data`
+                  is `True`.
+        :rtype: list
+
+        :raises:
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        .. versionadded:: 0.5
+            The `include_data` option.
+
+        """
+        pending = self.get_children_async(path, watch, include_data)
+        return pending.get()
+
+    def get_children_async(self, path, watch=None, include_data=False):
+        """Asynchronous version of :meth:`get_children`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError on a non-string ``path``, a non-callable
+                 ``watch``, or a non-bool ``include_data``.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if watch and not callable(watch):
+            raise TypeError("watch must be a callable")
+        if not isinstance(include_data, bool):
+            raise TypeError("include_data must be a bool")
+
+        result = self.handler.async_result()
+        # GetChildren2 also returns the node's stat.
+        request_type = GetChildren2 if include_data else GetChildren
+        self._call(request_type(_prefix_root(self.chroot, path), watch),
+                   result)
+        return result
+
+    def get_acls(self, path):
+        """Fetch the ACL list and stat of the node at ``path``.
+
+        :param path: Path of the node.
+        :returns: The ACL array of the given node and its
+            :class:`~kazoo.protocol.states.ZnodeStat`.
+        :rtype: tuple of (:class:`~kazoo.security.ACL` list,
+                :class:`~kazoo.protocol.states.ZnodeStat`)
+        :raises:
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code
+
+        .. versionadded:: 0.5
+
+        """
+        pending = self.get_acls_async(path)
+        return pending.get()
+
+    def get_acls_async(self, path):
+        """Asynchronous version of :meth:`get_acls`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError if ``path`` is not a string.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+
+        result = self.handler.async_result()
+        request = GetACL(_prefix_root(self.chroot, path))
+        self._call(request, result)
+        return result
+
+    def set_acls(self, path, acls, version=-1):
+        """Replace the ACL of the node at ``path``.
+
+        The update is applied only if the node exists and ``version``
+        matches the node's version.
+
+        :param path: Path for the node.
+        :param acls: List of :class:`~kazoo.security.ACL` objects to
+                     set.
+        :param version: The expected node version that must match.
+        :returns: The stat of the node.
+        :raises:
+            :exc:`~kazoo.exceptions.BadVersionError` if version doesn't
+            match.
+
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist.
+
+            :exc:`~kazoo.exceptions.InvalidACLError` if the ACL is
+            invalid.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        .. versionadded:: 0.5
+
+        """
+        pending = self.set_acls_async(path, acls, version)
+        return pending.get()
+
+    def set_acls_async(self, path, acls, version=-1):
+        """Asynchronous version of :meth:`set_acls`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError on a non-string ``path``, an ``acls`` that is
+                 a bare ACL or not a tuple/list, or a non-int ``version``.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if isinstance(acls, ACL) or not isinstance(acls, (tuple, list)):
+            raise TypeError("acl must be a tuple/list of ACL's")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+
+        result = self.handler.async_result()
+        request = SetACL(_prefix_root(self.chroot, path), acls, version)
+        self._call(request, result)
+        return result
+
+    def set(self, path, value, version=-1):
+        """Replace the data stored in a node.
+
+        If the node's version is newer than ``version`` (and ``version``
+        is not -1), a BadVersionError is raised.
+
+        On success this triggers the watches left on the node by
+        :meth:`get`. Values are limited to 1 MB; larger values cause a
+        ZookeeperError.
+
+        :param path: Path of node.
+        :param value: New data value.
+        :param version: Version of node being updated, or -1.
+        :returns: Updated :class:`~kazoo.protocol.states.ZnodeStat` of
+                  the node.
+
+        :raises:
+            :exc:`~kazoo.exceptions.BadVersionError` if version doesn't
+            match.
+
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the provided
+            value is too large.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        """
+        pending = self.set_async(path, value, version)
+        return pending.get()
+
+    def set_async(self, path, value, version=-1):
+        """Asynchronous version of :meth:`set`; same arguments.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+        :raises: TypeError on a non-string ``path``, a ``value`` that is
+                 neither None nor bytes, or a non-int ``version``.
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if value is not None and not isinstance(value, bytes):
+            raise TypeError("value must be a byte string")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+
+        result = self.handler.async_result()
+        request = SetData(_prefix_root(self.chroot, path), value, version)
+        self._call(request, result)
+        return result
+
+    def transaction(self):
+        """Create and return a :class:`TransactionRequest` object
+
+        Creates a :class:`TransactionRequest` object. A Transaction can
+        consist of multiple operations which can be committed as a
+        single atomic unit. Either all of the operations will succeed
+        or none of them.
+
+        :returns: A TransactionRequest.
+        :rtype: :class:`TransactionRequest`
+
+        .. versionadded:: 0.6
+            Requires Zookeeper 3.4+
+
+        """
+        return TransactionRequest(self)
+
+    def delete(self, path, version=-1, recursive=False):
+        """Delete a node.
+
+        The call will succeed if such a node exists, and the given
+        version matches the node's version (if the given version is -1,
+        the default, it matches any node's versions).
+
+        This operation, if successful, will trigger all the watches on
+        the node of the given path left by `exists` API calls, and the
+        watches on the parent node left by `get_children` API calls.
+
+        :param path: Path of node to delete.
+        :param version: Version of node to delete, or -1 for any.
+        :param recursive: Recursively delete node and all its children,
+                          defaults to False.
+        :type recursive: bool
+
+        :raises:
+            :exc:`~kazoo.exceptions.BadVersionError` if version doesn't
+            match.
+
+            :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
+            exist.
+
+            :exc:`~kazoo.exceptions.NotEmptyError` if the node has
+            children.
+
+            :exc:`~kazoo.exceptions.ZookeeperError` if the server
+            returns a non-zero error code.
+
+        """
+        if not isinstance(recursive, bool):
+            raise TypeError("recursive must be a bool")
+
+        if recursive:
+            return self._delete_recursive(path)
+        else:
+            return self.delete_async(path, version).get()
+
+    def delete_async(self, path, version=-1):
+        """Asynchronously delete a node. Takes the same arguments as
+        :meth:`delete`, with the exception of `recursive`.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+        async_result = self.handler.async_result()
+        self._call(Delete(_prefix_root(self.chroot, path), version),
+                   async_result)
+        return async_result
+
+    def _delete_recursive(self, path):
+        try:
+            children = self.get_children(path)
+        except NoNodeError:
+            return True
+
+        if children:
+            for child in children:
+                if path == "/":
+                    child_path = path + child
+                else:
+                    child_path = path + "/" + child
+
+                self._delete_recursive(child_path)
+        try:
+            self.delete(path)
+        except NoNodeError:  # pragma: nocover
+            pass
+
+
+class TransactionRequest(object):
+    """A Zookeeper Transaction Request
+
+    A Transaction provides a builder object that can be used to
+    construct and commit an atomic set of operations. The transaction
+    must be committed before its sent.
+
+    Transactions are not thread-safe and should not be accessed from
+    multiple threads at once.
+
+    .. versionadded:: 0.6
+        Requires Zookeeper 3.4+
+
+    """
+    def __init__(self, client):
+        self.client = client
+        self.operations = []
+        self.committed = False
+
+    def create(self, path, value=b"", acl=None, ephemeral=False,
+               sequence=False):
+        """Add a create ZNode to the transaction. Takes the same
+        arguments as :meth:`KazooClient.create`, with the exception
+        of `makepath`.
+
+        :returns: None
+
+        """
+        if acl is None and self.client.default_acl:
+            acl = self.client.default_acl
+
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if acl and not isinstance(acl, (tuple, list)):
+            raise TypeError("acl must be a tuple/list of ACL's")
+        if not isinstance(value, bytes):
+            raise TypeError("value must be a byte string")
+        if not isinstance(ephemeral, bool):
+            raise TypeError("ephemeral must be a bool")
+        if not isinstance(sequence, bool):
+            raise TypeError("sequence must be a bool")
+
+        flags = 0
+        if ephemeral:
+            flags |= 1
+        if sequence:
+            flags |= 2
+        if acl is None:
+            acl = OPEN_ACL_UNSAFE
+
+        self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
+                         flags), None)
+
+    def delete(self, path, version=-1):
+        """Add a delete ZNode to the transaction. Takes the same
+        arguments as :meth:`KazooClient.delete`, with the exception of
+        `recursive`.
+
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+        self._add(Delete(_prefix_root(self.client.chroot, path), version))
+
+    def set_data(self, path, value, version=-1):
+        """Add a set ZNode value to the transaction. Takes the same
+        arguments as :meth:`KazooClient.set`.
+
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if not isinstance(value, bytes):
+            raise TypeError("value must be a byte string")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+        self._add(SetData(_prefix_root(self.client.chroot, path), value,
+                  version))
+
+    def check(self, path, version):
+        """Add a Check Version to the transaction.
+
+        This command will fail and abort a transaction if the path
+        does not match the specified version.
+
+        """
+        if not isinstance(path, basestring):
+            raise TypeError("path must be a string")
+        if not isinstance(version, int):
+            raise TypeError("version must be an int")
+        self._add(CheckVersion(_prefix_root(self.client.chroot, path),
+                  version))
+
+    def commit_async(self):
+        """Commit the transaction asynchronously.
+
+        :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+
+        """
+        self._check_tx_state()
+        self.committed = True
+        async_object = self.client.handler.async_result()
+        self.client._call(Transaction(self.operations), async_object)
+        return async_object
+
+    def commit(self):
+        """Commit the transaction.
+
+        :returns: A list of the results for each operation in the
+                  transaction.
+
+        """
+        return self.commit_async().get()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        """Commit and cleanup accumulated transaction data."""
+        if not exc_type:
+            self.commit()
+
+    def _check_tx_state(self):
+        if self.committed:
+            raise ValueError('Transaction already committed')
+
+    def _add(self, request, post_processor=None):
+        self._check_tx_state()
+        self.client.logger.log(BLATHER, 'Added %r to %r', request, self)
+        self.operations.append(request)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/exceptions.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/exceptions.py b/slider-agent/src/main/python/kazoo/exceptions.py
new file mode 100644
index 0000000..8d4f0f3
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/exceptions.py
@@ -0,0 +1,199 @@
+"""Kazoo Exceptions"""
+from collections import defaultdict
+
+
+class KazooException(Exception):
+    """Base Kazoo exception that all other kazoo library exceptions
+    inherit from"""
+
+
+class ZookeeperError(KazooException):
+    """Base Zookeeper exception for errors originating from the
+    Zookeeper server"""
+
+
+class CancelledError(KazooException):
+    """Raised when a process is cancelled by another thread"""
+
+
+class ConfigurationError(KazooException):
+    """Raised if the configuration arguments to an object are
+    invalid"""
+
+
+class ZookeeperStoppedError(KazooException):
+    """Raised when the kazoo client stopped (and thus not connected)"""
+
+
+class ConnectionDropped(KazooException):
+    """Internal error for jumping out of loops"""
+
+
+class LockTimeout(KazooException):
+    """Raised if failed to acquire a lock.
+
+    .. versionadded:: 1.1
+    """
+
+
+class WriterNotClosedException(KazooException):
+    """Raised if the writer is unable to stop closing when requested.
+
+    .. versionadded:: 1.2
+    """
+
+
+def _invalid_error_code():
+    raise RuntimeError('Invalid error code')
+
+
+EXCEPTIONS = defaultdict(_invalid_error_code)
+
+
+def _zookeeper_exception(code):
+    def decorator(klass):
+        def create(*args, **kwargs):
+            return klass(args, kwargs)
+
+        EXCEPTIONS[code] = create
+        klass.code = code
+        return klass
+
+    return decorator
+
+
+@_zookeeper_exception(0)
+class RolledBackError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-1)
+class SystemZookeeperError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-2)
+class RuntimeInconsistency(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-3)
+class DataInconsistency(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-4)
+class ConnectionLoss(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-5)
+class MarshallingError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-6)
+class UnimplementedError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-7)
+class OperationTimeoutError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-8)
+class BadArgumentsError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-100)
+class APIError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-101)
+class NoNodeError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-102)
+class NoAuthError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-103)
+class BadVersionError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-108)
+class NoChildrenForEphemeralsError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-110)
+class NodeExistsError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-111)
+class NotEmptyError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-112)
+class SessionExpiredError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-113)
+class InvalidCallbackError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-114)
+class InvalidACLError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-115)
+class AuthFailedError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-118)
+class SessionMovedError(ZookeeperError):
+    pass
+
+
+@_zookeeper_exception(-119)
+class NotReadOnlyCallError(ZookeeperError):
+    """An API call that is not read-only was used while connected to
+    a read-only server"""
+
+
+class ConnectionClosedError(SessionExpiredError):
+    """Connection is closed"""
+
+
+# BW Compat aliases for C lib style exceptions
+ConnectionLossException = ConnectionLoss
+MarshallingErrorException = MarshallingError
+SystemErrorException = SystemZookeeperError
+RuntimeInconsistencyException = RuntimeInconsistency
+DataInconsistencyException = DataInconsistency
+UnimplementedException = UnimplementedError
+OperationTimeoutException = OperationTimeoutError
+BadArgumentsException = BadArgumentsError
+ApiErrorException = APIError
+NoNodeException = NoNodeError
+NoAuthException = NoAuthError
+BadVersionException = BadVersionError
+NoChildrenForEphemeralsException = NoChildrenForEphemeralsError
+NodeExistsException = NodeExistsError
+InvalidACLException = InvalidACLError
+AuthFailedException = AuthFailedError
+NotEmptyException = NotEmptyError
+SessionExpiredException = SessionExpiredError
+InvalidCallbackException = InvalidCallbackError

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/__init__.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/__init__.py b/slider-agent/src/main/python/kazoo/handlers/__init__.py
new file mode 100644
index 0000000..792d600
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/handlers/__init__.py
@@ -0,0 +1 @@
+#

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/gevent.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/gevent.py b/slider-agent/src/main/python/kazoo/handlers/gevent.py
new file mode 100644
index 0000000..6e40cae
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/handlers/gevent.py
@@ -0,0 +1,161 @@
+"""A gevent based handler."""
+from __future__ import absolute_import
+
+import atexit
+import logging
+
+import gevent
+import gevent.event
+import gevent.queue
+import gevent.select
+import gevent.thread
+
+from gevent.queue import Empty
+from gevent.queue import Queue
+from gevent import socket
+try:
+    from gevent.lock import Semaphore, RLock
+except ImportError:
+    from gevent.coros import Semaphore, RLock
+
+from kazoo.handlers.utils import create_tcp_socket, create_tcp_connection
+
+_using_libevent = gevent.__version__.startswith('0.')
+
+log = logging.getLogger(__name__)
+
+_STOP = object()
+
+AsyncResult = gevent.event.AsyncResult
+
+
+class SequentialGeventHandler(object):
+    """Gevent handler for sequentially executing callbacks.
+
+    This handler executes callbacks in a sequential manner. A queue is
+    created for each of the callback events, so that each type of event
+    has its callback type run sequentially.
+
+    Each queue type has a greenlet worker that pulls the callback event
+    off the queue and runs it in the order the client sees it.
+
+    This split helps ensure that watch callbacks won't block session
+    re-establishment should the connection be lost during a Zookeeper
+    client call.
+
+    Watch callbacks should avoid blocking behavior as the next callback
+    of that type won't be run until it completes. If you need to block,
+    spawn a new greenlet and return immediately so callbacks can
+    proceed.
+
+    """
+    name = "sequential_gevent_handler"
+    sleep_func = staticmethod(gevent.sleep)
+
+    def __init__(self):
+        """Create a :class:`SequentialGeventHandler` instance"""
+        self.callback_queue = Queue()
+        self._running = False
+        self._async = None
+        self._state_change = Semaphore()
+        self._workers = []
+
+    class timeout_exception(gevent.event.Timeout):
+        def __init__(self, msg):
+            gevent.event.Timeout.__init__(self, exception=msg)
+
+    def _create_greenlet_worker(self, queue):
+        def greenlet_worker():
+            while True:
+                try:
+                    func = queue.get()
+                    if func is _STOP:
+                        break
+                    func()
+                except Empty:
+                    continue
+                except Exception as exc:
+                    log.warning("Exception in worker greenlet")
+                    log.exception(exc)
+        return gevent.spawn(greenlet_worker)
+
+    def start(self):
+        """Start the greenlet workers."""
+        with self._state_change:
+            if self._running:
+                return
+
+            self._running = True
+
+            # Spawn our worker greenlets, we have
+            # - A callback worker for watch events to be called
+            for queue in (self.callback_queue,):
+                w = self._create_greenlet_worker(queue)
+                self._workers.append(w)
+            atexit.register(self.stop)
+
+    def stop(self):
+        """Stop the greenlet workers and empty all queues."""
+        with self._state_change:
+            if not self._running:
+                return
+
+            self._running = False
+
+            for queue in (self.callback_queue,):
+                queue.put(_STOP)
+
+            while self._workers:
+                worker = self._workers.pop()
+                worker.join()
+
+            # Clear the queues
+            self.callback_queue = Queue()  # pragma: nocover
+
+            if hasattr(atexit, "unregister"):
+                atexit.unregister(self.stop)
+
+    def select(self, *args, **kwargs):
+        return gevent.select.select(*args, **kwargs)
+
+    def socket(self, *args, **kwargs):
+        return create_tcp_socket(socket)
+
+    def create_connection(self, *args, **kwargs):
+        return create_tcp_connection(socket, *args, **kwargs)
+
+    def event_object(self):
+        """Create an appropriate Event object"""
+        return gevent.event.Event()
+
+    def lock_object(self):
+        """Create an appropriate Lock object"""
+        return gevent.thread.allocate_lock()
+
+    def rlock_object(self):
+        """Create an appropriate RLock object"""
+        return RLock()
+
+    def async_result(self):
+        """Create a :class:`AsyncResult` instance
+
+        The :class:`AsyncResult` instance will have its completion
+        callbacks executed in the thread the
+        :class:`SequentialGeventHandler` is created in (which should be
+        the gevent/main thread).
+
+        """
+        return AsyncResult()
+
+    def spawn(self, func, *args, **kwargs):
+        """Spawn a function to run asynchronously"""
+        return gevent.spawn(func, *args, **kwargs)
+
+    def dispatch_callback(self, callback):
+        """Dispatch to the callback object
+
+        The callback is put on separate queues to run depending on the
+        type as documented for the :class:`SequentialGeventHandler`.
+
+        """
+        self.callback_queue.put(lambda: callback.func(*callback.args))

Reply via email to