SLIDER-285. Slider Agents to bind and work with restarted AM (Gour Saha via smohanty)
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/58014d80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/58014d80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/58014d80 Branch: refs/heads/feature/SLIDER-285_Restart_AM Commit: 58014d805e6f819e24b44c6394001df99c3dccb6 Parents: cebde98 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Mon Aug 11 11:17:46 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Mon Aug 11 20:15:38 2014 -0700 ---------------------------------------------------------------------- slider-agent/pom.xml | 4 +- .../src/main/python/agent/AgentConfig.py | 2 + slider-agent/src/main/python/agent/Constants.py | 4 +- .../src/main/python/agent/Controller.py | 52 +- slider-agent/src/main/python/agent/Heartbeat.py | 6 +- slider-agent/src/main/python/agent/Registry.py | 54 + slider-agent/src/main/python/agent/main.py | 29 +- slider-agent/src/main/python/kazoo/__init__.py | 1 + slider-agent/src/main/python/kazoo/client.py | 1412 ++++++++++++++++++ .../src/main/python/kazoo/exceptions.py | 199 +++ .../src/main/python/kazoo/handlers/__init__.py | 1 + .../src/main/python/kazoo/handlers/gevent.py | 161 ++ .../src/main/python/kazoo/handlers/threading.py | 287 ++++ .../src/main/python/kazoo/handlers/utils.py | 93 ++ slider-agent/src/main/python/kazoo/hosts.py | 26 + .../src/main/python/kazoo/interfaces.py | 203 +++ .../src/main/python/kazoo/loggingsupport.py | 2 + .../src/main/python/kazoo/protocol/__init__.py | 1 + .../main/python/kazoo/protocol/connection.py | 623 ++++++++ .../src/main/python/kazoo/protocol/paths.py | 54 + .../main/python/kazoo/protocol/serialization.py | 396 +++++ .../src/main/python/kazoo/protocol/states.py | 237 +++ .../src/main/python/kazoo/recipe/__init__.py | 1 + .../src/main/python/kazoo/recipe/barrier.py | 214 +++ .../src/main/python/kazoo/recipe/counter.py | 94 ++ 
.../src/main/python/kazoo/recipe/election.py | 79 + .../src/main/python/kazoo/recipe/lock.py | 520 +++++++ .../src/main/python/kazoo/recipe/partitioner.py | 377 +++++ .../src/main/python/kazoo/recipe/party.py | 118 ++ .../src/main/python/kazoo/recipe/queue.py | 321 ++++ .../src/main/python/kazoo/recipe/watchers.py | 419 ++++++ slider-agent/src/main/python/kazoo/retry.py | 150 ++ slider-agent/src/main/python/kazoo/security.py | 138 ++ .../src/main/python/kazoo/testing/__init__.py | 5 + .../src/main/python/kazoo/testing/common.py | 283 ++++ .../src/main/python/kazoo/testing/harness.py | 180 +++ .../src/main/python/kazoo/tests/__init__.py | 0 .../src/main/python/kazoo/tests/test_barrier.py | 157 ++ .../src/main/python/kazoo/tests/test_build.py | 29 + .../src/main/python/kazoo/tests/test_client.py | 1098 ++++++++++++++ .../main/python/kazoo/tests/test_connection.py | 319 ++++ .../src/main/python/kazoo/tests/test_counter.py | 35 + .../main/python/kazoo/tests/test_election.py | 139 ++ .../main/python/kazoo/tests/test_exceptions.py | 22 + .../python/kazoo/tests/test_gevent_handler.py | 160 ++ .../src/main/python/kazoo/tests/test_lock.py | 517 +++++++ .../main/python/kazoo/tests/test_partitioner.py | 92 ++ .../src/main/python/kazoo/tests/test_party.py | 84 ++ .../src/main/python/kazoo/tests/test_paths.py | 98 ++ .../src/main/python/kazoo/tests/test_queue.py | 179 +++ .../src/main/python/kazoo/tests/test_retry.py | 77 + .../main/python/kazoo/tests/test_security.py | 40 + .../kazoo/tests/test_threading_handler.py | 326 ++++ .../main/python/kazoo/tests/test_watchers.py | 489 ++++++ .../src/main/python/kazoo/tests/util.py | 126 ++ slider-agent/src/test/python/agent/TestMain.py | 6 +- .../org/apache/slider/common/SliderKeys.java | 5 + .../providers/AbstractProviderService.java | 18 + .../slider/providers/ProviderService.java | 22 + .../slider/providers/agent/AgentKeys.java | 2 + .../providers/agent/AgentProviderService.java | 126 +- .../server/appmaster/SliderAppMaster.java | 7 
+ .../slider/server/appmaster/state/AppState.java | 8 + .../appmaster/web/rest/agent/HeartBeat.java | 11 + .../model/mock/MockProviderService.groovy | 12 + .../agent/TestAgentProviderService.java | 26 +- slider-core/src/test/python/agent/main.py | 5 +- 67 files changed, 10918 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/pom.xml ---------------------------------------------------------------------- diff --git a/slider-agent/pom.xml b/slider-agent/pom.xml index 4e67836..e230ec1 100644 --- a/slider-agent/pom.xml +++ b/slider-agent/pom.xml @@ -69,7 +69,7 @@ <argument>unitTests.py</argument> </arguments> <environmentVariables> - <PYTHONPATH>${project.basedir}/src/main/python/jinja2:${project.basedir}/src/test/python:${project.basedir}/src/main/python:${project.basedir}/src/main/python/agent:${project.basedir}/src/main/python/resource_management:${project.basedir}/src/test/python/agent:${project.basedir}/src/test/python/resource_management</PYTHONPATH> + <PYTHONPATH>${project.basedir}/src/main/python/jinja2:${project.basedir}/src/test/python:${project.basedir}/src/main/python:${project.basedir}/src/main/python/agent:${project.basedir}/src/main/python/resource_management:${project.basedir}/src/test/python/agent:${project.basedir}/src/test/python/resource_management:${project.basedir}/src/main/python/kazoo</PYTHONPATH> </environmentVariables> <skip>${skipTests}</skip> </configuration> @@ -102,6 +102,8 @@ <exclude>src/main/python/jinja2/**</exclude> <!-- mock files (BSD license) --> <exclude>src/test/python/mock/**</exclude> + <!-- kazoo files (Apache License, Version 2.0) --> + <exclude>src/main/python/kazoo/**</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/AgentConfig.py 
---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/AgentConfig.py b/slider-agent/src/main/python/agent/AgentConfig.py index 57b1542..9adace1 100644 --- a/slider-agent/src/main/python/agent/AgentConfig.py +++ b/slider-agent/src/main/python/agent/AgentConfig.py @@ -33,6 +33,8 @@ content = """ hostname=localhost port=8440 secured_port=8441 +zk-quorum=localhost:2181 +zk-reg-path=/register/org-apache-slider/cl1 check_path=/ws/v1/slider/agents/ register_path=/ws/v1/slider/agents/{name}/register heartbeat_path=/ws/v1/slider/agents/{name}/heartbeat http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Constants.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Constants.py b/slider-agent/src/main/python/agent/Constants.py index 88cd564..a120999 100644 --- a/slider-agent/src/main/python/agent/Constants.py +++ b/slider-agent/src/main/python/agent/Constants.py @@ -29,4 +29,6 @@ AGENT_WORK_ROOT = "AGENT_WORK_ROOT" AGENT_LOG_ROOT = "AGENT_LOG_ROOT" DO_NOT_REGISTER = "DO_NOT_REGISTER" DO_NOT_HEARTBEAT = "DO_NOT_HEARTBEAT" -DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_" \ No newline at end of file +DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_" +ZK_QUORUM="zk_quorum" +ZK_REG_PATH="zk_reg_path" http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 92e9086..4a103cb 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -34,6 +34,7 @@ from Heartbeat import Heartbeat from Register import Register from ActionQueue import ActionQueue from NetUtil import NetUtil +from 
Registry import Registry import ssl import ProcessHelper import Constants @@ -43,7 +44,12 @@ import security logger = logging.getLogger() AGENT_AUTO_RESTART_EXIT_CODE = 77 +HEART_BEAT_RETRY_THRESHOLD = 2 +WS_AGENT_CONTEXT_ROOT = '/ws' +SLIDER_PATH_AGENTS = WS_AGENT_CONTEXT_ROOT + '/v1/slider/agents/' +SLIDER_REL_PATH_REGISTER = '/register' +SLIDER_REL_PATH_HEARTBEAT = '/heartbeat' class State: INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED = range(6) @@ -57,13 +63,12 @@ class Controller(threading.Thread): self.safeMode = True self.credential = None self.config = config - self.hostname = config.getLabel() - server_url = 'https://' + config.get(AgentConfig.SERVER_SECTION, - 'hostname') + \ - ':' + config.get(AgentConfig.SERVER_SECTION, - 'secured_port') - self.registerUrl = server_url + '/ws/v1/slider/agents/' + self.hostname + '/register' - self.heartbeatUrl = server_url + '/ws/v1/slider/agents/' + self.hostname + '/heartbeat' + self.label = config.getLabel() + self.hostname = config.get(AgentConfig.SERVER_SECTION, 'hostname') + self.secured_port = config.get(AgentConfig.SERVER_SECTION, 'secured_port') + self.server_url = 'https://' + self.hostname + ':' + self.secured_port + self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER + self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT self.netutil = NetUtil() self.responseId = -1 self.repeatRegistration = False @@ -80,6 +85,7 @@ class Controller(threading.Thread): self.componentActualState = State.INIT self.statusCommand = None self.failureCount = 0 + self.heartBeatRetryCount = 0 def __del__(self): @@ -204,8 +210,8 @@ class Controller(threading.Thread): try: if not retry: data = json.dumps( - self.heartbeat.build(commandResult, self.responseId, - self.hasMappedComponents)) + self.heartbeat.build(commandResult, self.componentActualState, + self.responseId, self.hasMappedComponents)) 
self.updateStateBasedOnResult(commandResult) logger.debug("Sending request: " + data) pass @@ -285,9 +291,33 @@ class Controller(threading.Thread): print( "Server certificate verify failed. Did you regenerate server certificate?") certVerifFailed = True + self.heartBeatRetryCount += 1 + logger.error( + "Heartbeat retry count = %d" % (self.heartBeatRetryCount)) + # Re-read zk registry in case AM was restarted and came up with new + # host/port, but do this only after heartbeat retry attempts crosses + # threshold + if self.heartBeatRetryCount > HEART_BEAT_RETRY_THRESHOLD: + self.isRegistered = False + self.repeatRegistration = True + self.heartBeatRetryCount = 0 + self.cachedconnect = None # Previous connection is broken now + zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM) + zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH) + registry = Registry(zk_quorum, zk_reg_path) + amHost, amSecuredPort = registry.readAMHostPort() + logger.info("Read from ZK registry: AM host = %s, AM secured port = %s" % (amHost, amSecuredPort)) + self.hostname = amHost + self.secured_port = amSecuredPort + self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname) + self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port) + self.server_url = 'https://' + self.hostname + ':' + self.secured_port + self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER + self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT + return self.cachedconnect = None # Previous connection is broken now retry = True - # Sleep for some time + # Sleep for some time timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS self.heartbeat_wait_event.wait(timeout=timeout) @@ -358,8 +388,8 @@ class Controller(threading.Thread): statusCommand["serviceName"] = command["serviceName"] statusCommand["taskId"] 
= "status" statusCommand['auto_generated'] = True - return statusCommand logger.info("Status command: " + pprint.pformat(statusCommand)) + return statusCommand pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Heartbeat.py b/slider-agent/src/main/python/agent/Heartbeat.py index 8192348..4f2207c 100644 --- a/slider-agent/src/main/python/agent/Heartbeat.py +++ b/slider-agent/src/main/python/agent/Heartbeat.py @@ -36,7 +36,8 @@ class Heartbeat: self.config = config self.reports = [] - def build(self, commandResult, id='-1', componentsMapped=False): + def build(self, commandResult, componentActualState, id='-1', + componentsMapped=False): timestamp = int(time.time() * 1000) queueResult = self.actionQueue.result() logger.info("Queue result: " + pformat(queueResult)) @@ -48,7 +49,8 @@ class Heartbeat: 'timestamp': timestamp, 'hostname': self.config.getLabel(), 'nodeStatus': nodeStatus, - 'fqdn': hostname.public_hostname() + 'fqdn': hostname.public_hostname(), + 'agentState': componentActualState } commandsInProgress = False http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/Registry.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Registry.py b/slider-agent/src/main/python/agent/Registry.py new file mode 100644 index 0000000..256509f --- /dev/null +++ b/slider-agent/src/main/python/agent/Registry.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +''' + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import json +import logging +from kazoo.client import KazooClient + +logger = logging.getLogger() + +class Registry: + def __init__(self, zk_quorum, zk_reg_path): + self.zk_quorum = zk_quorum + self.zk_reg_path = zk_reg_path + + def readAMHostPort(self): + amHost = "" + amSecuredPort = "" + try: + zk = KazooClient(hosts=self.zk_quorum, read_only=True) + zk.start() + data, stat = zk.get(self.zk_reg_path) + logger.debug("Registry Data: %s" % (data.decode("utf-8"))) + sliderRegistry = json.loads(data) + amUrl = sliderRegistry["payload"]["internalView"]["endpoints"]["org.apache.slider.agents"]["address"] + amHost = amUrl.split("/")[2].split(":")[0] + amSecuredPort = amUrl.split(":")[2].split("/")[0] + # the port needs to be utf-8 encoded + amSecuredPort = amSecuredPort.encode('utf8', 'ignore') + except Exception: + # log and let empty strings be returned + logger.error("Could not connect to zk registry at {} in quorum {}" % + (self.zk_reg_path, self.zk_quorum)) + pass + finally: + zk.close() + logger.info("AM Host = %s, AM Secured Port = %s" % (amHost, amSecuredPort)) + return amHost, amSecuredPort http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/agent/main.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py index 105df5a..f68db04 100644 --- 
a/slider-agent/src/main/python/agent/main.py +++ b/slider-agent/src/main/python/agent/main.py @@ -34,6 +34,8 @@ import posixpath from Controller import Controller from AgentConfig import AgentConfig from NetUtil import NetUtil +from Registry import Registry +import Constants logger = logging.getLogger() IS_WINDOWS = platform.system() == "Windows" @@ -178,9 +180,8 @@ def main(): parser = OptionParser() parser.add_option("-v", "--verbose", dest="verbose", help="verbose log output", default=False) parser.add_option("-l", "--label", dest="label", help="label of the agent", default=None) - parser.add_option("--host", dest="host", help="AppMaster host", default=None) - parser.add_option("--port", dest="port", help="AppMaster port", default=None) - parser.add_option("--secured_port", dest="secured_port", help="AppMaster 2 Way port", default=None) + parser.add_option("--zk-quorum", dest=Constants.ZK_QUORUM, help="Zookeeper Quorum", default=None) + parser.add_option("--zk-reg-path", dest=Constants.ZK_REG_PATH, help="Zookeeper Registry Path", default=None) parser.add_option("--debug", dest="debug", help="Agent debug hint", default="") (options, args) = parser.parse_args() @@ -207,18 +208,24 @@ def main(): update_config_from_file(agentConfig) # update configurations if needed - if options.host: - agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", options.host) + if options.zk_quorum: + agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM, options.zk_quorum) - if options.port: - agentConfig.set(AgentConfig.SERVER_SECTION, "port", options.port) - - if options.secured_port: - agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", options.secured_port) + if options.zk_reg_path: + agentConfig.set(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH, options.zk_reg_path) if options.debug: agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, options.debug) + # Extract the AM hostname and secured port from ZK registry + registry = 
Registry(options.zk_quorum, options.zk_reg_path) + amHost, amSecuredPort = registry.readAMHostPort() + if amHost: + agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost) + + if amSecuredPort: + agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", amSecuredPort) + # set the security directory to a subdirectory of the run dir secDir = posixpath.join(agentConfig.getResolvedPath(AgentConfig.RUN_DIR), "security") logger.info("Security/Keys directory: " + secDir) @@ -243,7 +250,7 @@ def main(): server_url = SERVER_STATUS_URL.format( agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'), - agentConfig.get(AgentConfig.SERVER_SECTION, 'port'), + agentConfig.get(AgentConfig.SERVER_SECTION, 'secured_port'), agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path')) print("Connecting to the server at " + server_url + "...") logger.info('Connecting to the server at: ' + server_url) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/__init__.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/__init__.py b/slider-agent/src/main/python/kazoo/__init__.py new file mode 100644 index 0000000..792d600 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/__init__.py @@ -0,0 +1 @@ +# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/client.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/client.py b/slider-agent/src/main/python/kazoo/client.py new file mode 100644 index 0000000..6e3a219 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/client.py @@ -0,0 +1,1412 @@ +"""Kazoo Zookeeper Client""" +import inspect +import logging +import os +import re +import warnings +from collections import defaultdict, deque +from functools import partial +from os.path import split + +from kazoo.exceptions import ( + 
AuthFailedError, + ConfigurationError, + ConnectionClosedError, + ConnectionLoss, + NoNodeError, + NodeExistsError, + SessionExpiredError, + WriterNotClosedException, +) +from kazoo.handlers.threading import SequentialThreadingHandler +from kazoo.handlers.utils import capture_exceptions, wrap +from kazoo.hosts import collect_hosts +from kazoo.loggingsupport import BLATHER +from kazoo.protocol.connection import ConnectionHandler +from kazoo.protocol.paths import normpath +from kazoo.protocol.paths import _prefix_root +from kazoo.protocol.serialization import ( + Auth, + CheckVersion, + CloseInstance, + Create, + Delete, + Exists, + GetChildren, + GetChildren2, + GetACL, + SetACL, + GetData, + SetData, + Sync, + Transaction +) +from kazoo.protocol.states import KazooState +from kazoo.protocol.states import KeeperState +from kazoo.retry import KazooRetry +from kazoo.security import ACL +from kazoo.security import OPEN_ACL_UNSAFE + +# convenience API +from kazoo.recipe.barrier import Barrier +from kazoo.recipe.barrier import DoubleBarrier +from kazoo.recipe.counter import Counter +from kazoo.recipe.election import Election +from kazoo.recipe.lock import Lock +from kazoo.recipe.lock import Semaphore +from kazoo.recipe.partitioner import SetPartitioner +from kazoo.recipe.party import Party +from kazoo.recipe.party import ShallowParty +from kazoo.recipe.queue import Queue +from kazoo.recipe.queue import LockingQueue +from kazoo.recipe.watchers import ChildrenWatch +from kazoo.recipe.watchers import DataWatch + +try: # pragma: nocover + basestring +except NameError: # pragma: nocover + basestring = str + +LOST_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED, + KeeperState.CLOSED) +ENVI_VERSION = re.compile('[\w\s:.]*=([\d\.]*).*', re.DOTALL) +log = logging.getLogger(__name__) + + +_RETRY_COMPAT_DEFAULTS = dict( + max_retries=None, + retry_delay=0.1, + retry_backoff=2, + retry_jitter=0.8, + retry_max_delay=3600, +) + +_RETRY_COMPAT_MAPPING = dict( + 
max_retries='max_tries', + retry_delay='delay', + retry_backoff='backoff', + retry_jitter='max_jitter', + retry_max_delay='max_delay', +) + + +class KazooClient(object): + """An Apache Zookeeper Python client supporting alternate callback + handlers and high-level functionality. + + Watch functions registered with this class will not get session + events, unlike the default Zookeeper watches. They will also be + called with a single argument, a + :class:`~kazoo.protocol.states.WatchedEvent` instance. + + """ + def __init__(self, hosts='127.0.0.1:2181', + timeout=10.0, client_id=None, handler=None, + default_acl=None, auth_data=None, read_only=None, + randomize_hosts=True, connection_retry=None, + command_retry=None, logger=None, **kwargs): + """Create a :class:`KazooClient` instance. All time arguments + are in seconds. + + :param hosts: Comma-separated list of hosts to connect to + (e.g. 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183). + :param timeout: The longest to wait for a Zookeeper connection. + :param client_id: A Zookeeper client id, used when + re-establishing a prior session connection. + :param handler: An instance of a class implementing the + :class:`~kazoo.interfaces.IHandler` interface + for callback handling. + :param default_acl: A default ACL used on node creation. + :param auth_data: + A list of authentication credentials to use for the + connection. Should be a list of (scheme, credential) + tuples as :meth:`add_auth` takes. + :param read_only: Allow connections to read only servers. + :param randomize_hosts: By default randomize host selection. + :param connection_retry: + A :class:`kazoo.retry.KazooRetry` object to use for + retrying the connection to Zookeeper. Also can be a dict of + options which will be used for creating one. + :param command_retry: + A :class:`kazoo.retry.KazooRetry` object to use for + the :meth:`KazooClient.retry` method. Also can be a dict of + options which will be used for creating one. 
+ :param logger: A custom logger to use instead of the module + global `log` instance. + + Basic Example: + + .. code-block:: python + + zk = KazooClient() + zk.start() + children = zk.get_children('/') + zk.stop() + + As a convenience all recipe classes are available as attributes + and get automatically bound to the client. For example:: + + zk = KazooClient() + zk.start() + lock = zk.Lock('/lock_path') + + .. versionadded:: 0.6 + The read_only option. Requires Zookeeper 3.4+ + + .. versionadded:: 0.6 + The retry_max_delay option. + + .. versionadded:: 0.6 + The randomize_hosts option. + + .. versionchanged:: 0.8 + Removed the unused watcher argument (was second argument). + + .. versionadded:: 1.2 + The connection_retry, command_retry and logger options. + + """ + self.logger = logger or log + + # Record the handler strategy used + self.handler = handler if handler else SequentialThreadingHandler() + if inspect.isclass(self.handler): + raise ConfigurationError("Handler must be an instance of a class, " + "not the class: %s" % self.handler) + + self.auth_data = auth_data if auth_data else set([]) + self.default_acl = default_acl + self.randomize_hosts = randomize_hosts + self.hosts = None + self.chroot = None + self.set_hosts(hosts) + + # Curator like simplified state tracking, and listeners for + # state transitions + self._state = KeeperState.CLOSED + self.state = KazooState.LOST + self.state_listeners = set() + + self._reset() + self.read_only = read_only + + if client_id: + self._session_id = client_id[0] + self._session_passwd = client_id[1] + else: + self._reset_session() + + # ZK uses milliseconds + self._session_timeout = int(timeout * 1000) + + # We use events like twitter's client to track current and + # desired state (connected, and whether to shutdown) + self._live = self.handler.event_object() + self._writer_stopped = self.handler.event_object() + self._stopped = self.handler.event_object() + self._stopped.set() + self._writer_stopped.set() + + 
self.retry = self._conn_retry = None + + if type(connection_retry) is dict: + self._conn_retry = KazooRetry(**connection_retry) + elif type(connection_retry) is KazooRetry: + self._conn_retry = connection_retry + + if type(command_retry) is dict: + self.retry = KazooRetry(**command_retry) + elif type(command_retry) is KazooRetry: + self.retry = command_retry + + + if type(self._conn_retry) is KazooRetry: + if self.handler.sleep_func != self._conn_retry.sleep_func: + raise ConfigurationError("Retry handler and event handler " + " must use the same sleep func") + + if type(self.retry) is KazooRetry: + if self.handler.sleep_func != self.retry.sleep_func: + raise ConfigurationError("Command retry handler and event " + "handler must use the same sleep func") + + if self.retry is None or self._conn_retry is None: + old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS) + for key in old_retry_keys: + try: + old_retry_keys[key] = kwargs.pop(key) + warnings.warn('Passing retry configuration param %s to the' + ' client directly is deprecated, please pass a' + ' configured retry object (using param %s)' % ( + key, _RETRY_COMPAT_MAPPING[key]), + DeprecationWarning, stacklevel=2) + except KeyError: + pass + + retry_keys = {} + for oldname, value in old_retry_keys.items(): + retry_keys[_RETRY_COMPAT_MAPPING[oldname]] = value + + if self._conn_retry is None: + self._conn_retry = KazooRetry( + sleep_func=self.handler.sleep_func, + **retry_keys) + if self.retry is None: + self.retry = KazooRetry( + sleep_func=self.handler.sleep_func, + **retry_keys) + + self._conn_retry.interrupt = lambda: self._stopped.is_set() + self._connection = ConnectionHandler(self, self._conn_retry.copy(), + logger=self.logger) + + # Every retry call should have its own copy of the retry helper + # to avoid shared retry counts + self._retry = self.retry + def _retry(*args, **kwargs): + return self._retry.copy()(*args, **kwargs) + self.retry = _retry + + self.Barrier = partial(Barrier, self) + self.Counter = 
partial(Counter, self) + self.DoubleBarrier = partial(DoubleBarrier, self) + self.ChildrenWatch = partial(ChildrenWatch, self) + self.DataWatch = partial(DataWatch, self) + self.Election = partial(Election, self) + self.Lock = partial(Lock, self) + self.Party = partial(Party, self) + self.Queue = partial(Queue, self) + self.LockingQueue = partial(LockingQueue, self) + self.SetPartitioner = partial(SetPartitioner, self) + self.Semaphore = partial(Semaphore, self) + self.ShallowParty = partial(ShallowParty, self) + + # If we got any unhandled keywords, complain like python would + if kwargs: + raise TypeError('__init__() got unexpected keyword arguments: %s' + % (kwargs.keys(),)) + + def _reset(self): + """Resets a variety of client states for a new connection.""" + self._queue = deque() + self._pending = deque() + + self._reset_watchers() + self._reset_session() + self.last_zxid = 0 + self._protocol_version = None + + def _reset_watchers(self): + self._child_watchers = defaultdict(set) + self._data_watchers = defaultdict(set) + + def _reset_session(self): + self._session_id = None + self._session_passwd = b'\x00' * 16 + + @property + def client_state(self): + """Returns the last Zookeeper client state + + This is the non-simplified state information and is generally + not as useful as the simplified KazooState information. + + """ + return self._state + + @property + def client_id(self): + """Returns the client id for this Zookeeper session if + connected. + + :returns: client id which consists of the session id and + password. + :rtype: tuple + """ + if self._live.is_set(): + return (self._session_id, self._session_passwd) + return None + + @property + def connected(self): + """Returns whether the Zookeeper connection has been + established.""" + return self._live.is_set() + + def set_hosts(self, hosts, randomize_hosts=None): + """ sets the list of hosts used by this client. 
+ + This function accepts the same format hosts parameter as the init + function and sets the client to use the new hosts the next time it + needs to look up a set of hosts. This function does not affect the + current connected status. + + It is not currently possible to change the chroot with this function, + setting a host list with a new chroot will raise a ConfigurationError. + + :param hosts: see description in :meth:`KazooClient.__init__` + :param randomize_hosts: override client default for host randomization + :raises: + :exc:`ConfigurationError` if the hosts argument changes the chroot + + .. versionadded:: 1.4 + + .. warning:: + + Using this function to point a client to a completely disparate + zookeeper server cluster has undefined behavior. + + """ + + if randomize_hosts is None: + randomize_hosts = self.randomize_hosts + + self.hosts, chroot = collect_hosts(hosts, randomize_hosts) + + if chroot: + new_chroot = normpath(chroot) + else: + new_chroot = '' + + if self.chroot is not None and new_chroot != self.chroot: + raise ConfigurationError("Changing chroot at runtime is not " + "currently supported") + + self.chroot = new_chroot + + def add_listener(self, listener): + """Add a function to be called for connection state changes. + + This function will be called with a + :class:`~kazoo.protocol.states.KazooState` instance indicating + the new connection state on state transitions. + + .. warning:: + + This function must not block. If its at all likely that it + might need data or a value that could result in blocking + than the :meth:`~kazoo.interfaces.IHandler.spawn` method + should be used so that the listener can return immediately. 
+ + """ + if not (listener and callable(listener)): + raise ConfigurationError("listener must be callable") + self.state_listeners.add(listener) + + def remove_listener(self, listener): + """Remove a listener function""" + self.state_listeners.discard(listener) + + def _make_state_change(self, state): + # skip if state is current + if self.state == state: + return + + self.state = state + + # Create copy of listeners for iteration in case one needs to + # remove itself + for listener in list(self.state_listeners): + try: + remove = listener(state) + if remove is True: + self.remove_listener(listener) + except Exception: + self.logger.exception("Error in connection state listener") + + def _session_callback(self, state): + if state == self._state: + return + + # Note that we don't check self.state == LOST since that's also + # the client's initial state + dead_state = self._state in LOST_STATES + self._state = state + + # If we were previously closed or had an expired session, and + # are now connecting, don't bother with the rest of the + # transitions since they only apply after + # we've established a connection + if dead_state and state == KeeperState.CONNECTING: + self.logger.log(BLATHER, "Skipping state change") + return + + if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO): + self.logger.info("Zookeeper connection established, state: %s", state) + self._live.set() + self._make_state_change(KazooState.CONNECTED) + elif state in LOST_STATES: + self.logger.info("Zookeeper session lost, state: %s", state) + self._live.clear() + self._make_state_change(KazooState.LOST) + self._notify_pending(state) + self._reset() + else: + self.logger.info("Zookeeper connection lost") + # Connection lost + self._live.clear() + self._notify_pending(state) + self._make_state_change(KazooState.SUSPENDED) + self._reset_watchers() + + def _notify_pending(self, state): + """Used to clear a pending response queue and request queue + during connection drops.""" + if state == 
def _call(self, request, async_object):
    """Ensure there's an active connection and put the request in
    the queue if there is.

    When the session state forbids new requests, an exception is set
    on ``async_object`` and nothing is queued. Otherwise the request is
    appended to ``self._queue`` and a single byte is written to the
    connection's write pipe to wake it up.

    :param request: Request object to hand to the connection.
    :param async_object: Async result that receives the outcome.
    :returns: False if the call short circuits due to AUTH_FAILED,
              CLOSED, EXPIRED_SESSION or CONNECTING state, or because
              the write pipe is gone or cannot be written to. None
              once the request is queued and the connection has been
              signalled.

    """
    if self._state == KeeperState.AUTH_FAILED:
        async_object.set_exception(AuthFailedError())
        return False
    elif self._state == KeeperState.CLOSED:
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
        return False
    elif self._state in (KeeperState.EXPIRED_SESSION,
                         KeeperState.CONNECTING):
        async_object.set_exception(SessionExpiredError())
        return False

    self._queue.append((request, async_object))

    # wake the connection, guarding against a race with close()
    write_pipe = self._connection._write_pipe
    if write_pipe is None:
        # Stop here: falling through to os.write(None, ...) would fail
        # and set the exception on async_object a second time.
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
        return False
    try:
        os.write(write_pipe, b'\0')
    except OSError:
        # The pipe was closed between the check above and the write.
        async_object.set_exception(ConnectionClosedError(
            "Connection has been closed"))
        return False
def stop(self):
    """Gracefully stop this Zookeeper session.

    This method can be called while a reconnection attempt is in
    progress, which will then be halted.

    Once the connection is closed, its session becomes invalid. All
    the ephemeral nodes in the ZooKeeper server associated with the
    session will be removed. The watches left on those nodes (and
    on their parents) will be triggered.

    Calling this on an already stopped client does nothing. If waking
    the connection through its write pipe fails, the error still
    propagates, but only after the handler and connection have been
    shut down.

    """
    if self._stopped.is_set():
        return

    self._stopped.set()
    self._queue.append((CloseInstance, None))
    try:
        os.write(self._connection._write_pipe, b'\0')
    finally:
        # _stopped is already set, so a later stop() would return
        # early; the teardown therefore has to happen now even when
        # the wake-up write fails.
        self._safe_close()
def command(self, cmd=b'ruok'):
    """Send a management command to the current ZK server.

    Examples are `ruok`, `envi` or `stat`.

    A separate, short-lived connection is opened to the peer address
    of the current connection's socket; the command is sent over it
    and up to 8192 bytes of the reply are read.

    :param cmd: The command to send, as bytes.
    :returns: An unstructured textual response.
    :rtype: str

    :raises:
        :exc:`ConnectionLoss` if there is no connection open, or
        possibly a :exc:`socket.error` if there's a problem with
        the connection used just for this command.

    .. versionadded:: 0.5

    """
    if not self._live.is_set():
        raise ConnectionLoss("No connection to server")

    peer = self._connection._socket.getpeername()
    sock = self.handler.create_connection(
        peer, timeout=self._session_timeout / 1000.0)
    try:
        sock.sendall(cmd)
        result = sock.recv(8192)
    finally:
        # Release the one-off socket even if sending or receiving fails.
        sock.close()
    return result.decode('utf-8', 'replace')
def unchroot(self, path):
    """Return ``path`` with the client's chroot prefix removed.

    The path is returned unchanged when no chroot is configured or
    when it does not start with the chroot.

    :param path: Path to strip the chroot from.
    :returns: The path without the leading chroot.

    """
    prefix = self.chroot
    if prefix and path.startswith(prefix):
        return path[len(prefix):]
    return path
+ + If a node with the same actual path already exists in + ZooKeeper, a NodeExistsError will be raised. Note that since a + different actual path is used for each invocation of creating + sequential nodes with the same path argument, the call will + never raise NodeExistsError. + + If the parent node does not exist in ZooKeeper, a NoNodeError + will be raised. Setting the optional `makepath` argument to + `True` will create all missing parent nodes instead. + + An ephemeral node cannot have children. If the parent node of + the given path is ephemeral, a NoChildrenForEphemeralsError + will be raised. + + This operation, if successful, will trigger all the watches + left on the node of the given path by :meth:`exists` and + :meth:`get` API calls, and the watches left on the parent node + by :meth:`get_children` API calls. + + The maximum allowable size of the node value is 1 MB. Values + larger than this will cause a ZookeeperError to be raised. + + :param path: Path of node. + :param value: Initial bytes value of node. + :param acl: :class:`~kazoo.security.ACL` list. + :param ephemeral: Boolean indicating whether node is ephemeral + (tied to this session). + :param sequence: Boolean indicating whether path is suffixed + with a unique index. + :param makepath: Whether the path should be created if it + doesn't exist. + :returns: Real path of the new node. + :rtype: str + + :raises: + :exc:`~kazoo.exceptions.NodeExistsError` if the node + already exists. + + :exc:`~kazoo.exceptions.NoNodeError` if parent nodes are + missing. + + :exc:`~kazoo.exceptions.NoChildrenForEphemeralsError` if + the parent node is an ephemeral node. + + :exc:`~kazoo.exceptions.ZookeeperError` if the provided + value is too large. + + :exc:`~kazoo.exceptions.ZookeeperError` if the server + returns a non-zero error code. 
def create_async(self, path, value=b"", acl=None, ephemeral=False,
                 sequence=False, makepath=False):
    """Asynchronously create a ZNode. Takes the same arguments as
    :meth:`create`.

    Arguments are type-checked before anything is sent. The create
    request itself is issued through :meth:`_create_async_inner`; if it
    fails with :exc:`NoNodeError` and `makepath` is true, the parent
    path is created with :meth:`ensure_path_async` and the create is
    issued again.

    :raises: TypeError if an argument has the wrong type.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    .. versionadded:: 1.1
        The makepath option.

    """
    # Fall back to the client-wide default ACL when none was given.
    if acl is None and self.default_acl:
        acl = self.default_acl

    if not isinstance(path, basestring):
        raise TypeError("path must be a string")
    # A single ACL object is rejected explicitly; a tuple/list of them
    # is required.
    if acl and (isinstance(acl, ACL) or
                not isinstance(acl, (tuple, list))):
        raise TypeError("acl must be a tuple/list of ACL's")
    if value is not None and not isinstance(value, bytes):
        raise TypeError("value must be a byte string")
    if not isinstance(ephemeral, bool):
        raise TypeError("ephemeral must be a bool")
    if not isinstance(sequence, bool):
        raise TypeError("sequence must be a bool")
    if not isinstance(makepath, bool):
        raise TypeError("makepath must be a bool")

    # Flag bits of the create request: 1 for ephemeral, 2 for sequence.
    flags = 0
    if ephemeral:
        flags |= 1
    if sequence:
        flags |= 2
    if acl is None:
        acl = OPEN_ACL_UNSAFE

    # Result object handed back to the caller; the callbacks below are
    # decorated with it.
    # NOTE(review): capture_exceptions/wrap are defined elsewhere;
    # presumably they forward exceptions (and, for wrap, the return
    # value) to async_result -- confirm.
    async_result = self.handler.async_result()

    @capture_exceptions(async_result)
    def do_create():
        # Issue the create and continue in create_completion once the
        # inner result is ready.
        result = self._create_async_inner(path, value, acl, flags,
                                          trailing=sequence)
        result.rawlink(create_completion)

    @capture_exceptions(async_result)
    def retry_completion(result):
        # result.get() re-raises if creating the parent path failed;
        # otherwise the create is attempted again.
        result.get()
        do_create()

    @wrap(async_result)
    def create_completion(result):
        try:
            return self.unchroot(result.get())
        except NoNodeError:
            if not makepath:
                raise
            # For a sequence node whose path ends in '/', the path
            # without its trailing slashes is the parent; otherwise the
            # parent comes from split().
            if sequence and path.endswith('/'):
                parent = path.rstrip('/')
            else:
                parent, _ = split(path)
            self.ensure_path_async(parent, acl).rawlink(retry_completion)

    do_create()
    return async_result
def ensure_path_async(self, path, acl=None):
    """Recursively create a path asynchronously if it doesn't
    exist. Takes the same arguments as :meth:`ensure_path`.

    The path is first checked with :meth:`exists_async`. When it is
    missing, the parent path is ensured by a recursive call and then
    the node itself is created with :meth:`create_async`. A
    :exc:`NodeExistsError` from that create is reported as ``True``.

    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    .. versionadded:: 1.1

    """
    acl = acl or self.default_acl
    # NOTE(review): wrap/capture_exceptions are defined elsewhere;
    # presumably they forward results/exceptions of the callbacks to
    # async_result -- confirm.
    async_result = self.handler.async_result()

    @wrap(async_result)
    def create_completion(result):
        # Final step: a node that already exists counts as success.
        try:
            return result.get()
        except NodeExistsError:
            return True

    @capture_exceptions(async_result)
    def prepare_completion(next_path, result):
        # result.get() re-raises if ensuring the parent failed;
        # otherwise create the node itself.
        result.get()
        self.create_async(next_path, acl=acl).rawlink(create_completion)

    @wrap(async_result)
    def exists_completion(path, result):
        if result.get():
            return True
        parent, node = split(path)
        if node:
            # Ensure the parent first, then create this node.
            self.ensure_path_async(parent, acl=acl).rawlink(
                partial(prepare_completion, path))
        else:
            # split() produced no node component: create the path
            # directly.
            self.create_async(path, acl=acl).rawlink(create_completion)

    self.exists_async(path).rawlink(partial(exists_completion, path))

    return async_result
def exists_async(self, path, watch=None):
    """Queue an existence check for a node without blocking. Takes
    the same arguments as :meth:`exists`.

    :raises: TypeError if `path` is not a string, or if `watch` is
             given but not callable.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    """
    if not isinstance(path, basestring):
        raise TypeError("path must be a string")
    if watch:
        if not callable(watch):
            raise TypeError("watch must be a callable")

    pending = self.handler.async_result()
    full_path = _prefix_root(self.chroot, path)
    self._call(Exists(full_path, watch), pending)
    return pending
def get_children(self, path, watch=None, include_data=False):
    """Get a list of child nodes of a path.

    If a watch is provided it will be left on the node with the
    given path. The watch will be triggered by a successful
    operation that deletes the node of the given path or
    creates/deletes a child under the node.

    The list of children returned is not sorted and no guarantee is
    provided as to its natural or lexical order.

    This is the blocking form of :meth:`get_children_async`: it waits
    for that call's result and returns it.

    :param path: Path of node to list.
    :param watch: Optional watch callback to set for future changes
                  to this path.
    :param include_data:
        Include the :class:`~kazoo.protocol.states.ZnodeStat` of
        the node in addition to the children. This option changes
        the return value to be a tuple of (children, stat).

    :returns: List of child node names, or tuple if `include_data`
              is `True`.
    :rtype: list

    :raises:
        :exc:`~kazoo.exceptions.NoNodeError` if the node doesn't
        exist.

        :exc:`~kazoo.exceptions.ZookeeperError` if the server
        returns a non-zero error code.

    .. versionadded:: 0.5
        The `include_data` option.

    """
    pending = self.get_children_async(path, watch, include_data)
    return pending.get()
def get_acls_async(self, path):
    """Asynchronously request the ACL and stat of the node of the
    given path. Takes the same arguments as :meth:`get_acls`.

    :raises: TypeError if `path` is not a string.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    """
    if isinstance(path, basestring):
        pending = self.handler.async_result()
        request = GetACL(_prefix_root(self.chroot, path))
        self._call(request, pending)
        return pending
    raise TypeError("path must be a string")
def set_acls_async(self, path, acls, version=-1):
    """Asynchronously set the ACL for the node of the given path.
    Takes the same arguments as :meth:`set_acls`.

    :raises: TypeError if `path` is not a string, `acls` is not a
             tuple/list (a single ACL is rejected too), or `version`
             is not an int.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    """
    if not isinstance(path, basestring):
        raise TypeError("path must be a string")
    is_acl_sequence = (isinstance(acls, (tuple, list))
                       and not isinstance(acls, ACL))
    if not is_acl_sequence:
        raise TypeError("acl must be a tuple/list of ACL's")
    if not isinstance(version, int):
        raise TypeError("version must be an int")

    pending = self.handler.async_result()
    request = SetACL(_prefix_root(self.chroot, path), acls, version)
    self._call(request, pending)
    return pending
def set_async(self, path, value, version=-1):
    """Asynchronously set the value of a node. Takes the same
    arguments as :meth:`set`.

    :raises: TypeError if `path` is not a string, `value` is neither
             None nor bytes, or `version` is not an int.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`

    """
    if not isinstance(path, basestring):
        raise TypeError("path must be a string")
    if not (value is None or isinstance(value, bytes)):
        raise TypeError("value must be a byte string")
    if not isinstance(version, int):
        raise TypeError("version must be an int")

    pending = self.handler.async_result()
    request = SetData(_prefix_root(self.chroot, path), value, version)
    self._call(request, pending)
    return pending
class TransactionRequest(object):
    """A Zookeeper Transaction Request

    A Transaction provides a builder object that can be used to
    construct and commit an atomic set of operations. The transaction
    must be committed before it's sent.

    Transactions are not thread-safe and should not be accessed from
    multiple threads at once.

    The object can be used as a context manager: on leaving the block
    without an exception the transaction is committed.

    .. versionadded:: 0.6
        Requires Zookeeper 3.4+

    """
    def __init__(self, client):
        """Create an empty, uncommitted transaction bound to `client`."""
        self.client = client
        # Requests accumulated so far, in the order they were added.
        self.operations = []
        # Set once commit_async() has been called.
        self.committed = False

    def create(self, path, value=b"", acl=None, ephemeral=False,
               sequence=False):
        """Add a create ZNode to the transaction. Takes the same
        arguments as :meth:`KazooClient.create`, with the exception
        of `makepath`.

        :raises: TypeError if an argument has the wrong type.
        :returns: None

        """
        if acl is None and self.client.default_acl:
            acl = self.client.default_acl

        if not isinstance(path, basestring):
            raise TypeError("path must be a string")
        # Same ACL validation as KazooClient.create_async: a single ACL
        # object is rejected, a tuple/list is required.
        if acl and (isinstance(acl, ACL) or
                    not isinstance(acl, (tuple, list))):
            raise TypeError("acl must be a tuple/list of ACL's")
        if not isinstance(value, bytes):
            raise TypeError("value must be a byte string")
        if not isinstance(ephemeral, bool):
            raise TypeError("ephemeral must be a bool")
        if not isinstance(sequence, bool):
            raise TypeError("sequence must be a bool")

        # Flag bits of the create request: 1 for ephemeral, 2 for
        # sequence.
        flags = 0
        if ephemeral:
            flags |= 1
        if sequence:
            flags |= 2
        if acl is None:
            acl = OPEN_ACL_UNSAFE

        self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
                         flags))

    def delete(self, path, version=-1):
        """Add a delete ZNode to the transaction. Takes the same
        arguments as :meth:`KazooClient.delete`, with the exception of
        `recursive`.

        :raises: TypeError if an argument has the wrong type.

        """
        if not isinstance(path, basestring):
            raise TypeError("path must be a string")
        if not isinstance(version, int):
            raise TypeError("version must be an int")
        self._add(Delete(_prefix_root(self.client.chroot, path), version))

    def set_data(self, path, value, version=-1):
        """Add a set ZNode value to the transaction. Takes the same
        arguments as :meth:`KazooClient.set`.

        :raises: TypeError if an argument has the wrong type.

        """
        if not isinstance(path, basestring):
            raise TypeError("path must be a string")
        if not isinstance(value, bytes):
            raise TypeError("value must be a byte string")
        if not isinstance(version, int):
            raise TypeError("version must be an int")
        self._add(SetData(_prefix_root(self.client.chroot, path), value,
                          version))

    def check(self, path, version):
        """Add a Check Version to the transaction.

        This command will fail and abort a transaction if the path
        does not match the specified version.

        :raises: TypeError if an argument has the wrong type.

        """
        if not isinstance(path, basestring):
            raise TypeError("path must be a string")
        if not isinstance(version, int):
            raise TypeError("version must be an int")
        self._add(CheckVersion(_prefix_root(self.client.chroot, path),
                               version))

    def commit_async(self):
        """Commit the transaction asynchronously.

        :raises: ValueError if the transaction was already committed.
        :rtype: :class:`~kazoo.interfaces.IAsyncResult`

        """
        self._check_tx_state()
        self.committed = True
        async_object = self.client.handler.async_result()
        self.client._call(Transaction(self.operations), async_object)
        return async_object

    def commit(self):
        """Commit the transaction.

        :returns: A list of the results for each operation in the
                  transaction.
        :raises: ValueError if the transaction was already committed.

        """
        return self.commit_async().get()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Commit the transaction unless the block raised."""
        if not exc_type:
            self.commit()

    def _check_tx_state(self):
        """Raise ValueError once the transaction has been committed."""
        if self.committed:
            raise ValueError('Transaction already committed')

    def _add(self, request, post_processor=None):
        """Append `request` to the pending operations.

        `post_processor` is accepted but not used.

        """
        self._check_tx_state()
        self.client.logger.log(BLATHER, 'Added %r to %r', request, self)
        self.operations.append(request)
def _invalid_error_code():
    """Default factory of ``EXCEPTIONS``: reject unknown error codes."""
    raise RuntimeError('Invalid error code')


# Maps an error code to a factory that builds the matching exception.
# Looking up an unregistered code raises RuntimeError.
EXCEPTIONS = defaultdict(_invalid_error_code)


def _zookeeper_exception(code):
    """Class decorator registering an exception class for `code`.

    The decorated class gets a ``code`` attribute and a factory for it
    is stored in ``EXCEPTIONS[code]``. The factory forwards its
    positional and keyword arguments to the class unchanged.

    :param code: The numeric error code to register.
    :returns: A decorator that returns the class it is applied to.

    """
    def decorator(klass):
        def create(*args, **kwargs):
            # Unpack the arguments; passing the tuple and dict as two
            # positional values would make them the exception's args.
            return klass(*args, **kwargs)

        EXCEPTIONS[code] = create
        klass.code = code
        return klass

    return decorator
BadVersionError(ZookeeperError): + pass + + +@_zookeeper_exception(-108) +class NoChildrenForEphemeralsError(ZookeeperError): + pass + + +@_zookeeper_exception(-110) +class NodeExistsError(ZookeeperError): + pass + + +@_zookeeper_exception(-111) +class NotEmptyError(ZookeeperError): + pass + + +@_zookeeper_exception(-112) +class SessionExpiredError(ZookeeperError): + pass + + +@_zookeeper_exception(-113) +class InvalidCallbackError(ZookeeperError): + pass + + +@_zookeeper_exception(-114) +class InvalidACLError(ZookeeperError): + pass + + +@_zookeeper_exception(-115) +class AuthFailedError(ZookeeperError): + pass + + +@_zookeeper_exception(-118) +class SessionMovedError(ZookeeperError): + pass + + +@_zookeeper_exception(-119) +class NotReadOnlyCallError(ZookeeperError): + """An API call that is not read-only was used while connected to + a read-only server""" + + +class ConnectionClosedError(SessionExpiredError): + """Connection is closed""" + + +# BW Compat aliases for C lib style exceptions +ConnectionLossException = ConnectionLoss +MarshallingErrorException = MarshallingError +SystemErrorException = SystemZookeeperError +RuntimeInconsistencyException = RuntimeInconsistency +DataInconsistencyException = DataInconsistency +UnimplementedException = UnimplementedError +OperationTimeoutException = OperationTimeoutError +BadArgumentsException = BadArgumentsError +ApiErrorException = APIError +NoNodeException = NoNodeError +NoAuthException = NoAuthError +BadVersionException = BadVersionError +NoChildrenForEphemeralsException = NoChildrenForEphemeralsError +NodeExistsException = NodeExistsError +InvalidACLException = InvalidACLError +AuthFailedException = AuthFailedError +NotEmptyException = NotEmptyError +SessionExpiredException = SessionExpiredError +InvalidCallbackException = InvalidCallbackError http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/__init__.py 
---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/handlers/__init__.py b/slider-agent/src/main/python/kazoo/handlers/__init__.py new file mode 100644 index 0000000..792d600 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/handlers/__init__.py @@ -0,0 +1 @@ +# http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/handlers/gevent.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/handlers/gevent.py b/slider-agent/src/main/python/kazoo/handlers/gevent.py new file mode 100644 index 0000000..6e40cae --- /dev/null +++ b/slider-agent/src/main/python/kazoo/handlers/gevent.py @@ -0,0 +1,161 @@ +"""A gevent based handler.""" +from __future__ import absolute_import + +import atexit +import logging + +import gevent +import gevent.event +import gevent.queue +import gevent.select +import gevent.thread + +from gevent.queue import Empty +from gevent.queue import Queue +from gevent import socket +try: + from gevent.lock import Semaphore, RLock +except ImportError: + from gevent.coros import Semaphore, RLock + +from kazoo.handlers.utils import create_tcp_socket, create_tcp_connection + +_using_libevent = gevent.__version__.startswith('0.') + +log = logging.getLogger(__name__) + +_STOP = object() + +AsyncResult = gevent.event.AsyncResult + + +class SequentialGeventHandler(object): + """Gevent handler for sequentially executing callbacks. + + This handler executes callbacks in a sequential manner. A queue is + created for each of the callback events, so that each type of event + has its callback type run sequentially. + + Each queue type has a greenlet worker that pulls the callback event + off the queue and runs it in the order the client sees it. 
+ + This split helps ensure that watch callbacks won't block session + re-establishment should the connection be lost during a Zookeeper + client call. + + Watch callbacks should avoid blocking behavior as the next callback + of that type won't be run until it completes. If you need to block, + spawn a new greenlet and return immediately so callbacks can + proceed. + + """ + name = "sequential_gevent_handler" + sleep_func = staticmethod(gevent.sleep) + + def __init__(self): + """Create a :class:`SequentialGeventHandler` instance""" + self.callback_queue = Queue() + self._running = False + self._async = None + self._state_change = Semaphore() + self._workers = [] + + class timeout_exception(gevent.event.Timeout): + def __init__(self, msg): + gevent.event.Timeout.__init__(self, exception=msg) + + def _create_greenlet_worker(self, queue): + def greenlet_worker(): + while True: + try: + func = queue.get() + if func is _STOP: + break + func() + except Empty: + continue + except Exception as exc: + log.warning("Exception in worker greenlet") + log.exception(exc) + return gevent.spawn(greenlet_worker) + + def start(self): + """Start the greenlet workers.""" + with self._state_change: + if self._running: + return + + self._running = True + + # Spawn our worker greenlets, we have + # - A callback worker for watch events to be called + for queue in (self.callback_queue,): + w = self._create_greenlet_worker(queue) + self._workers.append(w) + atexit.register(self.stop) + + def stop(self): + """Stop the greenlet workers and empty all queues.""" + with self._state_change: + if not self._running: + return + + self._running = False + + for queue in (self.callback_queue,): + queue.put(_STOP) + + while self._workers: + worker = self._workers.pop() + worker.join() + + # Clear the queues + self.callback_queue = Queue() # pragma: nocover + + if hasattr(atexit, "unregister"): + atexit.unregister(self.stop) + + def select(self, *args, **kwargs): + return gevent.select.select(*args, 
**kwargs) + + def socket(self, *args, **kwargs): + return create_tcp_socket(socket) + + def create_connection(self, *args, **kwargs): + return create_tcp_connection(socket, *args, **kwargs) + + def event_object(self): + """Create an appropriate Event object""" + return gevent.event.Event() + + def lock_object(self): + """Create an appropriate Lock object""" + return gevent.thread.allocate_lock() + + def rlock_object(self): + """Create an appropriate RLock object""" + return RLock() + + def async_result(self): + """Create a :class:`AsyncResult` instance + + The :class:`AsyncResult` instance will have its completion + callbacks executed in the thread the + :class:`SequentialGeventHandler` is created in (which should be + the gevent/main thread). + + """ + return AsyncResult() + + def spawn(self, func, *args, **kwargs): + """Spawn a function to run asynchronously""" + return gevent.spawn(func, *args, **kwargs) + + def dispatch_callback(self, callback): + """Dispatch to the callback object + + The callback is put on separate queues to run depending on the + type as documented for the :class:`SequentialGeventHandler`. + + """ + self.callback_queue.put(lambda: callback.func(*callback.args))