http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/threading.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/threading.py b/slider-agent/src/main/python/kazoo/handlers/threading.py
new file mode 100644
index 0000000..8e6648d
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/handlers/threading.py
@@ -0,0 +1,287 @@
+"""A threading based handler.
+
+The :class:`SequentialThreadingHandler` is intended for regular Python
+environments that use threads.
+
+.. warning::
+
+    Do not use :class:`SequentialThreadingHandler` with applications
+    using asynchronous event loops (like gevent). Use the
+    :class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.
+
+"""
+from __future__ import absolute_import
+
+import atexit
+import logging
+import select
+import socket
+import threading
+import time
+
+try:
+    import Queue
+except ImportError:  # pragma: nocover
+    import queue as Queue
+
+from kazoo.handlers.utils import create_tcp_socket, create_tcp_connection
+
+# sentinel objects
+_NONE = object()
+_STOP = object()
+
+log = logging.getLogger(__name__)
+
+
+class TimeoutError(Exception):
+    pass
+
+
+class AsyncResult(object):
+    """A one-time event that stores a value or an exception"""
+    def __init__(self, handler):
+        self._handler = handler
+        self.value = None
+        self._exception = _NONE
+        self._condition = threading.Condition()
+        self._callbacks = []
+
+    def ready(self):
+        """Return true if and only if it holds a value or an
+        exception"""
+        return self._exception is not _NONE
+
+    def successful(self):
+        """Return true if and only if it is ready and holds a value"""
+        return self._exception is None
+
+    @property
+    def exception(self):
+        if self._exception is not _NONE:
+            return self._exception
+
+    def set(self, value=None):
+        """Store the value. Wake up the waiters."""
+        with self._condition:
+            self.value = value
+            self._exception = None
+
+            for callback in self._callbacks:
+                self._handler.completion_queue.put(
+                    lambda: callback(self)
+                )
+            self._condition.notify_all()
+
+    def set_exception(self, exception):
+        """Store the exception. Wake up the waiters."""
+        with self._condition:
+            self._exception = exception
+
+            for callback in self._callbacks:
+                self._handler.completion_queue.put(
+                    lambda: callback(self)
+                )
+            self._condition.notify_all()
+
+    def get(self, block=True, timeout=None):
+        """Return the stored value or raise the exception.
+
+        If there is no value raises TimeoutError.
+
+        """
+        with self._condition:
+            if self._exception is not _NONE:
+                if self._exception is None:
+                    return self.value
+                raise self._exception
+            elif block:
+                self._condition.wait(timeout)
+                if self._exception is not _NONE:
+                    if self._exception is None:
+                        return self.value
+                    raise self._exception
+
+            # if we get to this point we timeout
+            raise TimeoutError()
+
+    def get_nowait(self):
+        """Return the value or raise the exception without blocking.
+
+        If nothing is available, raises TimeoutError
+
+        """
+        return self.get(block=False)
+
+    def wait(self, timeout=None):
+        """Block until the instance is ready."""
+        with self._condition:
+            self._condition.wait(timeout)
+        return self._exception is not _NONE
+
+    def rawlink(self, callback):
+        """Register a callback to call when a value or an exception is
+        set"""
+        with self._condition:
+            # Are we already set? Dispatch it now
+            if self.ready():
+                self._handler.completion_queue.put(
+                    lambda: callback(self)
+                )
+                return
+
+            if callback not in self._callbacks:
+                self._callbacks.append(callback)
+
+    def unlink(self, callback):
+        """Remove the callback set by :meth:`rawlink`"""
+        with self._condition:
+            if self.ready():
+                # Already triggered, ignore
+                return
+
+            if callback in self._callbacks:
+                self._callbacks.remove(callback)
+
+
+class SequentialThreadingHandler(object):
+    """Threading handler for sequentially executing callbacks.
+
+    This handler executes callbacks in a sequential manner. A queue is
+    created for each of the callback events, so that each type of event
+    has its callback type run sequentially. These are split into two
+    queues, one for watch events and one for async result completion
+    callbacks.
+
+    Each queue type has a thread worker that pulls the callback event
+    off the queue and runs it in the order the client sees it.
+
+    This split helps ensure that watch callbacks won't block session
+    re-establishment should the connection be lost during a Zookeeper
+    client call.
+
+    Watch and completion callbacks should avoid blocking behavior as
+    the next callback of that type won't be run until it completes. If
+    you need to block, spawn a new thread and return immediately so
+    callbacks can proceed.
+
+    .. note::
+
+        Completion callbacks can block to wait on Zookeeper calls, but
+        no other completion callbacks will execute until the callback
+        returns.
+
+    """
+    name = "sequential_threading_handler"
+    timeout_exception = TimeoutError
+    sleep_func = staticmethod(time.sleep)
+    queue_impl = Queue.Queue
+    queue_empty = Queue.Empty
+
+    def __init__(self):
+        """Create a :class:`SequentialThreadingHandler` instance"""
+        self.callback_queue = self.queue_impl()
+        self.completion_queue = self.queue_impl()
+        self._running = False
+        self._state_change = threading.Lock()
+        self._workers = []
+
+    def _create_thread_worker(self, queue):
+        def thread_worker():  # pragma: nocover
+            while True:
+                try:
+                    func = queue.get()
+                    try:
+                        if func is _STOP:
+                            break
+                        func()
+                    except Exception:
+                        log.exception("Exception in worker queue thread")
+                    finally:
+                        queue.task_done()
+                except self.queue_empty:
+                    continue
+        t = threading.Thread(target=thread_worker)
+
+        # Even though these should be joined, it's possible stop might
+        # not issue in time so we set them to daemon to let the program
+        # exit anyways
+        t.daemon = True
+        t.start()
+        return t
+
+    def start(self):
+        """Start the worker threads."""
+        with self._state_change:
+            if self._running:
+                return
+
+            # Spawn our worker threads, we have
+            # - A callback worker for watch events to be called
+            # - A completion worker for completion events to be called
+            for queue in (self.completion_queue, self.callback_queue):
+                w = self._create_thread_worker(queue)
+                self._workers.append(w)
+            self._running = True
+            atexit.register(self.stop)
+
+    def stop(self):
+        """Stop the worker threads and empty all queues."""
+        with self._state_change:
+            if not self._running:
+                return
+
+            self._running = False
+
+            for queue in (self.completion_queue, self.callback_queue):
+                queue.put(_STOP)
+
+            self._workers.reverse()
+            while self._workers:
+                worker = self._workers.pop()
+                worker.join()
+
+            # Clear the queues
+            self.callback_queue = self.queue_impl()
+            self.completion_queue = self.queue_impl()
+            if hasattr(atexit, "unregister"):
+                atexit.unregister(self.stop)
+
+    def select(self, *args, **kwargs):
+        return select.select(*args, **kwargs)
+
+    def socket(self):
+        return create_tcp_socket(socket)
+
+    def create_connection(self, *args, **kwargs):
+        return create_tcp_connection(socket, *args, **kwargs)
+
+    def event_object(self):
+        """Create an appropriate Event object"""
+        return threading.Event()
+
+    def lock_object(self):
+        """Create a lock object"""
+        return threading.Lock()
+
+    def rlock_object(self):
+        """Create an appropriate RLock object"""
+        return threading.RLock()
+
+    def async_result(self):
+        """Create a :class:`AsyncResult` instance"""
+        return AsyncResult(self)
+
+    def spawn(self, func, *args, **kwargs):
+        t = threading.Thread(target=func, args=args, kwargs=kwargs)
+        t.daemon = True
+        t.start()
+        return t
+
+    def dispatch_callback(self, callback):
+        """Dispatch to the callback object
+
+        The callback is put on separate queues to run depending on the
+        type as documented for the :class:`SequentialThreadingHandler`.
+
+        """
+        self.callback_queue.put(lambda: callback.func(*callback.args))

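A minimal usage sketch (not part of this patch) for the handler above, assuming the vendored package is importable as kazoo.handlers.threading:

    # Illustrative only: exercise an AsyncResult from a spawned thread.
    from kazoo.handlers.threading import SequentialThreadingHandler

    handler = SequentialThreadingHandler()
    handler.start()

    result = handler.async_result()

    def produce():
        # set() stores the value and wakes any waiters blocked in get()/wait()
        result.set("value from worker")

    handler.spawn(produce)
    print(result.get(timeout=5))   # prints: value from worker

    handler.stop()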
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/utils.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py b/slider-agent/src/main/python/kazoo/handlers/utils.py
new file mode 100644
index 0000000..ad204b5
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/handlers/utils.py
@@ -0,0 +1,93 @@
+"""Kazoo handler helpers"""
+
+HAS_FNCTL = True
+try:
+    import fcntl
+except ImportError:  # pragma: nocover
+    HAS_FNCTL = False
+import functools
+import os
+
+
+def _set_fd_cloexec(fd):
+    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+
+def _set_default_tcpsock_options(module, sock):
+    sock.setsockopt(module.IPPROTO_TCP, module.TCP_NODELAY, 1)
+    if HAS_FNCTL:
+        _set_fd_cloexec(sock)
+    return sock
+
+
+def create_pipe():
+    """Create a non-blocking read/write pipe.
+    """
+    r, w = os.pipe()
+    if HAS_FNCTL:
+        fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
+        fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
+        _set_fd_cloexec(r)
+        _set_fd_cloexec(w)
+    return r, w
+
+
+def create_tcp_socket(module):
+    """Create a TCP socket with the CLOEXEC flag set.
+    """
+    type_ = module.SOCK_STREAM
+    if hasattr(module, 'SOCK_CLOEXEC'):  # pragma: nocover
+        # if available, set cloexec flag during socket creation
+        type_ |= module.SOCK_CLOEXEC
+    sock = module.socket(module.AF_INET, type_)
+    _set_default_tcpsock_options(module, sock)
+    return sock
+
+
+def create_tcp_connection(module, address, timeout=None):
+    if timeout is None:
+        # thanks to create_connection() developers for
+        # this ugliness...
+        timeout = module._GLOBAL_DEFAULT_TIMEOUT
+
+    sock = module.create_connection(address, timeout)
+    _set_default_tcpsock_options(module, sock)
+    return sock
+
+
+def capture_exceptions(async_result):
+    """Return a new decorated function that propagates the exceptions of the
+    wrapped function to an async_result.
+
+    :param async_result: An async result implementing :class:`IAsyncResult`
+
+    """
+    def capture(function):
+        @functools.wraps(function)
+        def captured_function(*args, **kwargs):
+            try:
+                return function(*args, **kwargs)
+            except Exception as exc:
+                async_result.set_exception(exc)
+        return captured_function
+    return capture
+
+
+def wrap(async_result):
+    """Return a new decorated function that propagates the return value or
+    exception of wrapped function to an async_result.  NOTE: Only propagates a
+    non-None return value.
+
+    :param async_result: An async result implementing :class:`IAsyncResult`
+
+    """
+    def capture(function):
+        @capture_exceptions(async_result)
+        def captured_function(*args, **kwargs):
+            value = function(*args, **kwargs)
+            if value is not None:
+                async_result.set(value)
+            return value
+        return captured_function
+    return capture

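A hedged sketch (not part of this patch) showing how wrap() and capture_exceptions() route a function's outcome into an async result created by the threading handler added above:

    # Illustrative only: a return value is stored, a raised exception is captured.
    from kazoo.handlers.threading import SequentialThreadingHandler
    from kazoo.handlers.utils import wrap

    handler = SequentialThreadingHandler()

    ok = handler.async_result()

    @wrap(ok)
    def divide(a, b):
        return a / b

    divide(10, 2)
    print(ok.get_nowait())           # prints: 5.0

    failed = handler.async_result()

    @wrap(failed)
    def boom():
        raise ValueError("bad input")

    boom()
    print(repr(failed.exception))    # prints: ValueError('bad input')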
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/hosts.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/hosts.py b/slider-agent/src/main/python/kazoo/hosts.py
new file mode 100644
index 0000000..15ce447
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/hosts.py
@@ -0,0 +1,26 @@
+import random
+
+try:
+    from urlparse import urlsplit
+except ImportError:
+    # try python3 then
+    from urllib.parse import urlsplit
+
+def collect_hosts(hosts, randomize=True):
+    """Collect a set of hosts and an optional chroot from a string."""
+    host_ports, chroot = hosts.partition("/")[::2]
+    chroot = "/" + chroot if chroot else None
+
+    result = []
+    for host_port in host_ports.split(","):
+        # put all complexity of dealing with
+        # IPv4 & IPv6 address:port on the urlsplit
+        res = urlsplit("xxx://" + host_port)
+        host = res.hostname
+        port = int(res.port) if res.port else 2181
+        result.append((host.strip(), port))
+
+    if randomize:
+        random.shuffle(result)
+
+    return result, chroot

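A usage sketch (not part of this patch) for collect_hosts() with a typical ZooKeeper connection string:

    # Illustrative only: hosts default to port 2181, the chroot is split off.
    from kazoo.hosts import collect_hosts

    hosts, chroot = collect_hosts("zk1:2181,zk2:2181,zk3/app/locks",
                                  randomize=False)
    print(hosts)    # prints: [('zk1', 2181), ('zk2', 2181), ('zk3', 2181)]
    print(chroot)   # prints: /app/locks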
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/interfaces.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/interfaces.py b/slider-agent/src/main/python/kazoo/interfaces.py
new file mode 100644
index 0000000..351f1fd
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/interfaces.py
@@ -0,0 +1,203 @@
+"""Kazoo Interfaces
+
+.. versionchanged:: 1.4
+
+    The classes in this module used to be interface declarations based on
+    `zope.interface.Interface`. They were converted to normal classes and
+    now serve as documentation only.
+
+"""
+
+# public API
+
+
+class IHandler(object):
+    """A Callback Handler for Zookeeper completion and watch callbacks.
+
+    This object must implement several methods responsible for
+    determining how completion / watch callbacks are handled as well as
+    the method for calling :class:`IAsyncResult` callback functions.
+
+    These functions are used to abstract differences between a Python
+    threading environment and asynchronous single-threaded environments
+    like gevent. The minimum functionality needed for Kazoo to handle
+    these differences is encompassed in this interface.
+
+    The Handler should document how callbacks are called for:
+
+    * Zookeeper completion events
+    * Zookeeper watch events
+
+    .. attribute:: name
+
+        Human readable name of the Handler interface.
+
+    .. attribute:: timeout_exception
+
+        Exception class that should be thrown and captured if a
+        result is not available within the given time.
+
+    .. attribute:: sleep_func
+
+        Appropriate sleep function that can be called with a single
+        argument and sleep.
+
+    """
+
+    def start(self):
+        """Start the handler, used for setting up the handler."""
+
+    def stop(self):
+        """Stop the handler. Should block until the handler is safely
+        stopped."""
+
+    def select(self):
+        """A select method that implements Python's select.select
+        API"""
+
+    def socket(self):
+        """A socket method that implements Python's socket.socket
+        API"""
+
+    def create_connection(self):
+        """A socket method that implements Python's
+        socket.create_connection API"""
+
+    def event_object(self):
+        """Return an appropriate object that implements Python's
+        threading.Event API"""
+
+    def lock_object(self):
+        """Return an appropriate object that implements Python's
+        threading.Lock API"""
+
+    def rlock_object(self):
+        """Return an appropriate object that implements Python's
+        threading.RLock API"""
+
+    def async_result(self):
+        """Return an instance that conforms to the
+        :class:`~IAsyncResult` interface appropriate for this
+        handler"""
+
+    def spawn(self, func, *args, **kwargs):
+        """Spawn a function to run asynchronously
+
+        :param args: args to call the function with.
+        :param kwargs: keyword args to call the function with.
+
+        This method should return immediately and execute the function
+        with the provided args and kwargs in an asynchronous manner.
+
+        """
+
+    def dispatch_callback(self, callback):
+        """Dispatch to the callback object
+
+        :param callback: A :class:`~kazoo.protocol.states.Callback`
+                         object to be called.
+
+        """
+
+
+class IAsyncResult(object):
+    """An Async Result object that can be queried for a value that has
+    been set asynchronously.
+
+    This object is modeled on the ``gevent`` AsyncResult object.
+
+    The implementation must account for the fact that the :meth:`set`
+    and :meth:`set_exception` methods will be called from within the
+    Zookeeper thread which may require extra care under asynchronous
+    environments.
+
+    .. attribute:: value
+
+        Holds the value passed to :meth:`set` if :meth:`set` was
+        called. Otherwise `None`.
+
+    .. attribute:: exception
+
+        Holds the exception instance passed to :meth:`set_exception`
+        if :meth:`set_exception` was called. Otherwise `None`.
+
+    """
+
+    def ready(self):
+        """Return `True` if and only if it holds a value or an
+        exception"""
+
+    def successful(self):
+        """Return `True` if and only if it is ready and holds a
+        value"""
+
+    def set(self, value=None):
+        """Store the value. Wake up the waiters.
+
+        :param value: Value to store as the result.
+
+        Any waiters blocking on :meth:`get` or :meth:`wait` are woken
+        up. Sequential calls to :meth:`wait` and :meth:`get` will not
+        block at all."""
+
+    def set_exception(self, exception):
+        """Store the exception. Wake up the waiters.
+
+        :param exception: Exception to raise when fetching the value.
+
+        Any waiters blocking on :meth:`get` or :meth:`wait` are woken
+        up. Sequential calls to :meth:`wait` and :meth:`get` will not
+        block at all."""
+
+    def get(self, block=True, timeout=None):
+        """Return the stored value or raise the exception
+
+        :param block: Whether this method should block or return
+                      immediately.
+        :type block: bool
+        :param timeout: How long to wait for a value when `block` is
+                        `True`.
+        :type timeout: float
+
+        If this instance already holds a value / an exception, return /
+        raise it immediately. Otherwise, block until :meth:`set` or
+        :meth:`set_exception` has been called or until the optional
+        timeout occurs."""
+
+    def get_nowait(self):
+        """Return the value or raise the exception without blocking.
+
+        If nothing is available, raise the Timeout exception class on
+        the associated :class:`IHandler` interface."""
+
+    def wait(self, timeout=None):
+        """Block until the instance is ready.
+
+        :param timeout: How long to wait for a value when `block` is
+                        `True`.
+        :type timeout: float
+
+        If this instance already holds a value / an exception, return /
+        raise it immediately. Otherwise, block until :meth:`set` or
+        :meth:`set_exception` has been called or until the optional
+        timeout occurs."""
+
+    def rawlink(self, callback):
+        """Register a callback to call when a value or an exception is
+        set
+
+        :param callback:
+            A callback function to call after :meth:`set` or
+            :meth:`set_exception` has been called. This function will
+            be passed a single argument, this instance.
+        :type callback: func
+
+        """
+
+    def unlink(self, callback):
+        """Remove the callback set by :meth:`rawlink`
+
+        :param callback: A callback function to remove.
+        :type callback: func
+
+        """

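These classes document a contract rather than provide behavior. A skeletal sketch (not part of this patch; method bodies are placeholders, and the select/socket helpers are omitted) of what a custom handler covering the IHandler surface would need to supply:

    # Illustrative only: the attributes and methods kazoo expects from a handler.
    import threading
    import time

    class MyHandler(object):
        name = "my_handler"
        timeout_exception = TimeoutError       # Python 3 built-in, raised on timeouts
        sleep_func = staticmethod(time.sleep)  # blocking sleep suited to this handler

        def start(self):
            """Set up any worker resources."""

        def stop(self):
            """Tear down workers; block until safely stopped."""

        def event_object(self):
            return threading.Event()

        def lock_object(self):
            return threading.Lock()

        def rlock_object(self):
            return threading.RLock()

        def async_result(self):
            raise NotImplementedError("return an IAsyncResult-conforming object")

        def spawn(self, func, *args, **kwargs):
            t = threading.Thread(target=func, args=args, kwargs=kwargs)
            t.daemon = True
            t.start()
            return t

        def dispatch_callback(self, callback):
            callback.func(*callback.args)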
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/loggingsupport.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/loggingsupport.py b/slider-agent/src/main/python/kazoo/loggingsupport.py
new file mode 100644
index 0000000..5ed2f8f
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/loggingsupport.py
@@ -0,0 +1,2 @@
+BLATHER = 5 # log level for low-level debugging
+

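BLATHER sits below logging.DEBUG (10). A short sketch (not part of this patch) of wiring the level into the standard logging module so it shows up in formatted output:

    # Illustrative only: give level 5 a readable name and enable it globally.
    import logging
    from kazoo.loggingsupport import BLATHER

    logging.addLevelName(BLATHER, "BLATHER")
    logging.basicConfig(level=BLATHER,
                        format="%(levelname)s %(name)s: %(message)s")

    logging.getLogger("kazoo.demo").log(BLATHER, "low-level protocol detail")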
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/__init__.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/__init__.py b/slider-agent/src/main/python/kazoo/protocol/__init__.py
new file mode 100644
index 0000000..792d600
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/protocol/__init__.py
@@ -0,0 +1 @@
+#

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py b/slider-agent/src/main/python/kazoo/protocol/connection.py
new file mode 100644
index 0000000..e6ad63d
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/protocol/connection.py
@@ -0,0 +1,623 @@
+"""Zookeeper Protocol Connection Handler"""
+import logging
+import os
+import random
+import select
+import socket
+import sys
+import time
+from binascii import hexlify
+from contextlib import contextmanager
+
+from kazoo.exceptions import (
+    AuthFailedError,
+    ConnectionDropped,
+    EXCEPTIONS,
+    SessionExpiredError,
+    NoNodeError
+)
+from kazoo.handlers.utils import create_pipe
+from kazoo.loggingsupport import BLATHER
+from kazoo.protocol.serialization import (
+    Auth,
+    Close,
+    Connect,
+    Exists,
+    GetChildren,
+    Ping,
+    PingInstance,
+    ReplyHeader,
+    Transaction,
+    Watch,
+    int_struct
+)
+from kazoo.protocol.states import (
+    Callback,
+    KeeperState,
+    WatchedEvent,
+    EVENT_TYPE_MAP,
+)
+from kazoo.retry import (
+    ForceRetryError,
+    RetryFailedError
+)
+
+log = logging.getLogger(__name__)
+
+
+# Special testing hook objects used to force a session expired error as
+# if it came from the server
+_SESSION_EXPIRED = object()
+_CONNECTION_DROP = object()
+
+STOP_CONNECTING = object()
+
+CREATED_EVENT = 1
+DELETED_EVENT = 2
+CHANGED_EVENT = 3
+CHILD_EVENT = 4
+
+WATCH_XID = -1
+PING_XID = -2
+AUTH_XID = -4
+
+CLOSE_RESPONSE = Close.type
+
+if sys.version_info > (3, ):  # pragma: nocover
+    def buffer(obj, offset=0):
+        return memoryview(obj)[offset:]
+
+    advance_iterator = next
+else:  # pragma: nocover
+    def advance_iterator(it):
+        return it.next()
+
+
+class RWPinger(object):
+    """A Read/Write Server Pinger Iterable
+
+    This object is initialized with the hosts iterator object and the
+    socket creation function. Anytime `next` is called on its iterator
+    it yields either False, or a host, port tuple if it found a r/w
+    capable Zookeeper node.
+
+    After the first run-through of hosts, an exponential back-off delay
+    is added before the next run. This delay is tracked internally and
+    the iterator will yield False if called too soon.
+
+    """
+    def __init__(self, hosts, connection_func, socket_handling):
+        self.hosts = hosts
+        self.connection = connection_func
+        self.last_attempt = None
+        self.socket_handling = socket_handling
+
+    def __iter__(self):
+        if not self.last_attempt:
+            self.last_attempt = time.time()
+        delay = 0.5
+        while True:
+            yield self._next_server(delay)
+
+    def _next_server(self, delay):
+        jitter = random.randint(0, 100) / 100.0
+        while time.time() < self.last_attempt + delay + jitter:
+            # Skip rw ping checks if it's too soon
+            return False
+        for host, port in self.hosts:
+            log.debug("Pinging server for r/w: %s:%s", host, port)
+            self.last_attempt = time.time()
+            try:
+                with self.socket_handling():
+                    sock = self.connection((host, port))
+                    sock.sendall(b"isro")
+                    result = sock.recv(8192)
+                    sock.close()
+                    if result == b'rw':
+                        return (host, port)
+                    else:
+                        return False
+            except ConnectionDropped:
+                return False
+
+            # Add some jitter between host pings
+            while time.time() < self.last_attempt + jitter:
+                return False
+        delay *= 2
+
+
+class RWServerAvailable(Exception):
+    """Thrown if a RW Server becomes available"""
+
+
+class ConnectionHandler(object):
+    """Zookeeper connection handler"""
+    def __init__(self, client, retry_sleeper, logger=None):
+        self.client = client
+        self.handler = client.handler
+        self.retry_sleeper = retry_sleeper
+        self.logger = logger or log
+
+        # Our event objects
+        self.connection_closed = client.handler.event_object()
+        self.connection_closed.set()
+        self.connection_stopped = client.handler.event_object()
+        self.connection_stopped.set()
+        self.ping_outstanding = client.handler.event_object()
+
+        self._read_pipe = None
+        self._write_pipe = None
+
+        self._socket = None
+        self._xid = None
+        self._rw_server = None
+        self._ro_mode = False
+
+        self._connection_routine = None
+
+    # This is instance specific to avoid odd thread bug issues in Python
+    # during shutdown global cleanup
+    @contextmanager
+    def _socket_error_handling(self):
+        try:
+            yield
+        except (socket.error, select.error) as e:
+            err = getattr(e, 'strerror', e)
+            raise ConnectionDropped("socket connection error: %s" % (err,))
+
+    def start(self):
+        """Start the connection up"""
+        if self.connection_closed.is_set():
+            self._read_pipe, self._write_pipe = create_pipe()
+            self.connection_closed.clear()
+        if self._connection_routine:
+            raise Exception("Unable to start, connection routine already "
+                            "active.")
+        self._connection_routine = self.handler.spawn(self.zk_loop)
+
+    def stop(self, timeout=None):
+        """Ensure the writer has stopped, wait to see if it does."""
+        self.connection_stopped.wait(timeout)
+        if self._connection_routine:
+            self._connection_routine.join()
+            self._connection_routine = None
+        return self.connection_stopped.is_set()
+
+    def close(self):
+        """Release resources held by the connection
+
+        The connection can be restarted afterwards.
+        """
+        if not self.connection_stopped.is_set():
+            raise Exception("Cannot close connection until it is stopped")
+        self.connection_closed.set()
+        wp, rp = self._write_pipe, self._read_pipe
+        self._write_pipe = self._read_pipe = None
+        if wp is not None:
+            os.close(wp)
+        if rp is not None:
+            os.close(rp)
+
+    def _server_pinger(self):
+        """Returns a server pinger iterable, that will ping the next
+        server in the list, and apply a back-off between attempts."""
+        return RWPinger(self.client.hosts, self.handler.create_connection,
+                        self._socket_error_handling)
+
+    def _read_header(self, timeout):
+        b = self._read(4, timeout)
+        length = int_struct.unpack(b)[0]
+        b = self._read(length, timeout)
+        header, offset = ReplyHeader.deserialize(b, 0)
+        return header, b, offset
+
+    def _read(self, length, timeout):
+        msgparts = []
+        remaining = length
+        with self._socket_error_handling():
+            while remaining > 0:
+                s = self.handler.select([self._socket], [], [], timeout)[0]
+                if not s:  # pragma: nocover
+                    # If the read list is empty, we got a timeout. We don't
+                    # have to check wlist and xlist as we don't set any
+                    raise self.handler.timeout_exception("socket time-out")
+
+                chunk = self._socket.recv(remaining)
+                if chunk == b'':
+                    raise ConnectionDropped('socket connection broken')
+                msgparts.append(chunk)
+                remaining -= len(chunk)
+            return b"".join(msgparts)
+
+    def _invoke(self, timeout, request, xid=None):
+        """A special writer used during connection establishment
+        only"""
+        self._submit(request, timeout, xid)
+        zxid = None
+        if xid:
+            header, buffer, offset = self._read_header(timeout)
+            if header.xid != xid:
+                raise RuntimeError('xids do not match, expected %r received %r',
+                                   xid, header.xid)
+            if header.zxid > 0:
+                zxid = header.zxid
+            if header.err:
+                callback_exception = EXCEPTIONS[header.err]()
+                self.logger.debug(
+                    'Received error(xid=%s) %r', xid, callback_exception)
+                raise callback_exception
+            return zxid
+
+        msg = self._read(4, timeout)
+        length = int_struct.unpack(msg)[0]
+        msg = self._read(length, timeout)
+
+        if hasattr(request, 'deserialize'):
+            try:
+                obj, _ = request.deserialize(msg, 0)
+            except Exception:
+                self.logger.exception("Exception raised during deserialization"
+                                      " of request: %s", request)
+
+                # raise ConnectionDropped so connect loop will retry
+                raise ConnectionDropped('invalid server response')
+            self.logger.log(BLATHER, 'Read response %s', obj)
+            return obj, zxid
+
+        return zxid
+
+    def _submit(self, request, timeout, xid=None):
+        """Submit a request object with a timeout value and optional
+        xid"""
+        b = bytearray()
+        if xid:
+            b.extend(int_struct.pack(xid))
+        if request.type:
+            b.extend(int_struct.pack(request.type))
+        b += request.serialize()
+        self.logger.log((BLATHER if isinstance(request, Ping) else logging.DEBUG),
+                        "Sending request(xid=%s): %s", xid, request)
+        self._write(int_struct.pack(len(b)) + b, timeout)
+
+    def _write(self, msg, timeout):
+        """Write a raw msg to the socket"""
+        sent = 0
+        msg_length = len(msg)
+        with self._socket_error_handling():
+            while sent < msg_length:
+                s = self.handler.select([], [self._socket], [], timeout)[1]
+                if not s:  # pragma: nocover
+                    # If the write list is empty, we got a timeout. We don't
+                    # have to check rlist and xlist as we don't set any
+                    raise self.handler.timeout_exception("socket time-out")
+                msg_slice = buffer(msg, sent)
+                bytes_sent = self._socket.send(msg_slice)
+                if not bytes_sent:
+                    raise ConnectionDropped('socket connection broken')
+                sent += bytes_sent
+
+    def _read_watch_event(self, buffer, offset):
+        client = self.client
+        watch, offset = Watch.deserialize(buffer, offset)
+        path = watch.path
+
+        self.logger.debug('Received EVENT: %s', watch)
+
+        watchers = []
+
+        if watch.type in (CREATED_EVENT, CHANGED_EVENT):
+            watchers.extend(client._data_watchers.pop(path, []))
+        elif watch.type == DELETED_EVENT:
+            watchers.extend(client._data_watchers.pop(path, []))
+            watchers.extend(client._child_watchers.pop(path, []))
+        elif watch.type == CHILD_EVENT:
+            watchers.extend(client._child_watchers.pop(path, []))
+        else:
+            self.logger.warn('Received unknown event %r', watch.type)
+            return
+
+        # Strip the chroot if needed
+        path = client.unchroot(path)
+        ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path)
+
+        # Last check to ignore watches if we've been stopped
+        if client._stopped.is_set():
+            return
+
+        # Dump the watchers to the watch thread
+        for watch in watchers:
+            client.handler.dispatch_callback(Callback('watch', watch, (ev,)))
+
+    def _read_response(self, header, buffer, offset):
+        client = self.client
+        request, async_object, xid = client._pending.popleft()
+        if header.zxid and header.zxid > 0:
+            client.last_zxid = header.zxid
+        if header.xid != xid:
+            raise RuntimeError('xids do not match, expected %r '
+                               'received %r', xid, header.xid)
+
+        # Determine if its an exists request and a no node error
+        exists_error = (header.err == NoNodeError.code and
+                        request.type == Exists.type)
+
+        # Set the exception if its not an exists error
+        if header.err and not exists_error:
+            callback_exception = EXCEPTIONS[header.err]()
+            self.logger.debug(
+                'Received error(xid=%s) %r', xid, callback_exception)
+            if async_object:
+                async_object.set_exception(callback_exception)
+        elif request and async_object:
+            if exists_error:
+                # It's a NoNodeError, which is fine for an exists
+                # request
+                async_object.set(None)
+            else:
+                try:
+                    response = request.deserialize(buffer, offset)
+                except Exception as exc:
+                    self.logger.exception("Exception raised during deserialization"
+                                          " of request: %s", request)
+                    async_object.set_exception(exc)
+                    return
+                self.logger.debug(
+                    'Received response(xid=%s): %r', xid, response)
+
+                # We special case a Transaction as we have to unchroot things
+                if request.type == Transaction.type:
+                    response = Transaction.unchroot(client, response)
+
+                async_object.set(response)
+
+            # Determine if watchers should be registered
+            watcher = getattr(request, 'watcher', None)
+            if not client._stopped.is_set() and watcher:
+                if isinstance(request, GetChildren):
+                    client._child_watchers[request.path].add(watcher)
+                else:
+                    client._data_watchers[request.path].add(watcher)
+
+        if isinstance(request, Close):
+            self.logger.log(BLATHER, 'Read close response')
+            return CLOSE_RESPONSE
+
+    def _read_socket(self, read_timeout):
+        """Called when there's something to read on the socket"""
+        client = self.client
+
+        header, buffer, offset = self._read_header(read_timeout)
+        if header.xid == PING_XID:
+            self.logger.log(BLATHER, 'Received Ping')
+            self.ping_outstanding.clear()
+        elif header.xid == AUTH_XID:
+            self.logger.log(BLATHER, 'Received AUTH')
+
+            request, async_object, xid = client._pending.popleft()
+            if header.err:
+                async_object.set_exception(AuthFailedError())
+                client._session_callback(KeeperState.AUTH_FAILED)
+            else:
+                async_object.set(True)
+        elif header.xid == WATCH_XID:
+            self._read_watch_event(buffer, offset)
+        else:
+            self.logger.log(BLATHER, 'Reading for header %r', header)
+
+            return self._read_response(header, buffer, offset)
+
+    def _send_request(self, read_timeout, connect_timeout):
+        """Called when we have something to send out on the socket"""
+        client = self.client
+        try:
+            request, async_object = client._queue[0]
+        except IndexError:
+            # Not actually something on the queue, this can occur if
+            # something happens to cancel the request such that we
+            # don't clear the pipe below after sending
+            try:
+                # Clear possible inconsistency (no request in the queue
+                # but have data in the read pipe), which causes cpu to spin.
+                os.read(self._read_pipe, 1)
+            except OSError:
+                pass
+            return
+
+        # Special case for testing, if this is a _SessionExpire object
+        # then throw a SessionExpiration error as if we were dropped
+        if request is _SESSION_EXPIRED:
+            raise SessionExpiredError("Session expired: Testing")
+        if request is _CONNECTION_DROP:
+            raise ConnectionDropped("Connection dropped: Testing")
+
+        # Special case for auth packets
+        if request.type == Auth.type:
+            xid = AUTH_XID
+        else:
+            self._xid += 1
+            xid = self._xid
+
+        self._submit(request, connect_timeout, xid)
+        client._queue.popleft()
+        os.read(self._read_pipe, 1)
+        client._pending.append((request, async_object, xid))
+
+    def _send_ping(self, connect_timeout):
+        self.ping_outstanding.set()
+        self._submit(PingInstance, connect_timeout, PING_XID)
+
+        # Determine if we need to check for a r/w server
+        if self._ro_mode:
+            result = advance_iterator(self._ro_mode)
+            if result:
+                self._rw_server = result
+                raise RWServerAvailable()
+
+    def zk_loop(self):
+        """Main Zookeeper handling loop"""
+        self.logger.log(BLATHER, 'ZK loop started')
+
+        self.connection_stopped.clear()
+
+        retry = self.retry_sleeper.copy()
+        try:
+            while not self.client._stopped.is_set():
+                # If the connect_loop returns STOP_CONNECTING, stop retrying
+                if retry(self._connect_loop, retry) is STOP_CONNECTING:
+                    break
+        except RetryFailedError:
+            self.logger.warning("Failed connecting to Zookeeper "
+                                "within the connection retry policy.")
+        finally:
+            self.connection_stopped.set()
+            self.client._session_callback(KeeperState.CLOSED)
+            self.logger.log(BLATHER, 'Connection stopped')
+
+    def _connect_loop(self, retry):
+        # Iterate through the hosts a full cycle before starting over
+        status = None
+        for host, port in self.client.hosts:
+            if self.client._stopped.is_set():
+                status = STOP_CONNECTING
+                break
+            status = self._connect_attempt(host, port, retry)
+            if status is STOP_CONNECTING:
+                break
+
+        if status is STOP_CONNECTING:
+            return STOP_CONNECTING
+        else:
+            raise ForceRetryError('Reconnecting')
+
+    def _connect_attempt(self, host, port, retry):
+        client = self.client
+        TimeoutError = self.handler.timeout_exception
+        close_connection = False
+
+        self._socket = None
+
+        # Were we given a r/w server? If so, use that instead
+        if self._rw_server:
+            self.logger.log(BLATHER,
+                            "Found r/w server to use, %s:%s", host, port)
+            host, port = self._rw_server
+            self._rw_server = None
+
+        if client._state != KeeperState.CONNECTING:
+            client._session_callback(KeeperState.CONNECTING)
+
+        try:
+            read_timeout, connect_timeout = self._connect(host, port)
+            read_timeout = read_timeout / 1000.0
+            connect_timeout = connect_timeout / 1000.0
+            retry.reset()
+            self._xid = 0
+
+            while not close_connection:
+                # Watch for something to read or send
+                jitter_time = random.randint(0, 40) / 100.0
+                # Ensure our timeout is positive
+                timeout = max([read_timeout / 2.0 - jitter_time, jitter_time])
+                s = self.handler.select([self._socket, self._read_pipe],
+                                        [], [], timeout)[0]
+
+                if not s:
+                    if self.ping_outstanding.is_set():
+                        self.ping_outstanding.clear()
+                        raise ConnectionDropped(
+                            "outstanding heartbeat ping not received")
+                    self._send_ping(connect_timeout)
+                elif s[0] == self._socket:
+                    response = self._read_socket(read_timeout)
+                    close_connection = response == CLOSE_RESPONSE
+                else:
+                    self._send_request(read_timeout, connect_timeout)
+
+            self.logger.info('Closing connection to %s:%s', host, port)
+            client._session_callback(KeeperState.CLOSED)
+            return STOP_CONNECTING
+        except (ConnectionDropped, TimeoutError) as e:
+            if isinstance(e, ConnectionDropped):
+                self.logger.warning('Connection dropped: %s', e)
+            else:
+                self.logger.warning('Connection time-out')
+            if client._state != KeeperState.CONNECTING:
+                self.logger.warning("Transition to CONNECTING")
+                client._session_callback(KeeperState.CONNECTING)
+        except AuthFailedError:
+            retry.reset()
+            self.logger.warning('AUTH_FAILED closing')
+            client._session_callback(KeeperState.AUTH_FAILED)
+            return STOP_CONNECTING
+        except SessionExpiredError:
+            retry.reset()
+            self.logger.warning('Session has expired')
+            client._session_callback(KeeperState.EXPIRED_SESSION)
+        except RWServerAvailable:
+            retry.reset()
+            self.logger.warning('Found a RW server, dropping connection')
+            client._session_callback(KeeperState.CONNECTING)
+        except Exception:
+            self.logger.exception('Unhandled exception in connection loop')
+            raise
+        finally:
+            if self._socket is not None:
+                self._socket.close()
+
+    def _connect(self, host, port):
+        client = self.client
+        self.logger.info('Connecting to %s:%s', host, port)
+
+        self.logger.log(BLATHER,
+                          '    Using session_id: %r session_passwd: %s',
+                          client._session_id,
+                          hexlify(client._session_passwd))
+
+        with self._socket_error_handling():
+            self._socket = self.handler.create_connection(
+                (host, port), client._session_timeout / 1000.0)
+
+        self._socket.setblocking(0)
+
+        connect = Connect(0, client.last_zxid, client._session_timeout,
+                          client._session_id or 0, client._session_passwd,
+                          client.read_only)
+
+        connect_result, zxid = self._invoke(client._session_timeout, connect)
+
+        if connect_result.time_out <= 0:
+            raise SessionExpiredError("Session has expired")
+
+        if zxid:
+            client.last_zxid = zxid
+
+        # Load return values
+        client._session_id = connect_result.session_id
+        client._protocol_version = connect_result.protocol_version
+        negotiated_session_timeout = connect_result.time_out
+        connect_timeout = negotiated_session_timeout / len(client.hosts)
+        read_timeout = negotiated_session_timeout * 2.0 / 3.0
+        client._session_passwd = connect_result.passwd
+
+        self.logger.log(BLATHER,
+                          'Session created, session_id: %r session_passwd: %s\n'
+                          '    negotiated session timeout: %s\n'
+                          '    connect timeout: %s\n'
+                          '    read timeout: %s', client._session_id,
+                          hexlify(client._session_passwd),
+                          negotiated_session_timeout, connect_timeout,
+                          read_timeout)
+
+        if connect_result.read_only:
+            client._session_callback(KeeperState.CONNECTED_RO)
+            self._ro_mode = iter(self._server_pinger())
+        else:
+            client._session_callback(KeeperState.CONNECTED)
+            self._ro_mode = None
+
+        for scheme, auth in client.auth_data:
+            ap = Auth(0, scheme, auth)
+            zxid = self._invoke(connect_timeout, ap, xid=AUTH_XID)
+            if zxid:
+                client.last_zxid = zxid
+        return read_timeout, connect_timeout

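Every request and response on the wire is framed with a 4-byte big-endian length prefix (see _submit() and _read_header() above). A standalone sketch (not part of this patch) of that framing, without a real socket:

    # Illustrative only: the length-prefixed framing used by the connection handler.
    import struct

    int_struct = struct.Struct('!i')   # 4-byte big-endian int, as in serialization.py

    def frame(payload):
        """Prefix a serialized request with its length."""
        return int_struct.pack(len(payload)) + payload

    def unframe(stream):
        """Split one framed message off the front of a byte stream."""
        (length,) = int_struct.unpack_from(stream, 0)
        return stream[4:4 + length], stream[4 + length:]

    wire = frame(b'\x00\x00\x00\x0b')   # e.g. a bare Ping request (opcode 11)
    body, rest = unframe(wire)
    assert body == b'\x00\x00\x00\x0b' and rest == b''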
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/paths.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/paths.py b/slider-agent/src/main/python/kazoo/protocol/paths.py
new file mode 100644
index 0000000..52b2d6b
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/protocol/paths.py
@@ -0,0 +1,54 @@
+def normpath(path, trailing=False):
+    """Normalize path, eliminating double slashes, etc."""
+    comps = path.split('/')
+    new_comps = []
+    for comp in comps:
+        if comp == '':
+            continue
+        if comp in ('.', '..'):
+            raise ValueError('relative paths not allowed')
+        new_comps.append(comp)
+    new_path = '/'.join(new_comps)
+    if trailing is True and path.endswith('/'):
+        new_path += '/'
+    if path.startswith('/'):
+        return '/' + new_path
+    return new_path
+
+
+def join(a, *p):
+    """Join two or more pathname components, inserting '/' as needed.
+
+    If any component is an absolute path, all previous path components
+    will be discarded.
+
+    """
+    path = a
+    for b in p:
+        if b.startswith('/'):
+            path = b
+        elif path == '' or path.endswith('/'):
+            path += b
+        else:
+            path += '/' + b
+    return path
+
+
+def isabs(s):
+    """Test whether a path is absolute"""
+    return s.startswith('/')
+
+
+def basename(p):
+    """Returns the final component of a pathname"""
+    i = p.rfind('/') + 1
+    return p[i:]
+
+
+def _prefix_root(root, path, trailing=False):
+    """Prepend a root to a path. """
+    return normpath(join(_norm_root(root), path.lstrip('/')), trailing=trailing)
+
+
+def _norm_root(root):
+    return normpath(join('/', root))

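A usage sketch (not part of this patch) for the path helpers above:

    # Illustrative only: normalization, joining, and basename behavior.
    from kazoo.protocol.paths import normpath, join, isabs, basename

    print(normpath("//a///b/c/"))            # prints: /a/b/c
    print(normpath("/a/b/", trailing=True))  # prints: /a/b/
    print(join("/a", "b", "/c", "d"))        # prints: /c/d  (absolute component resets)
    print(isabs("relative/node"))            # prints: False
    print(basename("/a/b/leaf"))             # prints: leaf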
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/serialization.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/serialization.py b/slider-agent/src/main/python/kazoo/protocol/serialization.py
new file mode 100644
index 0000000..f44f49a
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/protocol/serialization.py
@@ -0,0 +1,396 @@
+"""Zookeeper Serializers, Deserializers, and NamedTuple objects"""
+from collections import namedtuple
+import struct
+
+from kazoo.exceptions import EXCEPTIONS
+from kazoo.protocol.states import ZnodeStat
+from kazoo.security import ACL
+from kazoo.security import Id
+
+# Struct objects with formats compiled
+bool_struct = struct.Struct('B')
+int_struct = struct.Struct('!i')
+int_int_struct = struct.Struct('!ii')
+int_int_long_struct = struct.Struct('!iiq')
+
+int_long_int_long_struct = struct.Struct('!iqiq')
+multiheader_struct = struct.Struct('!iBi')
+reply_header_struct = struct.Struct('!iqi')
+stat_struct = struct.Struct('!qqqqiiiqiiq')
+
+try:  # pragma: nocover
+    basestring
+except NameError:
+    basestring = str
+
+
+def read_string(buffer, offset):
+    """Reads an int specified buffer into a string and returns the
+    string and the new offset in the buffer"""
+    length = int_struct.unpack_from(buffer, offset)[0]
+    offset += int_struct.size
+    if length < 0:
+        return None, offset
+    else:
+        index = offset
+        offset += length
+        return buffer[index:index + length].decode('utf-8'), offset
+
+
+def read_acl(bytes, offset):
+    perms = int_struct.unpack_from(bytes, offset)[0]
+    offset += int_struct.size
+    scheme, offset = read_string(bytes, offset)
+    id, offset = read_string(bytes, offset)
+    return ACL(perms, Id(scheme, id)), offset
+
+
+def write_string(bytes):
+    if not bytes:
+        return int_struct.pack(-1)
+    else:
+        utf8_str = bytes.encode('utf-8')
+        return int_struct.pack(len(utf8_str)) + utf8_str
+
+
+def write_buffer(bytes):
+    if bytes is None:
+        return int_struct.pack(-1)
+    else:
+        return int_struct.pack(len(bytes)) + bytes
+
+
+def read_buffer(bytes, offset):
+    length = int_struct.unpack_from(bytes, offset)[0]
+    offset += int_struct.size
+    if length < 0:
+        return None, offset
+    else:
+        index = offset
+        offset += length
+        return bytes[index:index + length], offset
+
+
+class Close(namedtuple('Close', '')):
+    type = -11
+
+    @classmethod
+    def serialize(cls):
+        return b''
+
+CloseInstance = Close()
+
+
+class Ping(namedtuple('Ping', '')):
+    type = 11
+
+    @classmethod
+    def serialize(cls):
+        return b''
+
+PingInstance = Ping()
+
+
+class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
+                         ' time_out session_id passwd read_only')):
+    type = None
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(int_long_int_long_struct.pack(
+            self.protocol_version, self.last_zxid_seen, self.time_out,
+            self.session_id))
+        b.extend(write_buffer(self.passwd))
+        b.extend([1 if self.read_only else 0])
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        proto_version, timeout, session_id = int_int_long_struct.unpack_from(
+            bytes, offset)
+        offset += int_int_long_struct.size
+        password, offset = read_buffer(bytes, offset)
+
+        try:
+            read_only = bool_struct.unpack_from(bytes, offset)[0] is 1
+            offset += bool_struct.size
+        except struct.error:
+            read_only = False
+        return cls(proto_version, 0, timeout, session_id, password,
+                   read_only), offset
+
+
+class Create(namedtuple('Create', 'path data acl flags')):
+    type = 1
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend(write_buffer(self.data))
+        b.extend(int_struct.pack(len(self.acl)))
+        for acl in self.acl:
+            b.extend(int_struct.pack(acl.perms) +
+                     write_string(acl.id.scheme) + write_string(acl.id.id))
+        b.extend(int_struct.pack(self.flags))
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        return read_string(bytes, offset)[0]
+
+
+class Delete(namedtuple('Delete', 'path version')):
+    type = 2
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend(int_struct.pack(self.version))
+        return b
+
+    @classmethod
+    def deserialize(self, bytes, offset):
+        return True
+
+
+class Exists(namedtuple('Exists', 'path watcher')):
+    type = 3
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend([1 if self.watcher else 0])
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+        return stat if stat.czxid != -1 else None
+
+
+class GetData(namedtuple('GetData', 'path watcher')):
+    type = 4
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend([1 if self.watcher else 0])
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        data, offset = read_buffer(bytes, offset)
+        stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+        return data, stat
+
+
+class SetData(namedtuple('SetData', 'path data version')):
+    type = 5
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend(write_buffer(self.data))
+        b.extend(int_struct.pack(self.version))
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+
+
+class GetACL(namedtuple('GetACL', 'path')):
+    type = 6
+
+    def serialize(self):
+        return bytearray(write_string(self.path))
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        count = int_struct.unpack_from(bytes, offset)[0]
+        offset += int_struct.size
+        if count == -1:  # pragma: nocover
+            return []
+
+        acls = []
+        for c in range(count):
+            acl, offset = read_acl(bytes, offset)
+            acls.append(acl)
+        stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+        return acls, stat
+
+
+class SetACL(namedtuple('SetACL', 'path acls version')):
+    type = 7
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend(int_struct.pack(len(self.acls)))
+        for acl in self.acls:
+            b.extend(int_struct.pack(acl.perms) +
+                     write_string(acl.id.scheme) + write_string(acl.id.id))
+        b.extend(int_struct.pack(self.version))
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+
+
+class GetChildren(namedtuple('GetChildren', 'path watcher')):
+    type = 8
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend([1 if self.watcher else 0])
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        count = int_struct.unpack_from(bytes, offset)[0]
+        offset += int_struct.size
+        if count == -1:  # pragma: nocover
+            return []
+
+        children = []
+        for c in range(count):
+            child, offset = read_string(bytes, offset)
+            children.append(child)
+        return children
+
+
+class Sync(namedtuple('Sync', 'path')):
+    type = 9
+
+    def serialize(self):
+        return write_string(self.path)
+
+    @classmethod
+    def deserialize(cls, buffer, offset):
+        return read_string(buffer, offset)[0]
+
+
+class GetChildren2(namedtuple('GetChildren2', 'path watcher')):
+    type = 12
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend([1 if self.watcher else 0])
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        count = int_struct.unpack_from(bytes, offset)[0]
+        offset += int_struct.size
+        if count == -1:  # pragma: nocover
+            return []
+
+        children = []
+        for c in range(count):
+            child, offset = read_string(bytes, offset)
+            children.append(child)
+        stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+        return children, stat
+
+
+class CheckVersion(namedtuple('CheckVersion', 'path version')):
+    type = 13
+
+    def serialize(self):
+        b = bytearray()
+        b.extend(write_string(self.path))
+        b.extend(int_struct.pack(self.version))
+        return b
+
+
+class Transaction(namedtuple('Transaction', 'operations')):
+    type = 14
+
+    def serialize(self):
+        b = bytearray()
+        for op in self.operations:
+            b.extend(MultiHeader(op.type, False, -1).serialize() +
+                     op.serialize())
+        return b + multiheader_struct.pack(-1, True, -1)
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        header = MultiHeader(None, False, None)
+        results = []
+        response = None
+        while not header.done:
+            if header.type == Create.type:
+                response, offset = read_string(bytes, offset)
+            elif header.type == Delete.type:
+                response = True
+            elif header.type == SetData.type:
+                response = ZnodeStat._make(
+                    stat_struct.unpack_from(bytes, offset))
+                offset += stat_struct.size
+            elif header.type == CheckVersion.type:
+                response = True
+            elif header.type == -1:
+                err = int_struct.unpack_from(bytes, offset)[0]
+                offset += int_struct.size
+                response = EXCEPTIONS[err]()
+            if response:
+                results.append(response)
+            header, offset = MultiHeader.deserialize(bytes, offset)
+        return results
+
+    @staticmethod
+    def unchroot(client, response):
+        resp = []
+        for result in response:
+            if isinstance(result, basestring):
+                resp.append(client.unchroot(result))
+            else:
+                resp.append(result)
+        return resp
+
+
+class Auth(namedtuple('Auth', 'auth_type scheme auth')):
+    type = 100
+
+    def serialize(self):
+        return (int_struct.pack(self.auth_type) + write_string(self.scheme) +
+                write_string(self.auth))
+
+
+class Watch(namedtuple('Watch', 'type state path')):
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        """Given bytes and the current bytes offset, return the
+        type, state, path, and new offset"""
+        type, state = int_int_struct.unpack_from(bytes, offset)
+        offset += int_int_struct.size
+        path, offset = read_string(bytes, offset)
+        return cls(type, state, path), offset
+
+
+class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        """Given bytes and the current bytes offset, return a
+        :class:`ReplyHeader` instance and the new offset"""
+        new_offset = offset + reply_header_struct.size
+        return cls._make(
+            reply_header_struct.unpack_from(bytes, offset)), new_offset
+
+
+class MultiHeader(namedtuple('MultiHeader', 'type done err')):
+    def serialize(self):
+        b = bytearray()
+        b.extend(int_struct.pack(self.type))
+        b.extend([1 if self.done else 0])
+        b.extend(int_struct.pack(self.err))
+        return b
+
+    @classmethod
+    def deserialize(cls, bytes, offset):
+        t, done, err = multiheader_struct.unpack_from(bytes, offset)
+        offset += multiheader_struct.size
+        return cls(t, done == 1, err), offset
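
The request classes above pair a ZooKeeper opcode (the class-level `type`) with `serialize`/`deserialize` methods for the wire format. A minimal usage sketch, assuming this vendored kazoo package is importable; the path '/config/app' is only illustrative:

    from kazoo.protocol.serialization import GetData

    # GetData (opcode 4): a length-prefixed path string followed by a
    # one-byte watch flag, exactly as built in GetData.serialize() above.
    request = GetData('/config/app', watcher=True)
    payload = request.serialize()
    # request.type is 4; payload is the bytearray sent as the request body.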

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/states.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/states.py b/slider-agent/src/main/python/kazoo/protocol/states.py
new file mode 100644
index 0000000..395c013
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/protocol/states.py
@@ -0,0 +1,237 @@
+"""Kazoo State and Event objects"""
+from collections import namedtuple
+
+
+class KazooState(object):
+    """High level connection state values
+
+    States inspired by Netflix Curator.
+
+    .. attribute:: SUSPENDED
+
+        The connection has been lost but may be recovered. We should
+        operate in a "safe mode" until then. When the connection is
+        resumed, it may be discovered that the session expired. A
+        client should not assume that locks are valid during this
+        time.
+
+    .. attribute:: CONNECTED
+
+        The connection is alive and well.
+
+    .. attribute:: LOST
+
+        The connection has been confirmed dead. Any ephemeral nodes
+        will need to be recreated upon re-establishing a connection.
+        If locks were acquired or recipes using ephemeral nodes are in
+        use, they can be considered lost as well.
+
+    """
+    SUSPENDED = "SUSPENDED"
+    CONNECTED = "CONNECTED"
+    LOST = "LOST"
+
+
+class KeeperState(object):
+    """Zookeeper State
+
+    Represents the Zookeeper state. Watch functions will receive a
+    :class:`KeeperState` attribute as their state argument.
+
+    .. attribute:: AUTH_FAILED
+
+        Authentication has failed, this is an unrecoverable error.
+
+    .. attribute:: CONNECTED
+
+        Zookeeper is connected.
+
+    .. attribute:: CONNECTED_RO
+
+        Zookeeper is connected in read-only state.
+
+    .. attribute:: CONNECTING
+
+        Zookeeper is currently attempting to establish a connection.
+
+    .. attribute:: EXPIRED_SESSION
+
+        The prior session was invalid, all prior ephemeral nodes are
+        gone.
+
+    """
+    AUTH_FAILED = 'AUTH_FAILED'
+    CONNECTED = 'CONNECTED'
+    CONNECTED_RO = 'CONNECTED_RO'
+    CONNECTING = 'CONNECTING'
+    CLOSED = 'CLOSED'
+    EXPIRED_SESSION = 'EXPIRED_SESSION'
+
+
+class EventType(object):
+    """Zookeeper Event
+
+    Represents a Zookeeper event. Events trigger watch functions which
+    will receive a :class:`EventType` attribute as their event
+    argument.
+
+    .. attribute:: CREATED
+
+        A node has been created.
+
+    .. attribute:: DELETED
+
+        A node has been deleted.
+
+    .. attribute:: CHANGED
+
+        The data for a node has changed.
+
+    .. attribute:: CHILD
+
+        The children under a node have changed (a child was added or
+        removed). This event does not indicate that a child node's data
+        has changed; that requires a separate watch on the child itself.
+
+    """
+    CREATED = 'CREATED'
+    DELETED = 'DELETED'
+    CHANGED = 'CHANGED'
+    CHILD = 'CHILD'
+
+EVENT_TYPE_MAP = {
+    1: EventType.CREATED,
+    2: EventType.DELETED,
+    3: EventType.CHANGED,
+    4: EventType.CHILD
+}
+
+
+class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
+    """A change on ZooKeeper that a Watcher is able to respond to.
+
+    The :class:`WatchedEvent` includes exactly what happened, the
+    current state of ZooKeeper, and the path of the node that was
+    involved in the event. An instance of :class:`WatchedEvent` will be
+    passed to registered watch functions.
+
+    .. attribute:: type
+
+        A :class:`EventType` attribute indicating the event type.
+
+    .. attribute:: state
+
+        A :class:`KeeperState` attribute indicating the Zookeeper
+        state.
+
+    .. attribute:: path
+
+        The path of the node for the watch event.
+
+    """
+
+
+class Callback(namedtuple('Callback', ('type', 'func', 'args'))):
+    """A callback that is handed to a handler for dispatch
+
+    :param type: Type of the callback, currently is only 'watch'
+    :param func: Callback function
+    :param args: Argument list for the callback function
+
+    """
+
+
+class ZnodeStat(namedtuple('ZnodeStat', 'czxid mzxid ctime mtime version'
+                           ' cversion aversion ephemeralOwner dataLength'
+                           ' numChildren pzxid')):
+    """A ZnodeStat structure with convenience properties
+
+    When getting the value of a node from Zookeeper, the properties for
+    the node known as a "Stat structure" will be retrieved. The
+    :class:`ZnodeStat` object provides access to the standard Stat
+    properties and additional properties that are more readable and use
+    Python time semantics (seconds since epoch instead of ms).
+
+    .. note::
+
+        The original Zookeeper Stat name is in parens next to the name
+        when it differs from the convenience attribute. These are **not
+        functions**, just attributes.
+
+    .. attribute:: creation_transaction_id (czxid)
+
+        The transaction id of the change that caused this znode to be
+        created.
+
+    .. attribute:: last_modified_transaction_id (mzxid)
+
+        The transaction id of the change that last modified this znode.
+
+    .. attribute:: created (ctime)
+
+        The time in seconds from epoch when this node was created.
+        (ctime is in milliseconds)
+
+    .. attribute:: last_modified (mtime)
+
+        The time in seconds from epoch when this znode was last
+        modified. (mtime is in milliseconds)
+
+    .. attribute:: version
+
+        The number of changes to the data of this znode.
+
+    .. attribute:: acl_version (aversion)
+
+        The number of changes to the ACL of this znode.
+
+    .. attribute:: owner_session_id (ephemeralOwner)
+
+        The session id of the owner of this znode if the znode is an
+        ephemeral node. If it is not an ephemeral node, it will be
+        `None`. (ephemeralOwner will be 0 if it is not ephemeral)
+
+    .. attribute:: data_length (dataLength)
+
+        The length of the data field of this znode.
+
+    .. attribute:: children_count (numChildren)
+
+        The number of children of this znode.
+
+    """
+    @property
+    def acl_version(self):
+        return self.aversion
+
+    @property
+    def children_version(self):
+        return self.cversion
+
+    @property
+    def created(self):
+        return self.ctime / 1000.0
+
+    @property
+    def last_modified(self):
+        return self.mtime / 1000.0
+
+    @property
+    def owner_session_id(self):
+        return self.ephemeralOwner or None
+
+    @property
+    def creation_transaction_id(self):
+        return self.czxid
+
+    @property
+    def last_modified_transaction_id(self):
+        return self.mzxid
+
+    @property
+    def data_length(self):
+        return self.dataLength
+
+    @property
+    def children_count(self):
+        return self.numChildren
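
The KazooState values above are what kazoo hands to connection listeners. A minimal sketch of such a listener, assuming a ZooKeeper server reachable at 127.0.0.1:2181:

    from kazoo.client import KazooClient
    from kazoo.protocol.states import KazooState

    zk = KazooClient(hosts='127.0.0.1:2181')

    def state_listener(state):
        # SUSPENDED: connection lost but may recover; treat held locks as uncertain.
        # LOST: the session is gone; ephemeral nodes and locks must be re-established.
        if state == KazooState.SUSPENDED:
            print('suspended: entering safe mode')
        elif state == KazooState.LOST:
            print('lost: session expired, ephemeral nodes are gone')
        else:
            print('connected')

    zk.add_listener(state_listener)
    zk.start()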

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/__init__.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/recipe/__init__.py b/slider-agent/src/main/python/kazoo/recipe/__init__.py
new file mode 100644
index 0000000..792d600
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/recipe/__init__.py
@@ -0,0 +1 @@
+#

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/barrier.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/recipe/barrier.py b/slider-agent/src/main/python/kazoo/recipe/barrier.py
new file mode 100644
index 0000000..05addb4
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/recipe/barrier.py
@@ -0,0 +1,214 @@
+"""Zookeeper Barriers
+
+:Maintainer: None
+:Status: Unknown
+
+"""
+import os
+import socket
+import uuid
+
+from kazoo.protocol.states import EventType
+from kazoo.exceptions import KazooException
+from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NodeExistsError
+
+
+class Barrier(object):
+    """Kazoo Barrier
+
+    Implements a barrier that blocks processing for a set of nodes
+    until a condition is met, at which point the nodes are allowed to
+    proceed. The barrier is in place as long as its node exists.
+
+    .. warning::
+
+        The :meth:`wait` function does not handle connection loss and
+        may raise :exc:`~kazoo.exceptions.ConnectionLossException` if
+        the connection is lost while waiting.
+
+    """
+    def __init__(self, client, path):
+        """Create a Kazoo Barrier
+
+        :param client: A :class:`~kazoo.client.KazooClient` instance.
+        :param path: The barrier path to use.
+
+        """
+        self.client = client
+        self.path = path
+
+    def create(self):
+        """Establish the barrier if it doesn't exist already"""
+        self.client.retry(self.client.ensure_path, self.path)
+
+    def remove(self):
+        """Remove the barrier
+
+        :returns: Whether the barrier actually needed to be removed.
+        :rtype: bool
+
+        """
+        try:
+            self.client.retry(self.client.delete, self.path)
+            return True
+        except NoNodeError:
+            return False
+
+    def wait(self, timeout=None):
+        """Wait on the barrier to be cleared
+
+        :returns: True if the barrier has been cleared, otherwise
+                  False.
+        :rtype: bool
+
+        """
+        cleared = self.client.handler.event_object()
+
+        def wait_for_clear(event):
+            if event.type == EventType.DELETED:
+                cleared.set()
+
+        exists = self.client.exists(self.path, watch=wait_for_clear)
+        if not exists:
+            return True
+
+        cleared.wait(timeout)
+        return cleared.is_set()
+
+
+class DoubleBarrier(object):
+    """Kazoo Double Barrier
+
+    Double barriers are used to synchronize the beginning and end of
+    a distributed task. The barrier blocks when entering it until all
+    the members have joined, and blocks when leaving until all the
+    members have left.
+
+    .. note::
+
+        You should register a listener for session loss as the process
+        will no longer be part of the barrier once the session is
+        gone. Connection losses will be retried with the default retry
+        policy.
+
+    """
+    def __init__(self, client, path, num_clients, identifier=None):
+        """Create a Double Barrier
+
+        :param client: A :class:`~kazoo.client.KazooClient` instance.
+        :param path: The barrier path to use.
+        :param num_clients: How many clients must enter the barrier to
+                            proceed.
+        :type num_clients: int
+        :param identifier: An identifier to use for this member of the
+                           barrier when participating. Defaults to the
+                           hostname + process id.
+
+        """
+        self.client = client
+        self.path = path
+        self.num_clients = num_clients
+        self._identifier = identifier or '%s-%s' % (
+            socket.getfqdn(), os.getpid())
+        self.participating = False
+        self.assured_path = False
+        self.node_name = uuid.uuid4().hex
+        self.create_path = self.path + "/" + self.node_name
+
+    def enter(self):
+        """Enter the barrier, blocks until all nodes have entered"""
+        try:
+            self.client.retry(self._inner_enter)
+            self.participating = True
+        except KazooException:
+            # We failed to enter, best effort cleanup
+            self._best_effort_cleanup()
+            self.participating = False
+
+    def _inner_enter(self):
+        # make sure our barrier parent node exists
+        if not self.assured_path:
+            self.client.ensure_path(self.path)
+            self.assured_path = True
+
+        ready = self.client.handler.event_object()
+
+        try:
+            self.client.create(self.create_path,
+                self._identifier.encode('utf-8'), ephemeral=True)
+        except NodeExistsError:
+            pass
+
+        def created(event):
+            if event.type == EventType.CREATED:
+                ready.set()
+
+        self.client.exists(self.path + '/' + 'ready', watch=created)
+
+        children = self.client.get_children(self.path)
+
+        if len(children) < self.num_clients:
+            ready.wait()
+        else:
+            self.client.ensure_path(self.path + '/ready')
+        return True
+
+    def leave(self):
+        """Leave the barrier, blocks until all nodes have left"""
+        try:
+            self.client.retry(self._inner_leave)
+        except KazooException:  # pragma: nocover
+            # Failed to cleanly leave
+            self._best_effort_cleanup()
+        self.participating = False
+
+    def _inner_leave(self):
+        # Delete the ready node if it's around
+        try:
+            self.client.delete(self.path + '/ready')
+        except NoNodeError:
+            pass
+
+        while True:
+            children = self.client.get_children(self.path)
+            if not children:
+                return True
+
+            if len(children) == 1 and children[0] == self.node_name:
+                self.client.delete(self.create_path)
+                return True
+
+            children.sort()
+
+            ready = self.client.handler.event_object()
+
+            def deleted(event):
+                if event.type == EventType.DELETED:
+                    ready.set()
+
+            if self.node_name == children[0]:
+                # We're first, wait on the highest to leave
+                if not self.client.exists(self.path + '/' + children[-1],
+                                          watch=deleted):
+                    continue
+
+                ready.wait()
+                continue
+
+            # Delete our node
+            self.client.delete(self.create_path)
+
+            # Wait on the first
+            if not self.client.exists(self.path + '/' + children[0],
+                                      watch=deleted):
+                continue
+
+            # Wait for the lowest to be deleted
+            ready.wait()
+
+    def _best_effort_cleanup(self):
+        try:
+            self.client.retry(self.client.delete, self.create_path)
+        except NoNodeError:
+            pass
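
A minimal sketch of the single Barrier recipe above, assuming a started KazooClient named zk; do_work() is a hypothetical placeholder for application logic:

    # zk: an already-started KazooClient (assumption)
    barrier = zk.Barrier('/app/barrier')
    barrier.create()               # put the barrier node in place

    # In each worker: block until the barrier node is deleted.
    if barrier.wait(timeout=30):
        do_work()                  # hypothetical work function

    # In the coordinator: delete the node, releasing all waiters.
    barrier.remove()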

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/counter.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/recipe/counter.py b/slider-agent/src/main/python/kazoo/recipe/counter.py
new file mode 100644
index 0000000..ed80f51
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/recipe/counter.py
@@ -0,0 +1,94 @@
+"""Zookeeper Counter
+
+:Maintainer: None
+:Status: Unknown
+
+"""
+
+from kazoo.exceptions import BadVersionError
+from kazoo.retry import ForceRetryError
+
+
+class Counter(object):
+    """Kazoo Counter
+
+    A shared counter of either int or float values. Changes to the
+    counter are done atomically. The general retry policy is used to
+    retry operations if concurrent changes are detected.
+
+    The data is marshaled using `repr(value)` and converted back using
+    `type(counter.default)(value)`, both with an ASCII encoding. As
+    such, other data types might also be used for the counter value.
+
+    Counter changes can raise
+    :class:`~kazoo.exceptions.BadVersionError` if the retry policy
+    wasn't able to apply a change.
+
+    Example usage:
+
+    .. code-block:: python
+
+        zk = KazooClient()
+        counter = zk.Counter("/int")
+        counter += 2
+        counter -= 1
+        counter.value == 1
+
+        counter = zk.Counter("/float", default=1.0)
+        counter += 2.0
+        counter.value == 3.0
+
+    """
+    def __init__(self, client, path, default=0):
+        """Create a Kazoo Counter
+
+        :param client: A :class:`~kazoo.client.KazooClient` instance.
+        :param path: The counter path to use.
+        :param default: The default value.
+
+        """
+        self.client = client
+        self.path = path
+        self.default = default
+        self.default_type = type(default)
+        self._ensured_path = False
+
+    def _ensure_node(self):
+        if not self._ensured_path:
+            # make sure our node exists
+            self.client.ensure_path(self.path)
+            self._ensured_path = True
+
+    def _value(self):
+        self._ensure_node()
+        old, stat = self.client.get(self.path)
+        old = old.decode('ascii') if old != b'' else self.default
+        version = stat.version
+        data = self.default_type(old)
+        return data, version
+
+    @property
+    def value(self):
+        return self._value()[0]
+
+    def _change(self, value):
+        if not isinstance(value, self.default_type):
+            raise TypeError('invalid type for value change')
+        self.client.retry(self._inner_change, value)
+        return self
+
+    def _inner_change(self, value):
+        data, version = self._value()
+        data = repr(data + value).encode('ascii')
+        try:
+            self.client.set(self.path, data, version=version)
+        except BadVersionError:  # pragma: nocover
+            raise ForceRetryError()
+
+    def __add__(self, value):
+        """Add value to counter."""
+        return self._change(value)
+
+    def __sub__(self, value):
+        """Subtract value from counter."""
+        return self._change(-value)
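
In addition to the docstring example, a brief sketch of how the counter behaves, assuming a started KazooClient named zk (paths are illustrative):

    # zk: an already-started KazooClient (assumption)
    counter = zk.Counter('/app/requests')          # int counter, default 0
    counter += 1                                   # version-checked set(), retried on BadVersionError
    print(counter.value)

    load = zk.Counter('/app/load', default=1.0)    # float counter
    load += 0.5
    # load += 1 would raise TypeError: the value must match type(default)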

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/election.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/recipe/election.py b/slider-agent/src/main/python/kazoo/recipe/election.py
new file mode 100644
index 0000000..3089fa6
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/recipe/election.py
@@ -0,0 +1,79 @@
+"""ZooKeeper Leader Elections
+
+:Maintainer: None
+:Status: Unknown
+
+"""
+from kazoo.exceptions import CancelledError
+
+
+class Election(object):
+    """Kazoo Basic Leader Election
+
+    Example usage with a :class:`~kazoo.client.KazooClient` instance::
+
+        zk = KazooClient()
+        election = zk.Election("/electionpath", "my-identifier")
+
+        # blocks until the election is won, then calls
+        # my_leader_function()
+        election.run(my_leader_function)
+
+    """
+    def __init__(self, client, path, identifier=None):
+        """Create a Kazoo Leader Election
+
+        :param client: A :class:`~kazoo.client.KazooClient` instance.
+        :param path: The election path to use.
+        :param identifier: Name to use for this lock contender. This
+                           can be useful for querying to see who the
+                           current lock contenders are.
+
+        """
+        self.lock = client.Lock(path, identifier)
+
+    def run(self, func, *args, **kwargs):
+        """Contend for the leadership
+
+        This call will block until either this contender is cancelled
+        or this contender wins the election and the provided leadership
+        function subsequently returns or fails.
+
+        :param func: A function to be called if/when the election is
+                     won.
+        :param args: Arguments to leadership function.
+        :param kwargs: Keyword arguments to leadership function.
+
+        """
+        if not callable(func):
+            raise ValueError("leader function is not callable")
+
+        try:
+            with self.lock:
+                func(*args, **kwargs)
+
+        except CancelledError:
+            pass
+
+    def cancel(self):
+        """Cancel participation in the election
+
+        .. note::
+
+            If this contender has already been elected leader, this
+            method will not interrupt the leadership function.
+
+        """
+        self.lock.cancel()
+
+    def contenders(self):
+        """Return an ordered list of the current contenders in the
+        election
+
+        .. note::
+
+            If the contenders did not set an identifier, it will appear
+            as a blank string.
+
+        """
+        return self.lock.contenders()
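
A minimal sketch of cancelling a contender from another thread, assuming a started KazooClient named zk (the path and identifier are illustrative):

    import threading

    # zk: an already-started KazooClient (assumption)
    election = zk.Election('/app/election', 'worker-1')

    def lead():
        print('elected as leader')

    t = threading.Thread(target=election.run, args=(lead,))
    t.start()

    # Withdraw from the election; if this contender has not yet won,
    # the pending run() call returns without becoming leader.
    election.cancel()
    t.join()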
