Hi Gary, ACK from me.
Best Regards, ThuanTr -----Original Message----- From: Gary Lee <gary....@dektech.com.au> Sent: Tuesday, October 1, 2019 9:54 AM To: hans.nordeb...@ericsson.com; minh.c...@dektech.com.au; anders.wid...@ericsson.com Cc: opensaf-devel@lists.sourceforge.net Subject: [devel] [PATCH 1/1] osaf: add tcp arbitrator [#3064] --- src/osaf/consensus/plugins/tcp/README | 41 ++ src/osaf/consensus/plugins/tcp/certificate.pem | 20 + src/osaf/consensus/plugins/tcp/key.pem | 28 ++ src/osaf/consensus/plugins/tcp/tcp.plugin | 520 +++++++++++++++++++++++++ src/osaf/consensus/plugins/tcp/tcp_server.py | 157 ++++++++ 5 files changed, 766 insertions(+) create mode 100644 src/osaf/consensus/plugins/tcp/README create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem create mode 100644 src/osaf/consensus/plugins/tcp/key.pem create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py diff --git a/src/osaf/consensus/plugins/tcp/README b/src/osaf/consensus/plugins/tcp/README new file mode 100644 index 0000000..6f739e8 --- /dev/null +++ b/src/osaf/consensus/plugins/tcp/README @@ -0,0 +1,41 @@ +TCP arbitrator + +The TCP arbitrator may be useful for deployments where deploying etcd +is not feasible. An example arbitrator is provided to help prevent +split brain in clusters that contain up to 2 system controllers. + +The example arbitrator is a simple python based program that can be +deployed on a single payload or a node external to the cluster. + +Two main pieces of information are stored on the arbitrator: the +hostname of the current active controller and a heartbeat timestamp. + +An active controller sends a heartbeat to the controller every 100ms +using TLs over a persistent TCP connection. It should self-fence if it +is unable to heartbeat, as it is likely to be separated from the arbitrator. + +A candidate active controller must check the existing controller is not +heartbeating before promoting itself active. On a cluster using TIPC, +the timeout value is the TIPC link tolerance timeout. On a TCP based +cluster, the timeout is calculated from FMS_TAKEOVER_REQUEST_VALID_TIME. + +Suggested fmd.conf configuration: + +export FMS_SPLIT_BRAIN_PREVENTION=1 +export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin +export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0 (any other setting is +ignored) export FMS_RELAXED_NODE_PROMOTION=1 + +The above settings will allow a controller to be elected active during +cluster startup, even if the arbitrator is not yet running. +If the arbitrator becomes temporarily unavailable, the controllers will +remain running if they can see each other. If an active controller +becomes isolated from the standby *and* the arbitrator, it will +self-fence and the standby will become active (if located in the same +network partition as the arbitrator). + +The provided self-signed certificate is an example only, and was generated using: + +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 100000 +-out certificate.pem + +It must be replaced in an actual deployment!! diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem b/src/osaf/consensus/plugins/tcp/certificate.pem new file mode 100644 index 0000000..e0b4993 --- /dev/null +++ b/src/osaf/consensus/plugins/tcp/certificate.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV +BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH +T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG +A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK +DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS ++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf +gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT +lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ +WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar +hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y +K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9 +HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB +/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1 +hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz +eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB +tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH +ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ +R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg== +-----END CERTIFICATE----- diff --git a/src/osaf/consensus/plugins/tcp/key.pem b/src/osaf/consensus/plugins/tcp/key.pem new file mode 100644 index 0000000..66b8bcb --- /dev/null +++ b/src/osaf/consensus/plugins/tcp/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD +OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh +ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9 +Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3 +J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA +ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy +8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7 +4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE +KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI +5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR +oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz +/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX ++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin +3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4 +5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu +z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi +l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39 +OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z +BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz +Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF +hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE +B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP +EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD +WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0 +6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv +0ZCcum4WK037YSkrI1IFBUlKXA== +-----END PRIVATE KEY----- diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin b/src/osaf/consensus/plugins/tcp/tcp.plugin new file mode 100755 index 0000000..1b5ddf5 --- /dev/null +++ b/src/osaf/consensus/plugins/tcp/tcp.plugin @@ -0,0 +1,520 @@ +#!/usr/bin/env python3 +""" + -*- OpenSAF -*- + +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. This file and +program are licensed under the GNU Lesser General Public License Version 2.1, February 1999. +The complete license can be accessed from the following location: +http://opensource.org/licenses/lgpl-license.php +See the Copying file included with the OpenSAF distribution for full +licensing terms. + +""" + +import argparse +import os +import socket +import ssl +import sys +import time +import xmlrpc.client +import syslog + + +class ArbitratorPlugin(object): + """ This class represents a TCP Plugin """ + + def __init__(self): + # Replace with actual URL + self.uri = "https://test:test@arbitrator:6666/" + # Warning: certificate verification is disabled + self.proxy = xmlrpc.client.ServerProxy( + self.uri, context=ssl._create_unverified_context()) + + # Get tipc link tolerance timeout if OpenSAF in used with TIPC + link_tolerance = self.get_tipc_tolerance() + self.timeout = link_tolerance + + # TODO make configurable? default to 1s + self.socket_timeout = 1 + socket.setdefaulttimeout(self.socket_timeout) + + # TODO make configurable? default to 100ms + self.heartbeat_interval = 0.1 + + # constants + self.keyname = 'opensaf_consensus_lock' + self.takeover_request = 'takeover_request' + self.testkey = "opensaf_write_test" + + # location where opensaf node_name is defined + self.node_name_file = '/etc/opensaf/node_name' + self.hostname = self.get_node_name() + + def run(self): + """ Main method to handle input and output """ + result = self.parse_cmd() + if 'output' in result: + print(result['output']) + sys.exit(result['code']) + + def parse_cmd(self): + """ Handle input parameters from user """ + parser = argparse.ArgumentParser(description="Arbitration Plugin") + sub_parser = parser.add_subparsers( + dest="command", + help="Execute script with this command" + ) + + parser_get = sub_parser.add_parser( + "get", + help="Retrieve <value> of <key> in tcp database" + ) + parser_get.add_argument("key", metavar="<key>") + + parser_set = sub_parser.add_parser( + "set", + help="Set <key> to <value> in tcp database" + ) + parser_set.add_argument("key", metavar="<key>") + parser_set.add_argument("value", metavar="<value>") + parser_set.add_argument("timeout", metavar="<timeout>", + type=int) + + parser_create = sub_parser.add_parser( + "create", + help="Create <key> and set to <value>" + " for <timeout> second in tcp database" + ) + parser_create.add_argument("key", metavar="<key>") + parser_create.add_argument("value", metavar="<value>") + parser_create.add_argument("timeout", metavar="<timeout>", + type=int) + + parser_set_if_prev = sub_parser.add_parser( + "set_if_prev", + help="Set <key> to <value> if the existing value matches <prev>" + ) + parser_set_if_prev.add_argument("key", metavar="<key>") + parser_set_if_prev.add_argument("value", metavar="<value>") + parser_set_if_prev.add_argument("prev", metavar="<prev>") + parser_set_if_prev.add_argument( + "timeout", metavar="<timeout>", type=int) + + parser_lock = sub_parser.add_parser( + "lock", + help="Set <owner> of the lock," + " automatically unlock after <timeout>" + ) + parser_lock.add_argument("owner", metavar="<owner>") + parser_lock.add_argument("timeout", metavar="<timeout>", + type=int) + + parser_unlock = sub_parser.add_parser( + "unlock", + help="Unlock the lock from <owner>" + ) + parser_unlock.add_argument("owner", metavar="<owner>") + parser_unlock.add_argument( + "--force", + help="force to unlock the lock even if lock isn't held by <owner>", + action='store_true' + ) + + sub_parser.add_parser( + "lock_owner", + help="Retrieve owner of lock" + ) + + parser_watch = sub_parser.add_parser( + "watch", + help="Watch events stream on <key>" + ) + parser_watch.add_argument("key", metavar="<key>") + + sub_parser.add_parser( + "watch_lock", + help="Watch events stream on lock" + ) + + parser_erase = sub_parser.add_parser( + "erase", + help="Remove the <key> from tcp database" + ) + parser_erase.add_argument("key", metavar="<key>") + + args = vars(parser.parse_args()) + command = args.pop('command') + params = [] + if args: + params.append(args) + return getattr(self, command)(*params) + + def get_node_name(self): + node_file = open(self.node_name_file) + node_name = node_file.readline().strip() + node_file.close() + return node_name + + def get_tipc_tolerance(self): + """ Retrieve tipc link tolerance timeout """ + # Get bearer + bearer = os.popen('tipc bearer list').read().strip() + if bearer is not None and len(bearer) > 0: + tokens = bearer.split(":") + tol_to = os.popen('tipc bearer get tolerance media ' + '%s device %s' % + (tokens[0], tokens[1])).read() + return float(tol_to) / 1000 + + takeover_timeout = os.environ.get( + 'FMS_TAKEOVER_REQUEST_VALID_TIME') + if takeover_timeout is not None and int(takeover_timeout) > 0: + return int(takeover_timeout) / 2 + + # default to 5 seconds + return 5 + + def is_active(self): + """ Should this node be active according to RDE """ + state = os.popen('rdegetrole').read().strip() + return bool(state == 'ACTIVE') + + def get(self, params): + """ Retrieve value of key in tcp database + + Args: + params (dict): contains key from user input + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <value of the key> + } + * code = 0: success + * code = 1: invalid param + * code = 2: other failure + """ + ret = {'code': 1, 'output': ''} + + # takeover_request will not be used + return ret + + def set(self, params): + """ Set key to value in tcp database + + Args: + params (dict): contains key, value from user input + + Returns: + dict: result with format + { + 'code': <return code> + } + * code = 0: success + * code = non-zero: failure + """ + + ret = {'code': 1} + key = params['key'] + value = params['value'] + # params['timeout'] + + if key == self.testkey: + try: + result = self.proxy.set(key, value) + except socket.error: + result = False + if result: + ret['code'] = 0 + + return ret + + def create(self, params): + """ Create key and set to value for timeout second in tcp + database + + Args: + params (dict): contains key, value, timeout + from user input + + Returns: + dict: result with format + { + 'code': <return code> + } + * code = 0: success + * code = 1: already exist + * code = 2: invalid param + * code = 3: other failure + """ + ret = {'code': 2} + key = params['key'] + + if key == self.takeover_request: + # takeover_request will not be used + ret['code'] = 2 + return ret + + def set_if_prev(self, params): + """ Set key to value if the existing value matches previous + value + + Args: + params (dict): contains key (str), + value (str), prev (str) from user input + + Returns: + dict: result with format + { + 'code': <return code> + } + * code = 0: success + * code = non-zero: failure + """ + ret = {'code': 1} + key = params['key'] + # params['value'] + # params['timeout'] + + if key == self.takeover_request: + # takeover_request will not be used + # pretend it worked + ret['code'] = 0 + + return ret + + def lock(self, params): + """ Set owner of the lock, automatically unlock after timeout + + Args: + params (dict): contains owner (str), + timeout (integer) from user input + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <current owner of the lock> + } + * code = 0: success + * code = 1: lock is owned by someone else + * code = 2 or above: other failure + + NOTE: if lock is already acquired by owner, then timeout is extended + TODO: timeout not yet implemented + """ + ret = {'code': 0} + owner = params['owner'] + owner_up = False + + try: + result = self.proxy.create(self.keyname, owner) + if result is False: + current_owner = self.proxy.get(self.keyname) + if current_owner == owner: + syslog.syslog("lock is already held by this node") + return ret + original_timestamp = self.proxy.get(current_owner) + if original_timestamp: + original_timestamp = int(original_timestamp) + else: + original_timestamp = 0 + # check if the owner is updating the arbitrator + while True: + # get current time from arbitrator + time_at_arb = self.proxy.heartbeat(self.hostname) + # last time the current owner heartbeated + last_timestamp = self.proxy.get(current_owner) + if last_timestamp > original_timestamp: + # owner is updating arbitrator OK + syslog.syslog("current active is heartbeating") + owner_up = True + break + elif time_at_arb > (original_timestamp + self.timeout): + syslog.syslog("current active is not heartbeating") + # more than 'timeout' seconds have elapsed since + # the last update, we can assume the old + # active is not heartbeating + owner_up = False + break + time.sleep(self.heartbeat_interval) + if owner_up: + ret['code'] = 1 + else: + # owner is not updating arbitrator + self.proxy.delete(self.keyname) + return self.lock(params) + else: + syslog.syslog("obtained lock at arbitrator") + except socket.error: + syslog.syslog("socket error") + ret['code'] = 2 + + return ret + + def unlock(self, params): + """ Unlock the lock from owner + + Args: + params (dict): contains owner (str), + force (boolean) from user input + * force = True: force to unlock the lock even if + owner don't match with current owner + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <current owner of the lock> + } + * code = 0: success + * code = 1: lock is owned by someone else + * code = 2 or above: other failure + """ + ret = {'code': 0} + owner = params['owner'] + + # remove 'owner.lock' + try: + current_owner = self.proxy.get(self.keyname) + if current_owner == owner: + result = self.proxy.delete(self.keyname) + if result is False: + ret['code'] = 2 + else: + ret['code'] = 1 + except socket.error: + ret['code'] = 2 + + return ret + + def lock_owner(self): + """ Retrieve owner of lock + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <current owner of the lock> + } + * code = 0: success + * code = non-zero: failure + """ + ret = {'code': 0, 'output': 'unknown'} + + try: + current_owner = self.proxy.get(self.keyname) + ret['output'] = current_owner + except socket.error: + ret['code'] = 1 + + return ret + + def watch_lock(self): + """ Watch events stream on lock + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <new value of the lock> + } + * code = 0: success + * code = non-zero: failure + """ + return self.watch({'key': self.keyname}) + + def erase(self, params): + """ Remove the key from tcp database + + Args: + params (dict): contains key from user input + + Returns: + dict: result with format + { + 'code': <return code> + } + * code = 0: success + * code = non-zero: failure + """ + ret = {'code': 1} + key = params['key'] + + if key == self.takeover_request: + # takeover_request will not be used + # pretend it worked + ret['code'] = 0 + + return ret + + def watch(self, params): + """ Watch events stream on key + + Args: + params (dict): contains key from user input + + Returns: + dict: result with format + { + 'code': <return code>, + 'output': <new value of the key> + } + * code = 0: success + * code = non-zero: failure + """ + ret = {'code': 0, 'output': 'unknown'} + key = params['key'] + + if key != self.keyname and key != self.takeover_request: + # no other key is supported + ret['code'] = 2 + return ret + + last_arb_timestamp = 0 + while True: + if key == self.takeover_request: + if self.is_active() is False: + # maybe a controller swap occurred + break + while True: + try: + time_at_arb = self.proxy.heartbeat(self.hostname) + if last_arb_timestamp == 0: + last_arb_timestamp = time_at_arb + break + elif (time_at_arb - last_arb_timestamp) > self.timeout: + # VM was frozen? + syslog.syslog('VM was frozen!') + ret['code'] = 126 + return ret + else: + last_arb_timestamp = time_at_arb + break + except socket.error: + # can't heartbeat, need to self-fence (if peer down) + syslog.syslog('cannot heartbeat, inform rded') + ret['output'] = self.hostname + \ + ' SC-0 10000000 UNDEFINED' + return ret + elif key == self.keyname: + try: + current_owner = self.proxy.get(self.keyname) + if not current_owner: + if self.is_active() is False: + # a switchover occurred + break + # maybe the arbitrator restarted + self.proxy.create(self.keyname, self.hostname) + elif current_owner != self.hostname: + ret['output'] = current_owner + break + except socket.error: + pass + time.sleep(self.heartbeat_interval) + return ret + + +if __name__ == '__main__': + ArbitratorPlugin().run() diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py b/src/osaf/consensus/plugins/tcp/tcp_server.py new file mode 100755 index 0000000..a7f22f2 --- /dev/null +++ b/src/osaf/consensus/plugins/tcp/tcp_server.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" + -*- OpenSAF -*- + +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. This file and +program are licensed under the GNU Lesser General Public License Version 2.1, February 1999. +The complete license can be accessed from the following location: +http://opensource.org/licenses/lgpl-license.php +See the Copying file included with the OpenSAF distribution for full +licensing terms. + +Simple TCP arbitrator +""" + +import os +import socket +from base64 import b64decode +from socketserver import ThreadingMixIn from xmlrpc.server import +SimpleXMLRPCServer, SimpleXMLRPCRequestHandler import ssl import sys +import threading import time + +# Warning: example only, replace with signed certificate # Assumes pem +files are located in the same directory as this file DIRPATH = +os.path.dirname(os.path.realpath(__file__)) +CERTFILE = DIRPATH + '/certificate.pem' +KEYFILE = DIRPATH + '/key.pem' +USERNAME = 'test' +PASSWORD = 'test' + + +class RequestHandler(SimpleXMLRPCRequestHandler): + """ Allow keep alives """ + protocol_version = "HTTP/1.1" + + def parse_request(self): + if SimpleXMLRPCRequestHandler.parse_request(self): + if self.auth(self.headers): + return True + else: + self.send_error(401, 'Unauthorized') + return False + + def auth(self, headers): + (basic, _, encoded) = headers.get('Authorization').partition(' ') + assert basic == 'Basic', 'Only basic authentication supported' + decoded_bytes = b64decode(encoded) + decoded_string = decoded_bytes.decode() + (username, _, password) = decoded_string.partition(':') + if username == USERNAME and password == PASSWORD: + return True + return False + + +class ThreadedRPCServer(ThreadingMixIn, + SimpleXMLRPCServer): + """ Add thread support """ + def __init__(self, bind_address, bind_port, + requestHandler): + # IPv6 is supported + self.address_family = socket.getaddrinfo(bind_address, bind_port)[0][0] + SimpleXMLRPCServer.__init__(self, (bind_address, bind_port), + requestHandler, logRequests=False) + self.socket = ssl.wrap_socket( + socket.socket(self.address_family, self.socket_type), + server_side=True, + certfile=CERTFILE, + keyfile=KEYFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1_2) + self.server_bind() + self.server_activate() + + +class Arbitrator(object): + """ Implementation of a simple arbitrator """ + + def __init__(self): + self.port = 6666 + self.kv_pairs = {} + self.mutex = threading.Lock() + + def heartbeat(self, key): + timestamp = int(time.time()) + self.mutex.acquire() + self.kv_pairs[key] = timestamp + self.mutex.release() + return timestamp + + def set(self, key, value): + self.mutex.acquire() + self.kv_pairs[key] = value + self.mutex.release() + return True + + def get(self, key): + value = "" + self.mutex.acquire() + if key in self.kv_pairs: + value = self.kv_pairs[key] + self.mutex.release() + return value + + def set_if_prev(self, key, prev_value, new_value): + result = False + self.mutex.acquire() + if key in self.kv_pairs: + value = self.kv_pairs[key] + if value == prev_value: + self.kv_pairs[key] = new_value + result = True + self.mutex.release() + return result + + def create(self, key, value): + result = False + self.mutex.acquire() + if key not in self.kv_pairs: + self.kv_pairs[key] = value + result = True + self.mutex.release() + return result + + def delete(self, key): + result = False + self.mutex.acquire() + if key in self.kv_pairs: + del self.kv_pairs[key] + result = True + self.mutex.release() + return result + + def run(self): + hostname = "::0.0.0.0" + server = ThreadedRPCServer(hostname, self.port, + requestHandler=RequestHandler) + print("Listening on port %d" % self.port) + server.register_multicall_functions() + server.register_function(self.heartbeat, 'heartbeat') + server.register_function(self.set, 'set') + server.register_function(self.get, 'get') + server.register_function(self.set_if_prev, 'set_if_prev') + server.register_function(self.create, 'create') + server.register_function(self.delete, 'delete') + server.serve_forever() + + sys.exit(0) + + +if __name__ == '__main__': + Arbitrator().run() -- 2.7.4 _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel