# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/threading.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/handlers/threading.py b/slider-agent/src/main/python/kazoo/handlers/threading.py
# new file mode 100644
# index 0000000..8e6648d
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/handlers/threading.py
# @@ -0,0 +1,287 @@
"""A threading based handler.

The :class:`SequentialThreadingHandler` is intended for regular Python
environments that use threads.

.. warning::

    Do not use :class:`SequentialThreadingHandler` with applications
    using asynchronous event loops (like gevent). Use the
    :class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.

"""
from __future__ import absolute_import

import atexit
import functools
import logging
import select
import socket
import threading
import time

try:
    import Queue
except ImportError:  # pragma: nocover
    import queue as Queue

from kazoo.handlers.utils import create_tcp_socket, create_tcp_connection

# sentinel objects
_NONE = object()  # AsyncResult._exception value meaning "nothing set yet"
_STOP = object()  # put on a worker queue to make its thread exit

log = logging.getLogger(__name__)


class TimeoutError(Exception):
    """Raised by :meth:`AsyncResult.get` when no result is available."""


class AsyncResult(object):
    """A one-time event that stores a value or an exception.

    Waiters block on an internal :class:`threading.Condition`. Callbacks
    registered with :meth:`rawlink` are not run inline: they are put on
    the handler's ``completion_queue`` and run by its worker thread.
    """
    def __init__(self, handler):
        self._handler = handler
        self.value = None
        # _NONE -> not set yet; None -> set with a value;
        # anything else -> the exception get() will raise.
        self._exception = _NONE
        self._condition = threading.Condition()
        self._callbacks = []

    def ready(self):
        """Return true if and only if it holds a value or an
        exception"""
        return self._exception is not _NONE

    def successful(self):
        """Return true if and only if it is ready and holds a value"""
        return self._exception is None

    @property
    def exception(self):
        """The stored exception, or ``None`` if unset or holding a value."""
        if self._exception is not _NONE:
            return self._exception

    def set(self, value=None):
        """Store the value. Wake up the waiters."""
        with self._condition:
            self.value = value
            self._exception = None
            self._do_callbacks()

    def set_exception(self, exception):
        """Store the exception. Wake up the waiters."""
        with self._condition:
            self._exception = exception
            self._do_callbacks()

    def _do_callbacks(self):
        """Queue every registered callback and wake all waiters.

        Must be called with ``self._condition`` held.
        """
        for callback in self._callbacks:
            # functools.partial binds *this* callback now. A bare
            # ``lambda: callback(self)`` late-binds the loop variable, so
            # every queued call would invoke only the last callback.
            self._handler.completion_queue.put(
                functools.partial(callback, self))
        self._condition.notify_all()

    def get(self, block=True, timeout=None):
        """Return the stored value or raise the stored exception.

        :param block: If true, wait for a result when none is set yet.
        :param timeout: Maximum time to wait when *block* is true;
                        ``None`` waits until a result is set.
        :raises TimeoutError: If no result is available (immediately when
                              *block* is false, otherwise after *timeout*).
        """
        with self._condition:
            if block and not self.ready():
                self._condition.wait(timeout)
            if not self.ready():
                # if we get to this point we timed out
                raise TimeoutError()
            if self._exception is None:
                return self.value
            raise self._exception

    def get_nowait(self):
        """Return the value or raise the exception without blocking.

        If nothing is available, raises TimeoutError

        """
        return self.get(block=False)

    def wait(self, timeout=None):
        """Block until the instance is ready or *timeout* elapses.

        :returns: ``True`` if a value or exception is set, else ``False``.
        """
        with self._condition:
            # Only wait while unset: once set() has run nobody will notify
            # again, so waiting would sleep the full timeout (or forever).
            if not self.ready():
                self._condition.wait(timeout)
            return self.ready()

    def rawlink(self, callback):
        """Register a callback to call when a value or an exception is
        set"""
        with self._condition:
            # Are we already set? Dispatch it now
            if self.ready():
                self._handler.completion_queue.put(
                    functools.partial(callback, self))
                return

            if callback not in self._callbacks:
                self._callbacks.append(callback)

    def unlink(self, callback):
        """Remove the callback set by :meth:`rawlink`"""
        with self._condition:
            if self.ready():
                # Already triggered, ignore
                return

            if callback in self._callbacks:
                self._callbacks.remove(callback)


class SequentialThreadingHandler(object):
    """Threading handler for sequentially executing callbacks.

    This handler executes callbacks in a sequential manner. A queue is
    created for each of the callback events, so that each type of event
    has its callback type run sequentially. These are split into two
    queues, one for watch events and one for async result completion
    callbacks.

    Each queue type has a thread worker that pulls the callback event
    off the queue and runs it in the order the client sees it.

    This split helps ensure that watch callbacks won't block session
    re-establishment should the connection be lost during a Zookeeper
    client call.

    Watch and completion callbacks should avoid blocking behavior as
    the next callback of that type won't be run until it completes. If
    you need to block, spawn a new thread and return immediately so
    callbacks can proceed.

    .. note::

        Completion callbacks can block to wait on Zookeeper calls, but
        no other completion callbacks will execute until the callback
        returns.

    """
    name = "sequential_threading_handler"
    timeout_exception = TimeoutError
    sleep_func = staticmethod(time.sleep)
    queue_impl = Queue.Queue
    queue_empty = Queue.Empty

    def __init__(self):
        """Create a :class:`SequentialThreadingHandler` instance"""
        self.callback_queue = self.queue_impl()
        self.completion_queue = self.queue_impl()
        self._running = False
        self._state_change = threading.Lock()
        self._workers = []

    def _create_thread_worker(self, queue):
        """Start a daemon thread that runs callables from *queue* in order
        until it pulls the ``_STOP`` sentinel."""
        def thread_worker():  # pragma: nocover
            while True:
                try:
                    func = queue.get()
                    try:
                        if func is _STOP:
                            break
                        func()
                    except Exception:
                        log.exception("Exception in worker queue thread")
                    finally:
                        queue.task_done()
                except self.queue_empty:
                    continue
        t = threading.Thread(target=thread_worker)

        # Even though these should be joined, it's possible stop might
        # not issue in time so we set them to daemon to let the program
        # exit anyways
        t.daemon = True
        t.start()
        return t

    def start(self):
        """Start the worker threads."""
        with self._state_change:
            if self._running:
                return

            # Spawn our worker threads, we have
            # - A callback worker for watch events to be called
            # - A completion worker for completion events to be called
            for queue in (self.completion_queue, self.callback_queue):
                w = self._create_thread_worker(queue)
                self._workers.append(w)
            self._running = True
            atexit.register(self.stop)

    def stop(self):
        """Stop the worker threads and empty all queues."""
        with self._state_change:
            if not self._running:
                return

            self._running = False

            for queue in (self.completion_queue, self.callback_queue):
                queue.put(_STOP)

            self._workers.reverse()
            while self._workers:
                worker = self._workers.pop()
                worker.join()

            # Clear the queues
            self.callback_queue = self.queue_impl()
            self.completion_queue = self.queue_impl()
            if hasattr(atexit, "unregister"):
                atexit.unregister(self.stop)

    def select(self, *args, **kwargs):
        return select.select(*args, **kwargs)

    def socket(self):
        return create_tcp_socket(socket)

    def create_connection(self, *args, **kwargs):
        return create_tcp_connection(socket, *args, **kwargs)

    def event_object(self):
        """Create an appropriate Event object"""
        return threading.Event()

    def lock_object(self):
        """Create a lock object"""
        return threading.Lock()

    def rlock_object(self):
        """Create an appropriate RLock object"""
        return threading.RLock()

    def async_result(self):
        """Create a :class:`AsyncResult` instance"""
        return AsyncResult(self)

    def spawn(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in a new daemon thread and return
        the started :class:`threading.Thread`."""
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()
        return t

    def dispatch_callback(self, callback):
        """Dispatch to the callback object

        The callback is put on separate queues to run depending on the
        type as documented for the :class:`SequentialThreadingHandler`.

        """
        self.callback_queue.put(lambda: callback.func(*callback.args))
# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/utils.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py b/slider-agent/src/main/python/kazoo/handlers/utils.py
# new file mode 100644
# index 0000000..ad204b5
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/handlers/utils.py
# @@ -0,0 +1,93 @@
"""Kazoo handler helpers"""

HAS_FNCTL = True
try:
    import fcntl
except ImportError:  # pragma: nocover
    HAS_FNCTL = False
import functools
import os


def _set_fd_cloexec(fd):
    """Set FD_CLOEXEC on *fd* (an int or an object with ``fileno()``)."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)


def _set_default_tcpsock_options(module, sock):
    """Enable TCP_NODELAY (and CLOEXEC where fcntl exists); return *sock*."""
    sock.setsockopt(module.IPPROTO_TCP, module.TCP_NODELAY, 1)
    if HAS_FNCTL:
        _set_fd_cloexec(sock)
    return sock


def create_pipe():
    """Create a non-blocking read/write pipe.

    :returns: ``(read_fd, write_fd)``. Non-blocking and close-on-exec
              flags are only applied when :mod:`fcntl` is available.
    """
    r, w = os.pipe()
    if HAS_FNCTL:
        fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
        fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
        _set_fd_cloexec(r)
        _set_fd_cloexec(w)
    return r, w


def create_tcp_socket(module):
    """Create a TCP socket with the CLOEXEC flag set.
    """
    type_ = module.SOCK_STREAM
    if hasattr(module, 'SOCK_CLOEXEC'):  # pragma: nocover
        # if available, set cloexec flag during socket creation
        type_ |= module.SOCK_CLOEXEC
    sock = module.socket(module.AF_INET, type_)
    _set_default_tcpsock_options(module, sock)
    return sock


def create_tcp_connection(module, address, timeout=None):
    """Connect to *address* via ``module.create_connection`` and apply the
    default TCP socket options. ``timeout=None`` means the module's
    global default timeout."""
    if timeout is None:
        # thanks to create_connection() developers for
        # this ugliness...
        timeout = module._GLOBAL_DEFAULT_TIMEOUT

    sock = module.create_connection(address, timeout)
    _set_default_tcpsock_options(module, sock)
    return sock


def capture_exceptions(async_result):
    """Return a new decorated function that propagates the exceptions of the
    wrapped function to an async_result.

    :param async_result: An async result implementing :class:`IAsyncResult`

    An exception is stored via ``async_result.set_exception`` and the
    decorated call then returns ``None`` instead of re-raising.

    """
    def capture(function):
        @functools.wraps(function)
        def captured_function(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except Exception as exc:
                async_result.set_exception(exc)
        return captured_function
    return capture


def wrap(async_result):
    """Return a new decorated function that propagates the return value or
    exception of wrapped function to an async_result. NOTE: Only propagates a
    non-None return value.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def capture(function):
        @capture_exceptions(async_result)
        # wraps() here so the outer wraps() in capture_exceptions copies the
        # user's function metadata, not this helper's name.
        @functools.wraps(function)
        def captured_function(*args, **kwargs):
            value = function(*args, **kwargs)
            if value is not None:
                async_result.set(value)
            return value
        return captured_function
    return capture
# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/hosts.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/hosts.py b/slider-agent/src/main/python/kazoo/hosts.py
# new file mode 100644
# index 0000000..15ce447
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/hosts.py
# @@ -0,0 +1,26 @@
import random

try:
    from urlparse import urlsplit
except ImportError:
    # try python3 then
    from urllib.parse import urlsplit


def collect_hosts(hosts, randomize=True):
    """Collect a set of hosts and an optional chroot from a string.

    :param hosts: Comma separated ``host[:port]`` entries, optionally
                  followed by ``/chroot``, e.g. ``"zk1:2181,zk2/app"``.
                  IPv6 addresses must be bracketed (``"[::1]:2181"``).
    :param randomize: Shuffle the resulting host list.
    :returns: ``(list of (host, port) tuples, chroot or None)``; an entry
              without a port gets 2181.
    :raises ValueError: If an entry has no host name (e.g. an empty entry
                        from a stray comma).
    """
    host_ports, chroot = hosts.partition("/")[::2]
    chroot = "/" + chroot if chroot else None

    result = []
    for host_port in host_ports.split(","):
        # put all complexity of dealing with
        # IPv4 & IPv6 address:port on the urlsplit.
        # Strip first: newer Pythons reject a port like "2181 ".
        res = urlsplit("xxx://" + host_port.strip())
        host = res.hostname
        if not host:
            # Previously crashed with AttributeError on None.strip().
            raise ValueError("bad hostname: %r" % (host_port,))
        port = int(res.port) if res.port else 2181
        result.append((host.strip(), port))

    if randomize:
        random.shuffle(result)

    return result, chroot
# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/interfaces.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/interfaces.py b/slider-agent/src/main/python/kazoo/interfaces.py
# new file mode 100644
# index 0000000..351f1fd
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/interfaces.py
# @@ -0,0 +1,203 @@
"""Kazoo Interfaces

.. versionchanged:: 1.4

    The classes in this module used to be interface declarations based on
    `zope.interface.Interface`. They were converted to normal classes and
    now serve as documentation only.

"""

# public API


class IHandler(object):
    """A Callback Handler for Zookeeper completion and watch callbacks.

    This object must implement several methods responsible for
    determining how completion / watch callbacks are handled as well as
    the method for calling :class:`IAsyncResult` callback functions.

    These functions are used to abstract differences between a Python
    threading environment and asynchronous single-threaded environments
    like gevent. The minimum functionality needed for Kazoo to handle
    these differences is encompassed in this interface.

    The Handler should document how callbacks are called for:

    * Zookeeper completion events
    * Zookeeper watch events

    .. attribute:: name

        Human readable name of the Handler interface.

    .. attribute:: timeout_exception

        Exception class that should be thrown and captured if a
        result is not available within the given time.

    .. attribute:: sleep_func

        Appropriate sleep function that can be called with a single
        argument and sleep.

    """

    def start(self):
        """Start the handler, used for setting up the handler."""

    def stop(self):
        """Stop the handler. Should block until the handler is safely
        stopped."""

    def select(self, *args, **kwargs):
        """A select method that implements Python's select.select
        API"""

    def socket(self):
        """A socket method that implements Python's socket.socket
        API"""

    def create_connection(self, *args, **kwargs):
        """A socket method that implements Python's
        socket.create_connection API"""

    def event_object(self):
        """Return an appropriate object that implements Python's
        threading.Event API"""

    def lock_object(self):
        """Return an appropriate object that implements Python's
        threading.Lock API"""

    def rlock_object(self):
        """Return an appropriate object that implements Python's
        threading.RLock API"""

    def async_result(self):
        """Return an instance that conforms to the
        :class:`~IAsyncResult` interface appropriate for this
        handler"""

    def spawn(self, func, *args, **kwargs):
        """Spawn a function to run asynchronously

        :param func: The callable to run.
        :param args: args to call the function with.
        :param kwargs: keyword args to call the function with.

        This method should return immediately and execute the function
        with the provided args and kwargs in an asynchronous manner.

        """

    def dispatch_callback(self, callback):
        """Dispatch to the callback object

        :param callback: A :class:`~kazoo.protocol.states.Callback`
                         object to be called.

        """


class IAsyncResult(object):
    """An Async Result object that can be queried for a value that has
    been set asynchronously.

    This object is modeled on the ``gevent`` AsyncResult object.

    The implementation must account for the fact that the :meth:`set`
    and :meth:`set_exception` methods will be called from within the
    Zookeeper thread which may require extra care under asynchronous
    environments.

    .. attribute:: value

        Holds the value passed to :meth:`set` if :meth:`set` was
        called. Otherwise `None`.

    .. attribute:: exception

        Holds the exception instance passed to :meth:`set_exception`
        if :meth:`set_exception` was called. Otherwise `None`.

    """

    def ready(self):
        """Return `True` if and only if it holds a value or an
        exception"""

    def successful(self):
        """Return `True` if and only if it is ready and holds a
        value"""

    def set(self, value=None):
        """Store the value. Wake up the waiters.

        :param value: Value to store as the result.

        Any waiters blocking on :meth:`get` or :meth:`wait` are woken
        up. Subsequent calls to :meth:`wait` and :meth:`get` will not
        block at all."""

    def set_exception(self, exception):
        """Store the exception. Wake up the waiters.

        :param exception: Exception to raise when fetching the value.

        Any waiters blocking on :meth:`get` or :meth:`wait` are woken
        up. Subsequent calls to :meth:`wait` and :meth:`get` will not
        block at all."""

    def get(self, block=True, timeout=None):
        """Return the stored value or raise the exception

        :param block: Whether this method should block or return
                      immediately.
        :type block: bool
        :param timeout: How long to wait for a value when `block` is
                        `True`.
        :type timeout: float

        If this instance already holds a value / an exception, return /
        raise it immediately. Otherwise, block until :meth:`set` or
        :meth:`set_exception` has been called or until the optional
        timeout occurs."""

    def get_nowait(self):
        """Return the value or raise the exception without blocking.

        If nothing is available, raise the Timeout exception class on
        the associated :class:`IHandler` interface."""

    def wait(self, timeout=None):
        """Block until the instance is ready.

        :param timeout: Maximum time to wait for a value or exception;
                        `None` waits indefinitely.
        :type timeout: float

        If this instance already holds a value / an exception, return
        immediately (the stored exception is not raised). Otherwise,
        block until :meth:`set` or :meth:`set_exception` has been called
        or until the optional timeout occurs."""

    def rawlink(self, callback):
        """Register a callback to call when a value or an exception is
        set

        :param callback:
            A callback function to call after :meth:`set` or
            :meth:`set_exception` has been called. This function will
            be passed a single argument, this instance.
        :type callback: func

        """

    def unlink(self, callback):
        """Remove the callback set by :meth:`rawlink`

        :param callback: A callback function to remove.
        :type callback: func

        """
# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/loggingsupport.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/loggingsupport.py b/slider-agent/src/main/python/kazoo/loggingsupport.py
# new file mode 100644
# index 0000000..5ed2f8f
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/loggingsupport.py
# @@ -0,0 +1,2 @@
BLATHER = 5  # log level for low-level debugging

# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/__init__.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/protocol/__init__.py b/slider-agent/src/main/python/kazoo/protocol/__init__.py
# new file mode 100644
# index 0000000..792d600
# --- /dev/null
# +++ b/slider-agent/src/main/python/kazoo/protocol/__init__.py
# @@ -0,0 +1 @@
#
# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/connection.py
# ----------------------------------------------------------------------
# diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py b/slider-agent/src/main/python/kazoo/protocol/connection.py
# new file mode 100644
# index 0000000..e6ad63d
# --- /dev/null
# +++
b/slider-agent/src/main/python/kazoo/protocol/connection.py @@ -0,0 +1,623 @@ +"""Zookeeper Protocol Connection Handler""" +import logging +import os +import random +import select +import socket +import sys +import time +from binascii import hexlify +from contextlib import contextmanager + +from kazoo.exceptions import ( + AuthFailedError, + ConnectionDropped, + EXCEPTIONS, + SessionExpiredError, + NoNodeError +) +from kazoo.handlers.utils import create_pipe +from kazoo.loggingsupport import BLATHER +from kazoo.protocol.serialization import ( + Auth, + Close, + Connect, + Exists, + GetChildren, + Ping, + PingInstance, + ReplyHeader, + Transaction, + Watch, + int_struct +) +from kazoo.protocol.states import ( + Callback, + KeeperState, + WatchedEvent, + EVENT_TYPE_MAP, +) +from kazoo.retry import ( + ForceRetryError, + RetryFailedError +) + +log = logging.getLogger(__name__) + + +# Special testing hook objects used to force a session expired error as +# if it came from the server +_SESSION_EXPIRED = object() +_CONNECTION_DROP = object() + +STOP_CONNECTING = object() + +CREATED_EVENT = 1 +DELETED_EVENT = 2 +CHANGED_EVENT = 3 +CHILD_EVENT = 4 + +WATCH_XID = -1 +PING_XID = -2 +AUTH_XID = -4 + +CLOSE_RESPONSE = Close.type + +if sys.version_info > (3, ): # pragma: nocover + def buffer(obj, offset=0): + return memoryview(obj)[offset:] + + advance_iterator = next +else: # pragma: nocover + def advance_iterator(it): + return it.next() + + +class RWPinger(object): + """A Read/Write Server Pinger Iterable + + This object is initialized with the hosts iterator object and the + socket creation function. Anytime `next` is called on its iterator + it yields either False, or a host, port tuple if it found a r/w + capable Zookeeper node. + + After the first run-through of hosts, an exponential back-off delay + is added before the next run. This delay is tracked internally and + the iterator will yield False if called too soon. 
+ + """ + def __init__(self, hosts, connection_func, socket_handling): + self.hosts = hosts + self.connection = connection_func + self.last_attempt = None + self.socket_handling = socket_handling + + def __iter__(self): + if not self.last_attempt: + self.last_attempt = time.time() + delay = 0.5 + while True: + yield self._next_server(delay) + + def _next_server(self, delay): + jitter = random.randint(0, 100) / 100.0 + while time.time() < self.last_attempt + delay + jitter: + # Skip rw ping checks if its too soon + return False + for host, port in self.hosts: + log.debug("Pinging server for r/w: %s:%s", host, port) + self.last_attempt = time.time() + try: + with self.socket_handling(): + sock = self.connection((host, port)) + sock.sendall(b"isro") + result = sock.recv(8192) + sock.close() + if result == b'rw': + return (host, port) + else: + return False + except ConnectionDropped: + return False + + # Add some jitter between host pings + while time.time() < self.last_attempt + jitter: + return False + delay *= 2 + + +class RWServerAvailable(Exception): + """Thrown if a RW Server becomes available""" + + +class ConnectionHandler(object): + """Zookeeper connection handler""" + def __init__(self, client, retry_sleeper, logger=None): + self.client = client + self.handler = client.handler + self.retry_sleeper = retry_sleeper + self.logger = logger or log + + # Our event objects + self.connection_closed = client.handler.event_object() + self.connection_closed.set() + self.connection_stopped = client.handler.event_object() + self.connection_stopped.set() + self.ping_outstanding = client.handler.event_object() + + self._read_pipe = None + self._write_pipe = None + + self._socket = None + self._xid = None + self._rw_server = None + self._ro_mode = False + + self._connection_routine = None + + # This is instance specific to avoid odd thread bug issues in Python + # during shutdown global cleanup + @contextmanager + def _socket_error_handling(self): + try: + yield + except 
(socket.error, select.error) as e: + err = getattr(e, 'strerror', e) + raise ConnectionDropped("socket connection error: %s" % (err,)) + + def start(self): + """Start the connection up""" + if self.connection_closed.is_set(): + self._read_pipe, self._write_pipe = create_pipe() + self.connection_closed.clear() + if self._connection_routine: + raise Exception("Unable to start, connection routine already " + "active.") + self._connection_routine = self.handler.spawn(self.zk_loop) + + def stop(self, timeout=None): + """Ensure the writer has stopped, wait to see if it does.""" + self.connection_stopped.wait(timeout) + if self._connection_routine: + self._connection_routine.join() + self._connection_routine = None + return self.connection_stopped.is_set() + + def close(self): + """Release resources held by the connection + + The connection can be restarted afterwards. + """ + if not self.connection_stopped.is_set(): + raise Exception("Cannot close connection until it is stopped") + self.connection_closed.set() + wp, rp = self._write_pipe, self._read_pipe + self._write_pipe = self._read_pipe = None + if wp is not None: + os.close(wp) + if rp is not None: + os.close(rp) + + def _server_pinger(self): + """Returns a server pinger iterable, that will ping the next + server in the list, and apply a back-off between attempts.""" + return RWPinger(self.client.hosts, self.handler.create_connection, + self._socket_error_handling) + + def _read_header(self, timeout): + b = self._read(4, timeout) + length = int_struct.unpack(b)[0] + b = self._read(length, timeout) + header, offset = ReplyHeader.deserialize(b, 0) + return header, b, offset + + def _read(self, length, timeout): + msgparts = [] + remaining = length + with self._socket_error_handling(): + while remaining > 0: + s = self.handler.select([self._socket], [], [], timeout)[0] + if not s: # pragma: nocover + # If the read list is empty, we got a timeout. 
We don't + # have to check wlist and xlist as we don't set any + raise self.handler.timeout_exception("socket time-out") + + chunk = self._socket.recv(remaining) + if chunk == b'': + raise ConnectionDropped('socket connection broken') + msgparts.append(chunk) + remaining -= len(chunk) + return b"".join(msgparts) + + def _invoke(self, timeout, request, xid=None): + """A special writer used during connection establishment + only""" + self._submit(request, timeout, xid) + zxid = None + if xid: + header, buffer, offset = self._read_header(timeout) + if header.xid != xid: + raise RuntimeError('xids do not match, expected %r received %r', + xid, header.xid) + if header.zxid > 0: + zxid = header.zxid + if header.err: + callback_exception = EXCEPTIONS[header.err]() + self.logger.debug( + 'Received error(xid=%s) %r', xid, callback_exception) + raise callback_exception + return zxid + + msg = self._read(4, timeout) + length = int_struct.unpack(msg)[0] + msg = self._read(length, timeout) + + if hasattr(request, 'deserialize'): + try: + obj, _ = request.deserialize(msg, 0) + except Exception: + self.logger.exception("Exception raised during deserialization" + " of request: %s", request) + + # raise ConnectionDropped so connect loop will retry + raise ConnectionDropped('invalid server response') + self.logger.log(BLATHER, 'Read response %s', obj) + return obj, zxid + + return zxid + + def _submit(self, request, timeout, xid=None): + """Submit a request object with a timeout value and optional + xid""" + b = bytearray() + if xid: + b.extend(int_struct.pack(xid)) + if request.type: + b.extend(int_struct.pack(request.type)) + b += request.serialize() + self.logger.log((BLATHER if isinstance(request, Ping) else logging.DEBUG), + "Sending request(xid=%s): %s", xid, request) + self._write(int_struct.pack(len(b)) + b, timeout) + + def _write(self, msg, timeout): + """Write a raw msg to the socket""" + sent = 0 + msg_length = len(msg) + with self._socket_error_handling(): + while sent 
< msg_length: + s = self.handler.select([], [self._socket], [], timeout)[1] + if not s: # pragma: nocover + # If the write list is empty, we got a timeout. We don't + # have to check rlist and xlist as we don't set any + raise self.handler.timeout_exception("socket time-out") + msg_slice = buffer(msg, sent) + bytes_sent = self._socket.send(msg_slice) + if not bytes_sent: + raise ConnectionDropped('socket connection broken') + sent += bytes_sent + + def _read_watch_event(self, buffer, offset): + client = self.client + watch, offset = Watch.deserialize(buffer, offset) + path = watch.path + + self.logger.debug('Received EVENT: %s', watch) + + watchers = [] + + if watch.type in (CREATED_EVENT, CHANGED_EVENT): + watchers.extend(client._data_watchers.pop(path, [])) + elif watch.type == DELETED_EVENT: + watchers.extend(client._data_watchers.pop(path, [])) + watchers.extend(client._child_watchers.pop(path, [])) + elif watch.type == CHILD_EVENT: + watchers.extend(client._child_watchers.pop(path, [])) + else: + self.logger.warn('Received unknown event %r', watch.type) + return + + # Strip the chroot if needed + path = client.unchroot(path) + ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path) + + # Last check to ignore watches if we've been stopped + if client._stopped.is_set(): + return + + # Dump the watchers to the watch thread + for watch in watchers: + client.handler.dispatch_callback(Callback('watch', watch, (ev,))) + + def _read_response(self, header, buffer, offset): + client = self.client + request, async_object, xid = client._pending.popleft() + if header.zxid and header.zxid > 0: + client.last_zxid = header.zxid + if header.xid != xid: + raise RuntimeError('xids do not match, expected %r ' + 'received %r', xid, header.xid) + + # Determine if its an exists request and a no node error + exists_error = (header.err == NoNodeError.code and + request.type == Exists.type) + + # Set the exception if its not an exists error + if header.err and not 
exists_error: + callback_exception = EXCEPTIONS[header.err]() + self.logger.debug( + 'Received error(xid=%s) %r', xid, callback_exception) + if async_object: + async_object.set_exception(callback_exception) + elif request and async_object: + if exists_error: + # It's a NoNodeError, which is fine for an exists + # request + async_object.set(None) + else: + try: + response = request.deserialize(buffer, offset) + except Exception as exc: + self.logger.exception("Exception raised during deserialization" + " of request: %s", request) + async_object.set_exception(exc) + return + self.logger.debug( + 'Received response(xid=%s): %r', xid, response) + + # We special case a Transaction as we have to unchroot things + if request.type == Transaction.type: + response = Transaction.unchroot(client, response) + + async_object.set(response) + + # Determine if watchers should be registered + watcher = getattr(request, 'watcher', None) + if not client._stopped.is_set() and watcher: + if isinstance(request, GetChildren): + client._child_watchers[request.path].add(watcher) + else: + client._data_watchers[request.path].add(watcher) + + if isinstance(request, Close): + self.logger.log(BLATHER, 'Read close response') + return CLOSE_RESPONSE + + def _read_socket(self, read_timeout): + """Called when there's something to read on the socket""" + client = self.client + + header, buffer, offset = self._read_header(read_timeout) + if header.xid == PING_XID: + self.logger.log(BLATHER, 'Received Ping') + self.ping_outstanding.clear() + elif header.xid == AUTH_XID: + self.logger.log(BLATHER, 'Received AUTH') + + request, async_object, xid = client._pending.popleft() + if header.err: + async_object.set_exception(AuthFailedError()) + client._session_callback(KeeperState.AUTH_FAILED) + else: + async_object.set(True) + elif header.xid == WATCH_XID: + self._read_watch_event(buffer, offset) + else: + self.logger.log(BLATHER, 'Reading for header %r', header) + + return self._read_response(header, 
buffer, offset) + + def _send_request(self, read_timeout, connect_timeout): + """Called when we have something to send out on the socket""" + client = self.client + try: + request, async_object = client._queue[0] + except IndexError: + # Not actually something on the queue, this can occur if + # something happens to cancel the request such that we + # don't clear the pipe below after sending + try: + # Clear possible inconsistence (no request in the queue + # but have data in the read pipe), which causes cpu to spin. + os.read(self._read_pipe, 1) + except OSError: + pass + return + + # Special case for testing, if this is a _SessionExpire object + # then throw a SessionExpiration error as if we were dropped + if request is _SESSION_EXPIRED: + raise SessionExpiredError("Session expired: Testing") + if request is _CONNECTION_DROP: + raise ConnectionDropped("Connection dropped: Testing") + + # Special case for auth packets + if request.type == Auth.type: + xid = AUTH_XID + else: + self._xid += 1 + xid = self._xid + + self._submit(request, connect_timeout, xid) + client._queue.popleft() + os.read(self._read_pipe, 1) + client._pending.append((request, async_object, xid)) + + def _send_ping(self, connect_timeout): + self.ping_outstanding.set() + self._submit(PingInstance, connect_timeout, PING_XID) + + # Determine if we need to check for a r/w server + if self._ro_mode: + result = advance_iterator(self._ro_mode) + if result: + self._rw_server = result + raise RWServerAvailable() + + def zk_loop(self): + """Main Zookeeper handling loop""" + self.logger.log(BLATHER, 'ZK loop started') + + self.connection_stopped.clear() + + retry = self.retry_sleeper.copy() + try: + while not self.client._stopped.is_set(): + # If the connect_loop returns STOP_CONNECTING, stop retrying + if retry(self._connect_loop, retry) is STOP_CONNECTING: + break + except RetryFailedError: + self.logger.warning("Failed connecting to Zookeeper " + "within the connection retry policy.") + finally: + 
self.connection_stopped.set() + self.client._session_callback(KeeperState.CLOSED) + self.logger.log(BLATHER, 'Connection stopped') + + def _connect_loop(self, retry): + # Iterate through the hosts a full cycle before starting over + status = None + for host, port in self.client.hosts: + if self.client._stopped.is_set(): + status = STOP_CONNECTING + break + status = self._connect_attempt(host, port, retry) + if status is STOP_CONNECTING: + break + + if status is STOP_CONNECTING: + return STOP_CONNECTING + else: + raise ForceRetryError('Reconnecting') + + def _connect_attempt(self, host, port, retry): + client = self.client + TimeoutError = self.handler.timeout_exception + close_connection = False + + self._socket = None + + # Were we given a r/w server? If so, use that instead + if self._rw_server: + self.logger.log(BLATHER, + "Found r/w server to use, %s:%s", host, port) + host, port = self._rw_server + self._rw_server = None + + if client._state != KeeperState.CONNECTING: + client._session_callback(KeeperState.CONNECTING) + + try: + read_timeout, connect_timeout = self._connect(host, port) + read_timeout = read_timeout / 1000.0 + connect_timeout = connect_timeout / 1000.0 + retry.reset() + self._xid = 0 + + while not close_connection: + # Watch for something to read or send + jitter_time = random.randint(0, 40) / 100.0 + # Ensure our timeout is positive + timeout = max([read_timeout / 2.0 - jitter_time, jitter_time]) + s = self.handler.select([self._socket, self._read_pipe], + [], [], timeout)[0] + + if not s: + if self.ping_outstanding.is_set(): + self.ping_outstanding.clear() + raise ConnectionDropped( + "outstanding heartbeat ping not received") + self._send_ping(connect_timeout) + elif s[0] == self._socket: + response = self._read_socket(read_timeout) + close_connection = response == CLOSE_RESPONSE + else: + self._send_request(read_timeout, connect_timeout) + + self.logger.info('Closing connection to %s:%s', host, port) + 
client._session_callback(KeeperState.CLOSED) + return STOP_CONNECTING + except (ConnectionDropped, TimeoutError) as e: + if isinstance(e, ConnectionDropped): + self.logger.warning('Connection dropped: %s', e) + else: + self.logger.warning('Connection time-out') + if client._state != KeeperState.CONNECTING: + self.logger.warning("Transition to CONNECTING") + client._session_callback(KeeperState.CONNECTING) + except AuthFailedError: + retry.reset() + self.logger.warning('AUTH_FAILED closing') + client._session_callback(KeeperState.AUTH_FAILED) + return STOP_CONNECTING + except SessionExpiredError: + retry.reset() + self.logger.warning('Session has expired') + client._session_callback(KeeperState.EXPIRED_SESSION) + except RWServerAvailable: + retry.reset() + self.logger.warning('Found a RW server, dropping connection') + client._session_callback(KeeperState.CONNECTING) + except Exception: + self.logger.exception('Unhandled exception in connection loop') + raise + finally: + if self._socket is not None: + self._socket.close() + + def _connect(self, host, port): + client = self.client + self.logger.info('Connecting to %s:%s', host, port) + + self.logger.log(BLATHER, + ' Using session_id: %r session_passwd: %s', + client._session_id, + hexlify(client._session_passwd)) + + with self._socket_error_handling(): + self._socket = self.handler.create_connection( + (host, port), client._session_timeout / 1000.0) + + self._socket.setblocking(0) + + connect = Connect(0, client.last_zxid, client._session_timeout, + client._session_id or 0, client._session_passwd, + client.read_only) + + connect_result, zxid = self._invoke(client._session_timeout, connect) + + if connect_result.time_out <= 0: + raise SessionExpiredError("Session has expired") + + if zxid: + client.last_zxid = zxid + + # Load return values + client._session_id = connect_result.session_id + client._protocol_version = connect_result.protocol_version + negotiated_session_timeout = connect_result.time_out + 
connect_timeout = negotiated_session_timeout / len(client.hosts) + read_timeout = negotiated_session_timeout * 2.0 / 3.0 + client._session_passwd = connect_result.passwd + + self.logger.log(BLATHER, + 'Session created, session_id: %r session_passwd: %s\n' + ' negotiated session timeout: %s\n' + ' connect timeout: %s\n' + ' read timeout: %s', client._session_id, + hexlify(client._session_passwd), + negotiated_session_timeout, connect_timeout, + read_timeout) + + if connect_result.read_only: + client._session_callback(KeeperState.CONNECTED_RO) + self._ro_mode = iter(self._server_pinger()) + else: + client._session_callback(KeeperState.CONNECTED) + self._ro_mode = None + + for scheme, auth in client.auth_data: + ap = Auth(0, scheme, auth) + zxid = self._invoke(connect_timeout, ap, xid=AUTH_XID) + if zxid: + client.last_zxid = zxid + return read_timeout, connect_timeout http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/paths.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/protocol/paths.py b/slider-agent/src/main/python/kazoo/protocol/paths.py new file mode 100644 index 0000000..52b2d6b --- /dev/null +++ b/slider-agent/src/main/python/kazoo/protocol/paths.py @@ -0,0 +1,54 @@ +def normpath(path, trailing=False): + """Normalize path, eliminating double slashes, etc.""" + comps = path.split('/') + new_comps = [] + for comp in comps: + if comp == '': + continue + if comp in ('.', '..'): + raise ValueError('relative paths not allowed') + new_comps.append(comp) + new_path = '/'.join(new_comps) + if trailing is True and path.endswith('/'): + new_path += '/' + if path.startswith('/'): + return '/' + new_path + return new_path + + +def join(a, *p): + """Join two or more pathname components, inserting '/' as needed. + + If any component is an absolute path, all previous path components + will be discarded. 
+ + """ + path = a + for b in p: + if b.startswith('/'): + path = b + elif path == '' or path.endswith('/'): + path += b + else: + path += '/' + b + return path + + +def isabs(s): + """Test whether a path is absolute""" + return s.startswith('/') + + +def basename(p): + """Returns the final component of a pathname""" + i = p.rfind('/') + 1 + return p[i:] + + +def _prefix_root(root, path, trailing=False): + """Prepend a root to a path. """ + return normpath(join(_norm_root(root), path.lstrip('/')), trailing=trailing) + + +def _norm_root(root): + return normpath(join('/', root)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/serialization.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/protocol/serialization.py b/slider-agent/src/main/python/kazoo/protocol/serialization.py new file mode 100644 index 0000000..f44f49a --- /dev/null +++ b/slider-agent/src/main/python/kazoo/protocol/serialization.py @@ -0,0 +1,396 @@ +"""Zookeeper Serializers, Deserializers, and NamedTuple objects""" +from collections import namedtuple +import struct + +from kazoo.exceptions import EXCEPTIONS +from kazoo.protocol.states import ZnodeStat +from kazoo.security import ACL +from kazoo.security import Id + +# Struct objects with formats compiled +bool_struct = struct.Struct('B') +int_struct = struct.Struct('!i') +int_int_struct = struct.Struct('!ii') +int_int_long_struct = struct.Struct('!iiq') + +int_long_int_long_struct = struct.Struct('!iqiq') +multiheader_struct = struct.Struct('!iBi') +reply_header_struct = struct.Struct('!iqi') +stat_struct = struct.Struct('!qqqqiiiqiiq') + +try: # pragma: nocover + basestring +except NameError: + basestring = str + + +def read_string(buffer, offset): + """Reads an int specified buffer into a string and returns the + string and the new offset in the buffer""" + length = int_struct.unpack_from(buffer, offset)[0] + 
offset += int_struct.size + if length < 0: + return None, offset + else: + index = offset + offset += length + return buffer[index:index + length].decode('utf-8'), offset + + +def read_acl(bytes, offset): + perms = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + scheme, offset = read_string(bytes, offset) + id, offset = read_string(bytes, offset) + return ACL(perms, Id(scheme, id)), offset + + +def write_string(bytes): + if not bytes: + return int_struct.pack(-1) + else: + utf8_str = bytes.encode('utf-8') + return int_struct.pack(len(utf8_str)) + utf8_str + + +def write_buffer(bytes): + if bytes is None: + return int_struct.pack(-1) + else: + return int_struct.pack(len(bytes)) + bytes + + +def read_buffer(bytes, offset): + length = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + if length < 0: + return None, offset + else: + index = offset + offset += length + return bytes[index:index + length], offset + + +class Close(namedtuple('Close', '')): + type = -11 + + @classmethod + def serialize(cls): + return b'' + +CloseInstance = Close() + + +class Ping(namedtuple('Ping', '')): + type = 11 + + @classmethod + def serialize(cls): + return b'' + +PingInstance = Ping() + + +class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen' + ' time_out session_id passwd read_only')): + type = None + + def serialize(self): + b = bytearray() + b.extend(int_long_int_long_struct.pack( + self.protocol_version, self.last_zxid_seen, self.time_out, + self.session_id)) + b.extend(write_buffer(self.passwd)) + b.extend([1 if self.read_only else 0]) + return b + + @classmethod + def deserialize(cls, bytes, offset): + proto_version, timeout, session_id = int_int_long_struct.unpack_from( + bytes, offset) + offset += int_int_long_struct.size + password, offset = read_buffer(bytes, offset) + + try: + read_only = bool_struct.unpack_from(bytes, offset)[0] is 1 + offset += bool_struct.size + except struct.error: + read_only = False + return 
cls(proto_version, 0, timeout, session_id, password, + read_only), offset + + +class Create(namedtuple('Create', 'path data acl flags')): + type = 1 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(self.flags)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + return read_string(bytes, offset)[0] + + +class Delete(namedtuple('Delete', 'path version')): + type = 2 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(int_struct.pack(self.version)) + return b + + @classmethod + def deserialize(self, bytes, offset): + return True + + +class Exists(namedtuple('Exists', 'path watcher')): + type = 3 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend([1 if self.watcher else 0]) + return b + + @classmethod + def deserialize(cls, bytes, offset): + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return stat if stat.czxid != -1 else None + + +class GetData(namedtuple('GetData', 'path watcher')): + type = 4 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend([1 if self.watcher else 0]) + return b + + @classmethod + def deserialize(cls, bytes, offset): + data, offset = read_buffer(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return data, stat + + +class SetData(namedtuple('SetData', 'path data version')): + type = 5 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(self.version)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + return ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + + +class GetACL(namedtuple('GetACL', 
'path')): + type = 6 + + def serialize(self): + return bytearray(write_string(self.path)) + + @classmethod + def deserialize(cls, bytes, offset): + count = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + if count == -1: # pragma: nocover + return [] + + acls = [] + for c in range(count): + acl, offset = read_acl(bytes, offset) + acls.append(acl) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return acls, stat + + +class SetACL(namedtuple('SetACL', 'path acls version')): + type = 7 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(int_struct.pack(len(self.acls))) + for acl in self.acls: + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(self.version)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + return ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + + +class GetChildren(namedtuple('GetChildren', 'path watcher')): + type = 8 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend([1 if self.watcher else 0]) + return b + + @classmethod + def deserialize(cls, bytes, offset): + count = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + if count == -1: # pragma: nocover + return [] + + children = [] + for c in range(count): + child, offset = read_string(bytes, offset) + children.append(child) + return children + + +class Sync(namedtuple('Sync', 'path')): + type = 9 + + def serialize(self): + return write_string(self.path) + + @classmethod + def deserialize(cls, buffer, offset): + return read_string(buffer, offset)[0] + + +class GetChildren2(namedtuple('GetChildren2', 'path watcher')): + type = 12 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend([1 if self.watcher else 0]) + return b + + @classmethod + def deserialize(cls, bytes, offset): + count = int_struct.unpack_from(bytes, offset)[0] + 
offset += int_struct.size + if count == -1: # pragma: nocover + return [] + + children = [] + for c in range(count): + child, offset = read_string(bytes, offset) + children.append(child) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return children, stat + + +class CheckVersion(namedtuple('CheckVersion', 'path version')): + type = 13 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(int_struct.pack(self.version)) + return b + + +class Transaction(namedtuple('Transaction', 'operations')): + type = 14 + + def serialize(self): + b = bytearray() + for op in self.operations: + b.extend(MultiHeader(op.type, False, -1).serialize() + + op.serialize()) + return b + multiheader_struct.pack(-1, True, -1) + + @classmethod + def deserialize(cls, bytes, offset): + header = MultiHeader(None, False, None) + results = [] + response = None + while not header.done: + if header.type == Create.type: + response, offset = read_string(bytes, offset) + elif header.type == Delete.type: + response = True + elif header.type == SetData.type: + response = ZnodeStat._make( + stat_struct.unpack_from(bytes, offset)) + offset += stat_struct.size + elif header.type == CheckVersion.type: + response = True + elif header.type == -1: + err = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + response = EXCEPTIONS[err]() + if response: + results.append(response) + header, offset = MultiHeader.deserialize(bytes, offset) + return results + + @staticmethod + def unchroot(client, response): + resp = [] + for result in response: + if isinstance(result, basestring): + resp.append(client.unchroot(result)) + else: + resp.append(result) + return resp + + +class Auth(namedtuple('Auth', 'auth_type scheme auth')): + type = 100 + + def serialize(self): + return (int_struct.pack(self.auth_type) + write_string(self.scheme) + + write_string(self.auth)) + + +class Watch(namedtuple('Watch', 'type state path')): + @classmethod + def 
deserialize(cls, bytes, offset): + """Given bytes and the current bytes offset, return the + type, state, path, and new offset""" + type, state = int_int_struct.unpack_from(bytes, offset) + offset += int_int_struct.size + path, offset = read_string(bytes, offset) + return cls(type, state, path), offset + + +class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')): + @classmethod + def deserialize(cls, bytes, offset): + """Given bytes and the current bytes offset, return a + :class:`ReplyHeader` instance and the new offset""" + new_offset = offset + reply_header_struct.size + return cls._make( + reply_header_struct.unpack_from(bytes, offset)), new_offset + + +class MultiHeader(namedtuple('MultiHeader', 'type done err')): + def serialize(self): + b = bytearray() + b.extend(int_struct.pack(self.type)) + b.extend([1 if self.done else 0]) + b.extend(int_struct.pack(self.err)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + t, done, err = multiheader_struct.unpack_from(bytes, offset) + offset += multiheader_struct.size + return cls(t, done is 1, err), offset http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/protocol/states.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/protocol/states.py b/slider-agent/src/main/python/kazoo/protocol/states.py new file mode 100644 index 0000000..395c013 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/protocol/states.py @@ -0,0 +1,237 @@ +"""Kazoo State and Event objects""" +from collections import namedtuple + + +class KazooState(object): + """High level connection state values + + States inspired by Netflix Curator. + + .. attribute:: SUSPENDED + + The connection has been lost but may be recovered. We should + operate in a "safe mode" until then. When the connection is + resumed, it may be discovered that the session expired. 
A + client should not assume that locks are valid during this + time. + + .. attribute:: CONNECTED + + The connection is alive and well. + + .. attribute:: LOST + + The connection has been confirmed dead. Any ephemeral nodes + will need to be recreated upon re-establishing a connection. + If locks were acquired or recipes using ephemeral nodes are in + use, they can be considered lost as well. + + """ + SUSPENDED = "SUSPENDED" + CONNECTED = "CONNECTED" + LOST = "LOST" + + +class KeeperState(object): + """Zookeeper State + + Represents the Zookeeper state. Watch functions will receive a + :class:`KeeperState` attribute as their state argument. + + .. attribute:: AUTH_FAILED + + Authentication has failed, this is an unrecoverable error. + + .. attribute:: CONNECTED + + Zookeeper is connected. + + .. attribute:: CONNECTED_RO + + Zookeeper is connected in read-only state. + + .. attribute:: CONNECTING + + Zookeeper is currently attempting to establish a connection. + + .. attribute:: EXPIRED_SESSION + + The prior session was invalid, all prior ephemeral nodes are + gone. + + """ + AUTH_FAILED = 'AUTH_FAILED' + CONNECTED = 'CONNECTED' + CONNECTED_RO = 'CONNECTED_RO' + CONNECTING = 'CONNECTING' + CLOSED = 'CLOSED' + EXPIRED_SESSION = 'EXPIRED_SESSION' + + +class EventType(object): + """Zookeeper Event + + Represents a Zookeeper event. Events trigger watch functions which + will receive a :class:`EventType` attribute as their event + argument. + + .. attribute:: CREATED + + A node has been created. + + .. attribute:: DELETED + + A node has been deleted. + + .. attribute:: CHANGED + + The data for a node has changed. + + .. attribute:: CHILD + + The children under a node have changed (a child was added or + removed). This event does not indicate the data for a child + node has changed, which must have its own watch established. 
+ + """ + CREATED = 'CREATED' + DELETED = 'DELETED' + CHANGED = 'CHANGED' + CHILD = 'CHILD' + +EVENT_TYPE_MAP = { + 1: EventType.CREATED, + 2: EventType.DELETED, + 3: EventType.CHANGED, + 4: EventType.CHILD +} + + +class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))): + """A change on ZooKeeper that a Watcher is able to respond to. + + The :class:`WatchedEvent` includes exactly what happened, the + current state of ZooKeeper, and the path of the node that was + involved in the event. An instance of :class:`WatchedEvent` will be + passed to registered watch functions. + + .. attribute:: type + + A :class:`EventType` attribute indicating the event type. + + .. attribute:: state + + A :class:`KeeperState` attribute indicating the Zookeeper + state. + + .. attribute:: path + + The path of the node for the watch event. + + """ + + +class Callback(namedtuple('Callback', ('type', 'func', 'args'))): + """A callback that is handed to a handler for dispatch + + :param type: Type of the callback, currently is only 'watch' + :param func: Callback function + :param args: Argument list for the callback function + + """ + + +class ZnodeStat(namedtuple('ZnodeStat', 'czxid mzxid ctime mtime version' + ' cversion aversion ephemeralOwner dataLength' + ' numChildren pzxid')): + """A ZnodeStat structure with convenience properties + + When getting the value of a node from Zookeeper, the properties for + the node known as a "Stat structure" will be retrieved. The + :class:`ZnodeStat` object provides access to the standard Stat + properties and additional properties that are more readable and use + Python time semantics (seconds since epoch instead of ms). + + .. note:: + + The original Zookeeper Stat name is in parens next to the name + when it differs from the convenience attribute. These are **not + functions**, just attributes. + + .. attribute:: creation_transaction_id (czxid) + + The transaction id of the change that caused this znode to be + created. + + .. 
attribute:: last_modified_transaction_id (mzxid) + + The transaction id of the change that last modified this znode. + + .. attribute:: created (ctime) + + The time in seconds from epoch when this node was created. + (ctime is in milliseconds) + + .. attribute:: last_modified (mtime) + + The time in seconds from epoch when this znode was last + modified. (mtime is in milliseconds) + + .. attribute:: version + + The number of changes to the data of this znode. + + .. attribute:: acl_version (aversion) + + The number of changes to the ACL of this znode. + + .. attribute:: owner_session_id (ephemeralOwner) + + The session id of the owner of this znode if the znode is an + ephemeral node. If it is not an ephemeral node, it will be + `None`. (ephemeralOwner will be 0 if it is not ephemeral) + + .. attribute:: data_length (dataLength) + + The length of the data field of this znode. + + .. attribute:: children_count (numChildren) + + The number of children of this znode. + + """ + @property + def acl_version(self): + return self.aversion + + @property + def children_version(self): + return self.cversion + + @property + def created(self): + return self.ctime / 1000.0 + + @property + def last_modified(self): + return self.mtime / 1000.0 + + @property + def owner_session_id(self): + return self.ephemeralOwner or None + + @property + def creation_transaction_id(self): + return self.czxid + + @property + def last_modified_transaction_id(self): + return self.mzxid + + @property + def data_length(self): + return self.dataLength + + @property + def children_count(self): + return self.numChildren http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/__init__.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/recipe/__init__.py b/slider-agent/src/main/python/kazoo/recipe/__init__.py new file mode 100644 index 0000000..792d600 --- /dev/null +++ 
b/slider-agent/src/main/python/kazoo/recipe/__init__.py @@ -0,0 +1 @@ +# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/barrier.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/recipe/barrier.py b/slider-agent/src/main/python/kazoo/recipe/barrier.py new file mode 100644 index 0000000..05addb4 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/recipe/barrier.py @@ -0,0 +1,214 @@ +"""Zookeeper Barriers + +:Maintainer: None +:Status: Unknown + +""" +import os +import socket +import uuid + +from kazoo.protocol.states import EventType +from kazoo.exceptions import KazooException +from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NodeExistsError + + +class Barrier(object): + """Kazoo Barrier + + Implements a barrier to block processing of a set of nodes until + a condition is met at which point the nodes will be allowed to + proceed. The barrier is in place if its node exists. + + .. warning:: + + The :meth:`wait` function does not handle connection loss and + may raise :exc:`~kazoo.exceptions.ConnectionLossException` if + the connection is lost while waiting. + + """ + def __init__(self, client, path): + """Create a Kazoo Barrier + + :param client: A :class:`~kazoo.client.KazooClient` instance. + :param path: The barrier path to use. + + """ + self.client = client + self.path = path + + def create(self): + """Establish the barrier if it doesn't exist already""" + self.client.retry(self.client.ensure_path, self.path) + + def remove(self): + """Remove the barrier + + :returns: Whether the barrier actually needed to be removed. + :rtype: bool + + """ + try: + self.client.retry(self.client.delete, self.path) + return True + except NoNodeError: + return False + + def wait(self, timeout=None): + """Wait on the barrier to be cleared + + :returns: True if the barrier has been cleared, otherwise + False. 
+ :rtype: bool + + """ + cleared = self.client.handler.event_object() + + def wait_for_clear(event): + if event.type == EventType.DELETED: + cleared.set() + + exists = self.client.exists(self.path, watch=wait_for_clear) + if not exists: + return True + + cleared.wait(timeout) + return cleared.is_set() + + +class DoubleBarrier(object): + """Kazoo Double Barrier + + Double barriers are used to synchronize the beginning and end of + a distributed task. The barrier blocks when entering it until all + the members have joined, and blocks when leaving until all the + members have left. + + .. note:: + + You should register a listener for session loss as the process + will no longer be part of the barrier once the session is + gone. Connection losses will be retried with the default retry + policy. + + """ + def __init__(self, client, path, num_clients, identifier=None): + """Create a Double Barrier + + :param client: A :class:`~kazoo.client.KazooClient` instance. + :param path: The barrier path to use. + :param num_clients: How many clients must enter the barrier to + proceed. + :type num_clients: int + :param identifier: An identifier to use for this member of the + barrier when participating. Defaults to the + hostname + process id. 
+ + """ + self.client = client + self.path = path + self.num_clients = num_clients + self._identifier = identifier or '%s-%s' % ( + socket.getfqdn(), os.getpid()) + self.participating = False + self.assured_path = False + self.node_name = uuid.uuid4().hex + self.create_path = self.path + "/" + self.node_name + + def enter(self): + """Enter the barrier, blocks until all nodes have entered""" + try: + self.client.retry(self._inner_enter) + self.participating = True + except KazooException: + # We failed to enter, best effort cleanup + self._best_effort_cleanup() + self.participating = False + + def _inner_enter(self): + # make sure our barrier parent node exists + if not self.assured_path: + self.client.ensure_path(self.path) + self.assured_path = True + + ready = self.client.handler.event_object() + + try: + self.client.create(self.create_path, + self._identifier.encode('utf-8'), ephemeral=True) + except NodeExistsError: + pass + + def created(event): + if event.type == EventType.CREATED: + ready.set() + + self.client.exists(self.path + '/' + 'ready', watch=created) + + children = self.client.get_children(self.path) + + if len(children) < self.num_clients: + ready.wait() + else: + self.client.ensure_path(self.path + '/ready') + return True + + def leave(self): + """Leave the barrier, blocks until all nodes have left""" + try: + self.client.retry(self._inner_leave) + except KazooException: # pragma: nocover + # Failed to cleanly leave + self._best_effort_cleanup() + self.participating = False + + def _inner_leave(self): + # Delete the ready node if its around + try: + self.client.delete(self.path + '/ready') + except NoNodeError: + pass + + while True: + children = self.client.get_children(self.path) + if not children: + return True + + if len(children) == 1 and children[0] == self.node_name: + self.client.delete(self.create_path) + return True + + children.sort() + + ready = self.client.handler.event_object() + + def deleted(event): + if event.type == 
EventType.DELETED: + ready.set() + + if self.node_name == children[0]: + # We're first, wait on the highest to leave + if not self.client.exists(self.path + '/' + children[-1], + watch=deleted): + continue + + ready.wait() + continue + + # Delete our node + self.client.delete(self.create_path) + + # Wait on the first + if not self.client.exists(self.path + '/' + children[0], + watch=deleted): + continue + + # Wait for the lowest to be deleted + ready.wait() + + def _best_effort_cleanup(self): + try: + self.client.retry(self.client.delete, self.create_path) + except NoNodeError: + pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/counter.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/recipe/counter.py b/slider-agent/src/main/python/kazoo/recipe/counter.py new file mode 100644 index 0000000..ed80f51 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/recipe/counter.py @@ -0,0 +1,94 @@ +"""Zookeeper Counter + +:Maintainer: None +:Status: Unknown + +""" + +from kazoo.exceptions import BadVersionError +from kazoo.retry import ForceRetryError + + +class Counter(object): + """Kazoo Counter + + A shared counter of either int or float values. Changes to the + counter are done atomically. The general retry policy is used to + retry operations if concurrent changes are detected. + + The data is marshaled using `repr(value)` and converted back using + `type(counter.default)(value)` both using an ascii encoding. As + such other data types might be used for the counter value. + + Counter changes can raise + :class:`~kazoo.exceptions.BadVersionError` if the retry policy + wasn't able to apply a change. + + Example usage: + + .. 
code-block:: python + + zk = KazooClient() + counter = zk.Counter("/int") + counter += 2 + counter -= 1 + counter.value == 1 + + counter = zk.Counter("/float", default=1.0) + counter += 2.0 + counter.value == 3.0 + + """ + def __init__(self, client, path, default=0): + """Create a Kazoo Counter + + :param client: A :class:`~kazoo.client.KazooClient` instance. + :param path: The counter path to use. + :param default: The default value. + + """ + self.client = client + self.path = path + self.default = default + self.default_type = type(default) + self._ensured_path = False + + def _ensure_node(self): + if not self._ensured_path: + # make sure our node exists + self.client.ensure_path(self.path) + self._ensured_path = True + + def _value(self): + self._ensure_node() + old, stat = self.client.get(self.path) + old = old.decode('ascii') if old != b'' else self.default + version = stat.version + data = self.default_type(old) + return data, version + + @property + def value(self): + return self._value()[0] + + def _change(self, value): + if not isinstance(value, self.default_type): + raise TypeError('invalid type for value change') + self.client.retry(self._inner_change, value) + return self + + def _inner_change(self, value): + data, version = self._value() + data = repr(data + value).encode('ascii') + try: + self.client.set(self.path, data, version=version) + except BadVersionError: # pragma: nocover + raise ForceRetryError() + + def __add__(self, value): + """Add value to counter.""" + return self._change(value) + + def __sub__(self, value): + """Subtract value from counter.""" + return self._change(-value) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/recipe/election.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/recipe/election.py b/slider-agent/src/main/python/kazoo/recipe/election.py new file mode 100644 index 0000000..3089fa6 --- 
/dev/null +++ b/slider-agent/src/main/python/kazoo/recipe/election.py @@ -0,0 +1,79 @@ +"""ZooKeeper Leader Elections + +:Maintainer: None +:Status: Unknown + +""" +from kazoo.exceptions import CancelledError + + +class Election(object): + """Kazoo Basic Leader Election + + Example usage with a :class:`~kazoo.client.KazooClient` instance:: + + zk = KazooClient() + election = zk.Election("/electionpath", "my-identifier") + + # blocks until the election is won, then calls + # my_leader_function() + election.run(my_leader_function) + + """ + def __init__(self, client, path, identifier=None): + """Create a Kazoo Leader Election + + :param client: A :class:`~kazoo.client.KazooClient` instance. + :param path: The election path to use. + :param identifier: Name to use for this lock contender. This + can be useful for querying to see who the + current lock contenders are. + + """ + self.lock = client.Lock(path, identifier) + + def run(self, func, *args, **kwargs): + """Contend for the leadership + + This call will block until either this contender is cancelled + or this contender wins the election and the provided leadership + function subsequently returns or fails. + + :param func: A function to be called if/when the election is + won. + :param args: Arguments to leadership function. + :param kwargs: Keyword arguments to leadership function. + + """ + if not callable(func): + raise ValueError("leader function is not callable") + + try: + with self.lock: + func(*args, **kwargs) + + except CancelledError: + pass + + def cancel(self): + """Cancel participation in the election + + .. note:: + + If this contender has already been elected leader, this + method will not interrupt the leadership function. + + """ + self.lock.cancel() + + def contenders(self): + """Return an ordered list of the current contenders in the + election + + .. note:: + + If the contenders did not set an identifier, it will appear + as a blank string. + + """ + return self.lock.contenders()