Rush has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/365868 )
Change subject: labtest: rabbitmq for openstack control node ...................................................................... labtest: rabbitmq for openstack control node refactor as module/profile/role model We are untangling the 'openstack' module, this supersedes the rabbitmq logic mixed in there and that logic will be removed once this is ready. Bug: T167559 Change-Id: If2a5b78856104d9b75d4d3e2c83923628fb2d3be --- A modules/profile/manifests/rabbitmq/server.pp A modules/rabbitmq/files/rabbitmq.py A modules/rabbitmq/files/rabbitmqadmin A modules/rabbitmq/manifests/init.pp A modules/rabbitmq/manifests/monitor.pp A modules/rabbitmq/templates/rabbitmq-server.default.erb A modules/role/manifests/wmcloud/openstack/control.pp 7 files changed, 1,328 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/68/365868/1 diff --git a/modules/profile/manifests/rabbitmq/server.pp b/modules/profile/manifests/rabbitmq/server.pp new file mode 100644 index 0000000..409be05 --- /dev/null +++ b/modules/profile/manifests/rabbitmq/server.pp @@ -0,0 +1,15 @@ +class profile::rabbitmq::server( + $monitor_user = hiera('profile::rabbitmq::monitor::user'), + $monitor_password = hiera('profile::rabbitmq::monitor::password'), + $file_handles = hiera('profile::rabbitmq::file_handles'), +){ + + class { 'rabbitmq': + file_handles => $file_handles, + } + + class { 'rabbitmq::monitor': + rabbit_monitor_username => $monitor_user, + rabbit_monitor_pass => $monitor_password, + } +} diff --git a/modules/rabbitmq/files/rabbitmq.py b/modules/rabbitmq/files/rabbitmq.py new file mode 100644 index 0000000..76c6aa0 --- /dev/null +++ b/modules/rabbitmq/files/rabbitmq.py @@ -0,0 +1,264 @@ +# coding=utf-8 + +""" +Collects data from RabbitMQ through the admin interface + +#### Notes + ** With added support for breaking down queue metrics by vhost, we have + attempted to keep results generated by existing configurations from + changing. 
This means that the old behaviour of clobbering queue metrics + when a single queue name exists in multiple vhosts still exists if the + configuration is not updated. If no vhosts block is defined it will also + keep the metric path as it was historically with no vhost name in it. + + old path => systems.myServer.rabbitmq.queues.myQueue.* + new path => systems.myServer.rabbitmq.myVhost.queues.myQueue.* + + ** If a [vhosts] section exists but is empty, then no queues will be polled. + ** To poll all vhosts and all queues, add the following. + ** [vhosts] + ** * = * + ** +""" + +import diamond.collector +import re +from urlparse import urljoin +from urllib import quote +import urllib2 +from base64 import b64encode + +try: + import json +except ImportError: + import simplejson as json + + +class RabbitMQClient(object): + """ + Tiny interface into the rabbit http api + """ + + def __init__(self, host, user, password, timeout=5, scheme="http"): + self.base_url = '%s://%s/api/' % (scheme, host) + self.timeout = timeout + self._authorization = 'Basic ' + b64encode('%s:%s' % (user, password)) + + def do_call(self, path): + url = urljoin(self.base_url, path) + req = urllib2.Request(url) + req.add_header('Authorization', self._authorization) + return json.load(urllib2.urlopen(req, timeout=self.timeout)) + + def get_all_vhosts(self): + return self.do_call('vhosts') + + def get_vhost_names(self): + return [i['name'] for i in self.get_all_vhosts()] + + def get_queues(self, vhost=None): + path = 'queues' + if vhost: + vhost = quote(vhost, '') + path += '/%s' % vhost + + queues = self.do_call(path) + return queues or [] + + def get_overview(self): + return self.do_call('overview') + + def get_nodes(self): + return self.do_call('nodes') + + def get_node(self, node): + return self.do_call('nodes/%s' % node) + + +class RabbitMQCollector(diamond.collector.Collector): + + def get_default_config_help(self): + config_help = super(RabbitMQCollector, self).get_default_config_help() + 
config_help.update({ + 'host': 'Hostname and port to collect from', + 'user': 'Username', + 'password': 'Password', + 'replace_dot': + 'A value to replace dot in queue names and vhosts names by', + 'replace_slash': + 'A value to replace a slash in queue names and vhosts names by', + 'queues': 'Queues to publish. Leave empty to publish all.', + 'vhosts': + 'A list of vhosts and queues for which we want to collect', + 'queues_ignored': + 'A list of queues or regexes for queue names not to report on.', + 'cluster': + 'If this node is part of a cluster, will collect metrics on the' + ' cluster health' + }) + return config_help + + def get_default_config(self): + """ + Returns the default collector settings + """ + config = super(RabbitMQCollector, self).get_default_config() + config.update({ + 'path': 'rabbitmq', + 'host': 'localhost:55672', + 'user': 'guest', + 'password': 'guest', + 'replace_dot': False, + 'replace_slash': False, + 'queues_ignored': '', + 'cluster': False, + 'scheme': 'http', + }) + return config + + def collect_health(self): + health_metrics = [ + 'fd_used', + 'fd_total', + 'mem_used', + 'mem_limit', + 'sockets_used', + 'sockets_total', + 'disk_free_limit', + 'disk_free', + 'proc_used', + 'proc_total', + ] + try: + client = RabbitMQClient(self.config['host'], + self.config['user'], + self.config['password'], + scheme=self.config['scheme']) + node_name = client.get_overview()['node'] + node_data = client.get_node(node_name) + for metric in health_metrics: + self.publish('health.{0}'.format(metric), node_data[metric]) + if self.config['cluster']: + self.publish('cluster.partitions', + len(node_data['partitions'])) + content = client.get_nodes() + self.publish('cluster.nodes', len(content)) + except Exception, e: + self.log.error('Couldnt connect to rabbitmq %s', e) + return {} + + def collect(self): + self.collect_health() + matchers = [] + if self.config['queues_ignored']: + for reg in self.config['queues_ignored'].split(): + 
matchers.append(re.compile(reg)) + try: + client = RabbitMQClient(self.config['host'], + self.config['user'], + self.config['password'], + scheme=self.config['scheme']) + + legacy = False + + if 'vhosts' not in self.config: + legacy = True + + if 'queues' in self.config: + vhost_conf = {"*": self.config['queues']} + else: + vhost_conf = {"*": ""} + + # Legacy configurations, those that don't include the [vhosts] + # section require special care so that we do not break metric + # gathering for people that were using this collector before the + # update to support vhosts. + + if not legacy: + vhost_names = client.get_vhost_names() + if "*" in self.config['vhosts']: + for vhost in vhost_names: + # Copy the glob queue list to each vhost not + # specifically defined in the configuration. + if vhost not in self.config['vhosts']: + self.config['vhosts'][vhost] = self.config[ + 'vhosts']['*'] + + del self.config['vhosts']["*"] + vhost_conf = self.config['vhosts'] + + # Iterate all vhosts in our vhosts configuration. For legacy this + # is "*" to force a single run. + for vhost in vhost_conf: + vhost_name = vhost + if self.config['replace_dot']: + vhost_name = vhost_name.replace( + '.', self.config['replace_dot']) + + if self.config['replace_slash']: + vhost_name = vhost_name.replace( + '/', self.config['replace_slash']) + + queues = vhost_conf[vhost] + + # Allow the use of a asterix to glob the queues, but replace + # with a empty string to match how legacy config was. + if queues == "*": + queues = "" + allowed_queues = queues.split() + + # When we fetch queues, we do not want to define a vhost if + # legacy. + if legacy: + vhost = None + + for queue in client.get_queues(vhost): + # If queues are defined and it doesn't match, then skip. 
+ if ((queue['name'] not in allowed_queues and + len(allowed_queues) > 0)): + continue + if matchers and any( + [m.match(queue['name']) for m in matchers]): + continue + for key in queue: + prefix = "queues" + if not legacy: + prefix = "vhosts.%s.%s" % (vhost_name, "queues") + + queue_name = queue['name'] + if self.config['replace_dot']: + queue_name = queue_name.replace( + '.', self.config['replace_dot']) + + if self.config['replace_slash']: + queue_name = queue_name.replace( + '/', self.config['replace_slash']) + + name = '{0}.{1}'.format(prefix, queue_name) + + self._publish_metrics(name, [], key, queue) + + overview = client.get_overview() + for key in overview: + self._publish_metrics('', [], key, overview) + except Exception, e: + self.log.error('An error occurred collecting from RabbitMQ, %s', e) + return {} + + def _publish_metrics(self, name, prev_keys, key, data): + """Recursively publish keys""" + value = data[key] + keys = prev_keys + [key] + if isinstance(value, dict): + for new_key in value: + self._publish_metrics(name, keys, new_key, value) + elif isinstance(value, (float, int, long)): + joined_keys = '.'.join(keys) + if name: + publish_key = '{0}.{1}'.format(name, joined_keys) + else: + publish_key = joined_keys + if isinstance(value, bool): + value = int(value) + + self.publish(publish_key, value) diff --git a/modules/rabbitmq/files/rabbitmqadmin b/modules/rabbitmq/files/rabbitmqadmin new file mode 100644 index 0000000..2f5e255 --- /dev/null +++ b/modules/rabbitmq/files/rabbitmqadmin @@ -0,0 +1,965 @@ +#!/usr/bin/env python + +# The contents of this file are subject to the Mozilla Public License +# Version 1.1 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://www.mozilla.org/MPL/ +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See the +# License for the specific language governing rights and limitations +# under the License. +# +# The Original Code is RabbitMQ Management Plugin. +# +# The Initial Developer of the Original Code is GoPivotal, Inc. +# Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved. + +import sys +if sys.version_info[0] < 2 or (sys.version_info[0] == 2 and sys.version_info[1] < 6): + print("Sorry, rabbitmqadmin requires at least Python 2.6.") + sys.exit(1) + +from optparse import OptionParser, TitledHelpFormatter +import urllib +import base64 +import json +import os +import socket + +if sys.version_info[0] == 2: + from ConfigParser import ConfigParser, NoSectionError + import httplib + import urlparse + from urllib import quote_plus + def b64(s): + return base64.b64encode(s) +else: + from configparser import ConfigParser, NoSectionError + import http.client as httplib + import urllib.parse as urlparse + from urllib.parse import quote_plus + def b64(s): + return base64.b64encode(s.encode('utf-8')).decode('utf-8') + +VERSION = '%%VSN%%' + +LISTABLE = {'connections': {'vhost': False, 'cols': ['name','user','channels']}, + 'channels': {'vhost': False, 'cols': ['name', 'user']}, + 'consumers': {'vhost': True}, + 'exchanges': {'vhost': True, 'cols': ['name', 'type']}, + 'queues': {'vhost': True, 'cols': ['name', 'messages']}, + 'bindings': {'vhost': True, 'cols': ['source', 'destination', + 'routing_key']}, + 'users': {'vhost': False}, + 'vhosts': {'vhost': False, 'cols': ['name', 'messages']}, + 'permissions': {'vhost': False}, + 'nodes': {'vhost': False, 'cols': ['name','type','mem_used']}, + 'parameters': {'vhost': False, 'json': ['value']}, + 'policies': {'vhost': False, 'json': ['definition']}} + +SHOWABLE = {'overview': {'vhost': False, 'cols': ['rabbitmq_version', + 'cluster_name', + 'queue_totals.messages', + 'object_totals.queues']}} + +PROMOTE_COLUMNS = ['vhost', 'name', 'type', + 'source', 'destination', 'destination_type', 'routing_key'] + +URIS = { 
+ 'exchange': '/exchanges/{vhost}/{name}', + 'queue': '/queues/{vhost}/{name}', + 'binding': '/bindings/{vhost}/e/{source}/{destination_char}/{destination}', + 'binding_del':'/bindings/{vhost}/e/{source}/{destination_char}/{destination}/{properties_key}', + 'vhost': '/vhosts/{name}', + 'user': '/users/{name}', + 'permission': '/permissions/{vhost}/{user}', + 'parameter': '/parameters/{component}/{vhost}/{name}', + 'policy': '/policies/{vhost}/{name}' + } + +DECLARABLE = { + 'exchange': {'mandatory': ['name', 'type'], + 'json': ['arguments'], + 'optional': {'auto_delete': 'false', 'durable': 'true', + 'internal': 'false', 'arguments': {}}}, + 'queue': {'mandatory': ['name'], + 'json': ['arguments'], + 'optional': {'auto_delete': 'false', 'durable': 'true', + 'arguments': {}, 'node': None}}, + 'binding': {'mandatory': ['source', 'destination'], + 'json': ['arguments'], + 'optional': {'destination_type': 'queue', + 'routing_key': '', 'arguments': {}}}, + 'vhost': {'mandatory': ['name'], + 'optional': {'tracing': None}}, + 'user': {'mandatory': ['name', 'password', 'tags'], + 'optional': {}}, + 'permission': {'mandatory': ['vhost', 'user', 'configure', 'write', 'read'], + 'optional': {}}, + 'parameter': {'mandatory': ['component', 'name', 'value'], + 'json': ['value'], + 'optional': {}}, + # Priority is 'json' to convert to int + 'policy': {'mandatory': ['name', 'pattern', 'definition'], + 'json': ['definition', 'priority'], + 'optional': {'priority' : 0, 'apply-to': None}} + } + +DELETABLE = { + 'exchange': {'mandatory': ['name']}, + 'queue': {'mandatory': ['name']}, + 'binding': {'mandatory': ['source', 'destination_type', 'destination', + 'properties_key']}, + 'vhost': {'mandatory': ['name']}, + 'user': {'mandatory': ['name']}, + 'permission': {'mandatory': ['vhost', 'user']}, + 'parameter': {'mandatory': ['component', 'name']}, + 'policy': {'mandatory': ['name']} + } + +CLOSABLE = { + 'connection': {'mandatory': ['name'], + 'optional': {}, + 'uri': 
'/connections/{name}'} + } + +PURGABLE = { + 'queue': {'mandatory': ['name'], + 'optional': {}, + 'uri': '/queues/{vhost}/{name}/contents'} + } + +EXTRA_VERBS = { + 'publish': {'mandatory': ['routing_key'], + 'optional': {'payload': None, + 'properties': {}, + 'exchange': 'amq.default', + 'payload_encoding': 'string'}, + 'json': ['properties'], + 'uri': '/exchanges/{vhost}/{exchange}/publish'}, + 'get': {'mandatory': ['queue'], + 'optional': {'count': '1', 'requeue': 'true', + 'payload_file': None, 'encoding': 'auto'}, + 'uri': '/queues/{vhost}/{queue}/get'} +} + +for k in DECLARABLE: + DECLARABLE[k]['uri'] = URIS[k] + +for k in DELETABLE: + DELETABLE[k]['uri'] = URIS[k] + DELETABLE[k]['optional'] = {} +DELETABLE['binding']['uri'] = URIS['binding_del'] + +def short_usage(): + return "rabbitmqadmin [options] subcommand" + +def title(name): + return "\n%s\n%s\n\n" % (name, '=' * len(name)) + +def subcommands_usage(): + usage = """Usage +===== + """ + short_usage() + """ + + where subcommand is one of: +""" + title("Display") + + for l in LISTABLE: + usage += " list {0} [<column>...]\n".format(l) + for s in SHOWABLE: + usage += " show {0} [<column>...]\n".format(s) + usage += title("Object Manipulation") + usage += fmt_usage_stanza(DECLARABLE, 'declare') + usage += fmt_usage_stanza(DELETABLE, 'delete') + usage += fmt_usage_stanza(CLOSABLE, 'close') + usage += fmt_usage_stanza(PURGABLE, 'purge') + usage += title("Broker Definitions") + usage += """ export <file> + import <file> +""" + usage += title("Publishing and Consuming") + usage += fmt_usage_stanza(EXTRA_VERBS, '') + usage += """ + * If payload is not specified on publish, standard input is used + + * If payload_file is not specified on get, the payload will be shown on + standard output along with the message metadata + + * If payload_file is specified on get, count must not be set +""" + return usage + +def config_usage(): + usage = "Usage\n=====\n" + short_usage() + usage += "\n" + title("Configuration File") 
+ usage += """ It is possible to specify a configuration file from the command line. + Hosts can be configured easily in a configuration file and called + from the command line. +""" + usage += title("Example") + usage += """ # rabbitmqadmin.conf.example START + + [host_normal] + hostname = localhost + port = 15672 + username = guest + password = guest + declare_vhost = / # Used as default for declare / delete only + vhost = / # Used as default for declare / delete / list + + [host_ssl] + hostname = otherhost + port = 15672 + username = guest + password = guest + ssl = True + ssl_key_file = /path/to/key.pem + ssl_cert_file = /path/to/cert.pem + + # rabbitmqadmin.conf.example END +""" + usage += title("Use") + usage += """ rabbitmqadmin -c rabbitmqadmin.conf.example -N host_normal ...""" + return usage + +def more_help(): + return """ +More Help +========= + +For more help use the help subcommand: + + rabbitmqadmin help subcommands # For a list of available subcommands + rabbitmqadmin help config # For help with the configuration file +""" + +def fmt_usage_stanza(root, verb): + def fmt_args(args): + res = " ".join(["{0}=...".format(a) for a in args['mandatory']]) + opts = " ".join("{0}=...".format(o) for o in args['optional'].keys()) + if opts != "": + res += " [{0}]".format(opts) + return res + + text = "" + if verb != "": + verb = " " + verb + for k in root.keys(): + text += " {0} {1} {2}\n".format(verb, k, fmt_args(root[k])) + return text + +default_options = { "hostname" : "localhost", + "port" : "15672", + "declare_vhost" : "/", + "username" : "guest", + "password" : "guest", + "ssl" : False, + "verbose" : True, + "format" : "table", + "depth" : 1, + "bash_completion" : False } + + +class MyFormatter(TitledHelpFormatter): + def format_epilog(self, epilog): + return epilog + +parser = OptionParser(usage=short_usage(), + formatter=MyFormatter(), + epilog=more_help()) + +def make_parser(): + def add(*args, **kwargs): + key = kwargs['dest'] + if key in 
default_options: + default = " [default: %s]" % default_options[key] + kwargs['help'] = kwargs['help'] + default + parser.add_option(*args, **kwargs) + + add("-c", "--config", dest="config", + help="configuration file [default: ~/.rabbitmqadmin.conf]", + metavar="CONFIG") + add("-N", "--node", dest="node", + help="node described in the configuration file [default: 'default'" + \ + " only if configuration file is specified]", + metavar="NODE") + add("-H", "--host", dest="hostname", + help="connect to host HOST" , + metavar="HOST") + add("-P", "--port", dest="port", + help="connect to port PORT", + metavar="PORT") + add("-V", "--vhost", dest="vhost", + help="connect to vhost VHOST [default: all vhosts for list, '/' for declare]", + metavar="VHOST") + add("-u", "--username", dest="username", + help="connect using username USERNAME", + metavar="USERNAME") + add("-p", "--password", dest="password", + help="connect using password PASSWORD", + metavar="PASSWORD") + add("-q", "--quiet", action="store_false", dest="verbose", + help="suppress status messages") + add("-s", "--ssl", action="store_true", dest="ssl", + help="connect with ssl") + add("--ssl-key-file", dest="ssl_key_file", + help="PEM format key file for SSL") + add("--ssl-cert-file", dest="ssl_cert_file", + help="PEM format certificate file for SSL") + add("-f", "--format", dest="format", + help="format for listing commands - one of [" + ", ".join(FORMATS.keys()) + "]") + add("-S", "--sort", dest="sort", help="sort key for listing queries") + add("-R", "--sort-reverse", action="store_true", dest="sort_reverse", + help="reverse the sort order") + add("-d", "--depth", dest="depth", + help="maximum depth to recurse for listing tables") + add("--bash-completion", action="store_true", + dest="bash_completion", + help="Print bash completion script") + add("--version", action="store_true", + dest="version", + help="Display version and exit") + +def default_config(): + home = os.getenv('USERPROFILE') or os.getenv('HOME') 
+ if home is not None: + config_file = home + os.sep + ".rabbitmqadmin.conf" + if os.path.isfile(config_file): + return config_file + return None + +def make_configuration(): + make_parser() + (options, args) = parser.parse_args() + setattr(options, "declare_vhost", None) + if options.version: + print_version() + if options.config is None: + config_file = default_config() + if config_file is not None: + setattr(options, "config", config_file) + else: + if not os.path.isfile(options.config): + assert_usage(False, + "Could not read config file '%s'" % options.config) + + if options.node is None and options.config: + options.node = "default" + else: + options.node = options.node + for (key, val) in default_options.items(): + if getattr(options, key) is None: + setattr(options, key, val) + + if options.config is not None: + config = ConfigParser() + try: + config.read(options.config) + new_conf = dict(config.items(options.node)) + except NoSectionError as error: + if options.node == "default": + pass + else: + assert_usage(False, ("Could not read section '%s' in config file" + + " '%s':\n %s") % + (options.node, options.config, error)) + else: + for key, val in new_conf.items(): + setattr(options, key, val) + + return (options, args) + +def assert_usage(expr, error): + if not expr: + output("\nERROR: {0}\n".format(error)) + output("{0} --help for help\n".format(os.path.basename(sys.argv[0]))) + sys.exit(1) + +def print_version(): + output("rabbitmqadmin {0}".format(VERSION)) + sys.exit(0) + +def column_sort_key(col): + if col in PROMOTE_COLUMNS: + return (1, PROMOTE_COLUMNS.index(col)) + else: + return (2, col) + +def main(): + (options, args) = make_configuration() + if options.bash_completion: + print_bash_completion() + exit(0) + assert_usage(len(args) > 0, 'Action not specified') + mgmt = Management(options, args[1:]) + mode = "invoke_" + args[0] + assert_usage(hasattr(mgmt, mode), + 'Action {0} not understood'.format(args[0])) + method = getattr(mgmt, "invoke_%s" 
% args[0]) + method() + +def output(s): + print(maybe_utf8(s, sys.stdout)) + +def die(s): + sys.stderr.write(maybe_utf8("*** {0}\n".format(s), sys.stderr)) + exit(1) + +def maybe_utf8(s, stream): + if sys.version_info[0] == 3 or stream.isatty(): + # It will have an encoding, which Python will respect + return s + else: + # It won't have an encoding, and Python will pick ASCII by default + return s.encode('utf-8') + +class Management: + def __init__(self, options, args): + self.options = options + self.args = args + + def get(self, path): + return self.http("GET", "/api%s" % path, "") + + def put(self, path, body): + return self.http("PUT", "/api%s" % path, body) + + def post(self, path, body): + return self.http("POST", "/api%s" % path, body) + + def delete(self, path): + return self.http("DELETE", "/api%s" % path, "") + + def http(self, method, path, body): + if self.options.ssl: + conn = httplib.HTTPSConnection(self.options.hostname, + self.options.port, + self.options.ssl_key_file, + self.options.ssl_cert_file) + else: + conn = httplib.HTTPConnection(self.options.hostname, + self.options.port) + auth = (self.options.username + ":" + self.options.password) + + headers = {"Authorization": "Basic " + b64(auth)} + if body != "": + headers["Content-Type"] = "application/json" + try: + conn.request(method, path, body, headers) + except socket.error as e: + die("Could not connect: {0}".format(e)) + resp = conn.getresponse() + if resp.status == 400: + die(json.loads(resp.read())['reason']) + if resp.status == 401: + die("Access refused: {0}".format(path)) + if resp.status == 404: + die("Not found: {0}".format(path)) + if resp.status == 301: + url = urlparse.urlparse(resp.getheader('location')) + [host, port] = url.netloc.split(':') + self.options.hostname = host + self.options.port = int(port) + return self.http(method, url.path + '?' 
+ url.query, body) + if resp.status < 200 or resp.status > 400: + raise Exception("Received %d %s for path %s\n%s" + % (resp.status, resp.reason, path, resp.read())) + return resp.read().decode('utf-8') + + def verbose(self, string): + if self.options.verbose: + output(string) + + def get_arg(self): + assert_usage(len(self.args) == 1, 'Exactly one argument required') + return self.args[0] + + def use_cols(self): + # Deliberately do not cast to int here; we only care about the + # default, not explicit setting. + return self.options.depth == 1 and not 'json' in self.options.format + + def invoke_help(self): + if len(self.args) == 0: + parser.print_help() + else: + help_cmd = self.get_arg() + if help_cmd == 'subcommands': + usage = subcommands_usage() + elif help_cmd == 'config': + usage = config_usage() + else: + assert_usage(False, """help topic must be one of: + subcommands + config""") + print(usage) + exit(0) + + def invoke_publish(self): + (uri, upload) = self.parse_args(self.args, EXTRA_VERBS['publish']) + if not 'payload' in upload: + data = sys.stdin.read() + upload['payload'] = b64(data) + upload['payload_encoding'] = 'base64' + resp = json.loads(self.post(uri, json.dumps(upload))) + if resp['routed']: + self.verbose("Message published") + else: + self.verbose("Message published but NOT routed") + + def invoke_get(self): + (uri, upload) = self.parse_args(self.args, EXTRA_VERBS['get']) + payload_file = 'payload_file' in upload and upload['payload_file'] or None + assert_usage(not payload_file or upload['count'] == '1', + 'Cannot get multiple messages using payload_file') + result = self.post(uri, json.dumps(upload)) + if payload_file: + write_payload_file(payload_file, result) + columns = ['routing_key', 'exchange', 'message_count', + 'payload_bytes', 'redelivered'] + format_list(result, columns, {}, self.options) + else: + format_list(result, [], {}, self.options) + + def invoke_export(self): + path = self.get_arg() + definitions = self.get("/definitions") 
+ f = open(path, 'w') + f.write(definitions) + f.close() + self.verbose("Exported definitions for %s to \"%s\"" + % (self.options.hostname, path)) + + def invoke_import(self): + path = self.get_arg() + f = open(path, 'r') + definitions = f.read() + f.close() + self.post("/definitions", definitions) + self.verbose("Imported definitions for %s from \"%s\"" + % (self.options.hostname, path)) + + def invoke_list(self): + (uri, obj_info, cols) = self.list_show_uri(LISTABLE, 'list') + format_list(self.get(uri), cols, obj_info, self.options) + + def invoke_show(self): + (uri, obj_info, cols) = self.list_show_uri(SHOWABLE, 'show') + format_list('[{0}]'.format(self.get(uri)), cols, obj_info, self.options) + + def list_show_uri(self, obj_types, verb): + obj_type = self.args[0] + assert_usage(obj_type in obj_types, + "Don't know how to {0} {1}".format(verb, obj_type)) + obj_info = obj_types[obj_type] + uri = "/%s" % obj_type + query = [] + if obj_info['vhost'] and self.options.vhost: + uri += "/%s" % quote_plus(self.options.vhost) + cols = self.args[1:] + if cols == [] and 'cols' in obj_info and self.use_cols(): + cols = obj_info['cols'] + if cols != []: + query.append("columns=" + ",".join(cols)) + sort = self.options.sort + if sort: + query.append("sort=" + sort) + if self.options.sort_reverse: + query.append("sort_reverse=true") + query = "&".join(query) + if query != "": + uri += "?" 
+ query + return (uri, obj_info, cols) + + def invoke_declare(self): + (obj_type, uri, upload) = self.declare_delete_parse(DECLARABLE) + if obj_type == 'binding': + self.post(uri, json.dumps(upload)) + else: + self.put(uri, json.dumps(upload)) + self.verbose("{0} declared".format(obj_type)) + + def invoke_delete(self): + (obj_type, uri, upload) = self.declare_delete_parse(DELETABLE) + self.delete(uri) + self.verbose("{0} deleted".format(obj_type)) + + def invoke_close(self): + (obj_type, uri, upload) = self.declare_delete_parse(CLOSABLE) + self.delete(uri) + self.verbose("{0} closed".format(obj_type)) + + def invoke_purge(self): + (obj_type, uri, upload) = self.declare_delete_parse(PURGABLE) + self.delete(uri) + self.verbose("{0} purged".format(obj_type)) + + def declare_delete_parse(self, root): + assert_usage(len(self.args) > 0, 'Type not specified') + obj_type = self.args[0] + assert_usage(obj_type in root, + 'Type {0} not recognised'.format(obj_type)) + obj = root[obj_type] + (uri, upload) = self.parse_args(self.args[1:], obj) + return (obj_type, uri, upload) + + def parse_args(self, args, obj): + mandatory = obj['mandatory'] + optional = obj['optional'] + uri_template = obj['uri'] + upload = {} + for k in optional.keys(): + if optional[k] is not None: + upload[k] = optional[k] + for arg in args: + assert_usage("=" in arg, + 'Argument "{0}" not in format name=value'.format(arg)) + (name, value) = arg.split("=", 1) + assert_usage(name in mandatory or name in optional.keys(), + 'Argument "{0}" not recognised'.format(name)) + if 'json' in obj and name in obj['json']: + upload[name] = self.parse_json(value) + else: + upload[name] = value + for m in mandatory: + assert_usage(m in upload.keys(), + 'mandatory argument "{0}" required'.format(m)) + if 'vhost' not in mandatory: + upload['vhost'] = self.options.vhost or self.options.declare_vhost + uri_args = {} + for k in upload: + v = upload[k] + if v and isinstance(v, (str, bytes)): + uri_args[k] = quote_plus(v) + if k 
== 'destination_type': + uri_args['destination_char'] = v[0] + uri = uri_template.format(**uri_args) + return (uri, upload) + + def parse_json(self, text): + try: + return json.loads(text) + except ValueError: + print("Could not parse JSON:\n {0}".format(text)) + sys.exit(1) + +def format_list(json_list, columns, args, options): + format = options.format + formatter = None + if format == "raw_json": + output(json_list) + return + elif format == "pretty_json": + enc = json.JSONEncoder(False, False, True, True, True, 2) + output(enc.encode(json.loads(json_list))) + return + else: + formatter = FORMATS[format] + assert_usage(formatter != None, + "Format {0} not recognised".format(format)) + formatter_instance = formatter(columns, args, options) + formatter_instance.display(json_list) + +class Lister: + def verbose(self, string): + if self.options.verbose: + output(string) + + def display(self, json_list): + depth = sys.maxsize + if len(self.columns) == 0: + depth = int(self.options.depth) + (columns, table) = self.list_to_table(json.loads(json_list), depth) + if len(table) > 0: + self.display_list(columns, table) + else: + self.verbose("No items") + + def list_to_table(self, items, max_depth): + columns = {} + column_ix = {} + row = None + table = [] + + def add(prefix, depth, item, fun): + for key in item: + column = prefix == '' and key or (prefix + '.' + key) + subitem = item[key] + if type(subitem) == dict: + if 'json' in self.obj_info and key in self.obj_info['json']: + fun(column, json.dumps(subitem)) + else: + if depth < max_depth: + add(column, depth + 1, subitem, fun) + elif type(subitem) == list: + # The first branch has slave nodes in queues in + # mind (which come out looking decent); the second + # one has applications in nodes (which look less + # so, but what would look good?). 
+ if [x for x in subitem if type(x) != str] == []: + serialised = " ".join(subitem) + else: + serialised = json.dumps(subitem) + fun(column, serialised) + else: + fun(column, subitem) + + def add_to_columns(col, val): + columns[col] = True + + def add_to_row(col, val): + if col in column_ix: + row[column_ix[col]] = str(val) + + if len(self.columns) == 0: + for item in items: + add('', 1, item, add_to_columns) + columns = list(columns.keys()) + columns.sort(key=column_sort_key) + else: + columns = self.columns + + for i in range(0, len(columns)): + column_ix[columns[i]] = i + for item in items: + row = len(columns) * [''] + add('', 1, item, add_to_row) + table.append(row) + + return (columns, table) + +class TSVList(Lister): + def __init__(self, columns, obj_info, options): + self.columns = columns + self.obj_info = obj_info + self.options = options + + def display_list(self, columns, table): + head = "\t".join(columns) + self.verbose(head) + + for row in table: + line = "\t".join(row) + output(line) + +class LongList(Lister): + def __init__(self, columns, obj_info, options): + self.columns = columns + self.obj_info = obj_info + self.options = options + + def display_list(self, columns, table): + sep = "\n" + "-" * 80 + "\n" + max_width = 0 + for col in columns: + max_width = max(max_width, len(col)) + fmt = "{0:>" + str(max_width) + "}: {1}" + output(sep) + for i in range(0, len(table)): + for j in range(0, len(columns)): + output(fmt.format(columns[j], table[i][j])) + output(sep) + +class TableList(Lister): + def __init__(self, columns, obj_info, options): + self.columns = columns + self.obj_info = obj_info + self.options = options + + def display_list(self, columns, table): + total = [columns] + total.extend(table) + self.ascii_table(total) + + def ascii_table(self, rows): + table = "" + col_widths = [0] * len(rows[0]) + for i in range(0, len(rows[0])): + for j in range(0, len(rows)): + col_widths[i] = max(col_widths[i], len(rows[j][i])) + 
self.ascii_bar(col_widths) + self.ascii_row(col_widths, rows[0], "^") + self.ascii_bar(col_widths) + for row in rows[1:]: + self.ascii_row(col_widths, row, "<") + self.ascii_bar(col_widths) + + def ascii_row(self, col_widths, row, align): + txt = "|" + for i in range(0, len(col_widths)): + fmt = " {0:" + align + str(col_widths[i]) + "} " + txt += fmt.format(row[i]) + "|" + output(txt) + + def ascii_bar(self, col_widths): + txt = "+" + for w in col_widths: + txt += ("-" * (w + 2)) + "+" + output(txt) + +class KeyValueList(Lister): + def __init__(self, columns, obj_info, options): + self.columns = columns + self.obj_info = obj_info + self.options = options + + def display_list(self, columns, table): + for i in range(0, len(table)): + row = [] + for j in range(0, len(columns)): + row.append("{0}=\"{1}\"".format(columns[j], table[i][j])) + output(" ".join(row)) + +# TODO handle spaces etc in completable names +class BashList(Lister): + def __init__(self, columns, obj_info, options): + self.columns = columns + self.obj_info = obj_info + self.options = options + + def display_list(self, columns, table): + ix = None + for i in range(0, len(columns)): + if columns[i] == 'name': + ix = i + if ix is not None: + res = [] + for row in table: + res.append(row[ix]) + output(" ".join(res)) + +FORMATS = { + 'raw_json' : None, # Special cased + 'pretty_json' : None, # Ditto + 'tsv' : TSVList, + 'long' : LongList, + 'table' : TableList, + 'kvp' : KeyValueList, + 'bash' : BashList +} + +def write_payload_file(payload_file, json_list): + result = json.loads(json_list)[0] + payload = result['payload'] + payload_encoding = result['payload_encoding'] + f = open(payload_file, 'w') + if payload_encoding == 'base64': + data = base64.b64decode(payload) + else: + data = payload + f.write(data) + f.close() + +def print_bash_completion(): + script = """# This is a bash completion script for rabbitmqadmin. 
+# Redirect it to a file, then source it or copy it to /etc/bash_completion.d +# to get tab completion. rabbitmqadmin must be on your PATH for this to work. +_rabbitmqadmin() +{ + local cur prev opts base + COMPREPLY=() + cur="${COMP_WORDS[COMP_CWORD]}" + prev="${COMP_WORDS[COMP_CWORD-1]}" + + opts="list show declare delete close purge import export get publish help" + fargs="--help --host --port --vhost --username --password --format --depth --sort --sort-reverse" + + case "${prev}" in + list) + COMPREPLY=( $(compgen -W '""" + " ".join(LISTABLE) + """' -- ${cur}) ) + return 0 + ;; + show) + COMPREPLY=( $(compgen -W '""" + " ".join(SHOWABLE) + """' -- ${cur}) ) + return 0 + ;; + declare) + COMPREPLY=( $(compgen -W '""" + " ".join(DECLARABLE.keys()) + """' -- ${cur}) ) + return 0 + ;; + delete) + COMPREPLY=( $(compgen -W '""" + " ".join(DELETABLE.keys()) + """' -- ${cur}) ) + return 0 + ;; + close) + COMPREPLY=( $(compgen -W '""" + " ".join(CLOSABLE.keys()) + """' -- ${cur}) ) + return 0 + ;; + purge) + COMPREPLY=( $(compgen -W '""" + " ".join(PURGABLE.keys()) + """' -- ${cur}) ) + return 0 + ;; + export) + COMPREPLY=( $(compgen -f ${cur}) ) + return 0 + ;; + import) + COMPREPLY=( $(compgen -f ${cur}) ) + return 0 + ;; + help) + opts="subcommands config" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; + -H) + COMPREPLY=( $(compgen -A hostname ${cur}) ) + return 0 + ;; + --host) + COMPREPLY=( $(compgen -A hostname ${cur}) ) + return 0 + ;; + -V) + opts="$(rabbitmqadmin -q -f bash list vhosts)" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; + --vhost) + opts="$(rabbitmqadmin -q -f bash list vhosts)" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; + -u) + opts="$(rabbitmqadmin -q -f bash list users)" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; + --username) + opts="$(rabbitmqadmin -q -f bash list users)" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; + -f) + COMPREPLY=( 
$(compgen -W \"""" + " ".join(FORMATS.keys()) + """\" -- ${cur}) ) + return 0 + ;; + --format) + COMPREPLY=( $(compgen -W \"""" + " ".join(FORMATS.keys()) + """\" -- ${cur}) ) + return 0 + ;; + +""" + for l in LISTABLE: + key = l[0:len(l) - 1] + script += " " + key + """) + opts="$(rabbitmqadmin -q -f bash list """ + l + """)" + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + ;; +""" + script += """ *) + ;; + esac + + COMPREPLY=($(compgen -W "${opts} ${fargs}" -- ${cur})) + return 0 +} +complete -F _rabbitmqadmin rabbitmqadmin +""" + output(script) + +if __name__ == "__main__": + main() diff --git a/modules/rabbitmq/manifests/init.pp b/modules/rabbitmq/manifests/init.pp new file mode 100644 index 0000000..ed9b276 --- /dev/null +++ b/modules/rabbitmq/manifests/init.pp @@ -0,0 +1,49 @@ +# https://www.rabbitmq.com/ +# +# User MAC's are not handled by Puppet +# +# Changing a user password +# rabbitmqctl change_password <user> <password> +# Adding a user +# rabbitmqctl add_user <user> <password> +# +# Creating user "<user>" ... +# rabbitmqctl change_password <user> <password> +# rabbitmqctl set_user_tags <user> administrator +# Setting tags for user "<user>" to [administrator] ... 
+# rabbitmqctl set_permissions -p / <user> ".*" ".*" ".*" +# +# The management plugin may be desired +# rabbitmq-plugins enable rabbitmq_management + +class rabbitmq( + $file_handles='1024', + ) { + + package { [ 'rabbitmq-server' ]: + ensure => present, + } + + file { '/etc/default/rabbitmq-server': + ensure => present, + owner => 'root', + group => 'root', + mode => '0444', + content => template('rabbitmq/rabbitmq-server.default.erb'), + require => Package['rabbitmq-server'], + notify => Service['rabbitmq-server'], + } + + file { '/usr/local/sbin/rabbitmqadmin': + ensure => present, + owner => 'root', + group => 'root', + mode => '0655', + source => 'puppet:///modules/rabbitmq/rabbitmqadmin', + } + + service { 'rabbitmq-server': + ensure => running, + require => Package['rabbitmq-server'], + } +} diff --git a/modules/rabbitmq/manifests/monitor.pp b/modules/rabbitmq/manifests/monitor.pp new file mode 100644 index 0000000..54f8f66 --- /dev/null +++ b/modules/rabbitmq/manifests/monitor.pp @@ -0,0 +1,14 @@ +class rabbitmq::monitor( + $rabbit_monitor_username, + $rabbit_monitor_pass, + $rabbit_host='localhost:15672', + ) { + diamond::collector { 'RabbitMQ': + settings => { + 'host' => $rabbit_host, + 'user' => $rabbit_monitor_username, + 'password' => $rabbit_monitor_password, + }, + source => 'puppet:///modules/openstack/rabbitmq/rabbitmq.py', + } +} diff --git a/modules/rabbitmq/templates/rabbitmq-server.default.erb b/modules/rabbitmq/templates/rabbitmq-server.default.erb new file mode 100644 index 0000000..e1c348a --- /dev/null +++ b/modules/rabbitmq/templates/rabbitmq-server.default.erb @@ -0,0 +1,18 @@ +##################################################################### +### THIS FILE IS MANAGED BY PUPPET +### puppet:///modules/openstack/rabbitmq/labs-rabbitmq.default +##################################################################### + + +# This file is sourced by /etc/init.d/rabbitmq-server. 
Its primary +# reason for existing is to allow adjustment of system limits for the +# rabbitmq-server process. +# +# Default file handles limit is 1024. We need a handle +# for every worker thread of every service on every node +# which shouldn't add up to 1024 but Rabbit has been dying +# and this is an inexpensive thing to try. +# +# Many big production OS installs have this set to 16k or even 64k. +# +ulimit -n <%= @file_handles %> diff --git a/modules/role/manifests/wmcloud/openstack/control.pp b/modules/role/manifests/wmcloud/openstack/control.pp new file mode 100644 index 0000000..7608340 --- /dev/null +++ b/modules/role/manifests/wmcloud/openstack/control.pp @@ -0,0 +1,3 @@ +class role::wmcloud::openstack::control { + include ::profile::rabbitmq::server +} -- To view, visit https://gerrit.wikimedia.org/r/365868 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: If2a5b78856104d9b75d4d3e2c83923628fb2d3be Gerrit-PatchSet: 1 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: Rush <r...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits