Hi Hans, OK, that’s a good idea.
thanks > On 4 Oct 2019, at 11:06 pm, Hans Nordebäck <hans.nordeb...@ericsson.com> > wrote: > > Hi Gary, ack, review only. One comment/suggestion can we provide a > small script that generates the x509 certificate (use e.g. openssl X509 > ... ) instead of including a self signed cert? /BR Hans >> On Tue, 2019-10-01 at 12:53 +1000, Gary Lee wrote: >> --- >> src/osaf/consensus/plugins/tcp/README | 41 ++ >> src/osaf/consensus/plugins/tcp/certificate.pem | 20 + >> src/osaf/consensus/plugins/tcp/key.pem | 28 ++ >> src/osaf/consensus/plugins/tcp/tcp.plugin | 520 >> +++++++++++++++++++++++++ >> src/osaf/consensus/plugins/tcp/tcp_server.py | 157 ++++++++ >> 5 files changed, 766 insertions(+) >> create mode 100644 src/osaf/consensus/plugins/tcp/README >> create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem >> create mode 100644 src/osaf/consensus/plugins/tcp/key.pem >> create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin >> create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py >> >> diff --git a/src/osaf/consensus/plugins/tcp/README >> b/src/osaf/consensus/plugins/tcp/README >> new file mode 100644 >> index 0000000..6f739e8 >> --- /dev/null >> +++ b/src/osaf/consensus/plugins/tcp/README >> @@ -0,0 +1,41 @@ >> +TCP arbitrator >> + >> +The TCP arbitrator may be useful for deployments where deploying >> etcd is not >> +feasible. An example arbitrator is provided to help prevent split >> brain in >> +clusters that contain up to 2 system controllers. >> + >> +The example arbitrator is a simple python based program that can be >> deployed on >> +a single payload or a node external to the cluster. >> + >> +Two main pieces of information are stored on the arbitrator: the >> hostname of the >> +current active controller and a heartbeat timestamp. >> + >> +An active controller sends a heartbeat to the arbitrator every 100ms >> using TLS >> +over a persistent TCP connection. 
It should self-fence if it is >> unable to >> +heartbeat, as it is likely to be separated from the arbitrator. >> + >> +A candidate active controller must check the existing controller is >> not >> +heartbeating before promoting itself active. On a cluster using >> TIPC, >> +the timeout value is the TIPC link tolerance timeout. On a TCP based >> cluster, >> +the timeout is calculated from FMS_TAKEOVER_REQUEST_VALID_TIME. >> + >> +Suggested fmd.conf configuration: >> + >> +export FMS_SPLIT_BRAIN_PREVENTION=1 >> +export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin >> +export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0 (any other setting >> is ignored) >> +export FMS_RELAXED_NODE_PROMOTION=1 >> + >> +The above settings will allow a controller to be elected active >> during >> +cluster startup, even if the arbitrator is not yet running. >> +If the arbitrator becomes temporarily unavailable, the controllers >> will >> +remain running if they can see each other. If an active controller >> becomes >> +isolated from the standby *and* the arbitrator, it will self-fence >> and the >> +standby will become active (if located in the same network partition >> as >> +the arbitrator). >> + >> +The provided self-signed certificate is an example only, and was >> generated using: >> + >> +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days >> 100000 -out certificate.pem >> + >> +It must be replaced in an actual deployment!! 
>> diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem >> b/src/osaf/consensus/plugins/tcp/certificate.pem >> new file mode 100644 >> index 0000000..e0b4993 >> --- /dev/null >> +++ b/src/osaf/consensus/plugins/tcp/certificate.pem >> @@ -0,0 +1,20 @@ >> +-----BEGIN CERTIFICATE----- >> +MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV >> +BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH >> +T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG >> +A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK >> +DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS >> ++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf >> +gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT >> +lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ >> +WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar >> +hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y >> +K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9 >> +HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB >> +/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1 >> +hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz >> +eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB >> +tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH >> +ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ >> +R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg== >> +-----END CERTIFICATE----- >> diff --git a/src/osaf/consensus/plugins/tcp/key.pem >> b/src/osaf/consensus/plugins/tcp/key.pem >> new file mode 100644 >> index 0000000..66b8bcb >> --- /dev/null >> +++ b/src/osaf/consensus/plugins/tcp/key.pem >> @@ -0,0 +1,28 @@ >> +-----BEGIN PRIVATE KEY----- >> +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD >> +OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh >> 
+ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9 >> +Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3 >> +J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA >> +ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy >> +8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7 >> +4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE >> +KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI >> +5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR >> +oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz >> +/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX >> ++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin >> +3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4 >> +5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu >> +z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi >> +l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39 >> +OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z >> +BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz >> +Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF >> +hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE >> +B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP >> +EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD >> +WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0 >> +6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv >> +0ZCcum4WK037YSkrI1IFBUlKXA== >> +-----END PRIVATE KEY----- >> diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin >> b/src/osaf/consensus/plugins/tcp/tcp.plugin >> new file mode 100755 >> index 0000000..1b5ddf5 >> --- /dev/null >> +++ b/src/osaf/consensus/plugins/tcp/tcp.plugin >> @@ -0,0 +1,520 @@ >> +#!/usr/bin/env python3 >> +""" >> + -*- OpenSAF -*- >> + >> +(C) Copyright 2019 Ericsson AB 2019 - All 
Rights Reserved. >> + >> +This program is distributed in the hope that it will be useful, but >> +WITHOUT ANY WARRANTY; without even the implied warranty of >> MERCHANTABILITY >> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >> licensed >> +under the GNU Lesser General Public License Version 2.1, February >> 1999. >> +The complete license can be accessed from the following location: >> + >> https://protect2.fireeye.com/url?k=e5558a57-b9dc5010-e555cacc-0cc47ad93e96-95f7c31077941b95&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >> +See the Copying file included with the OpenSAF distribution for full >> +licensing terms. >> + >> +""" >> + >> +import argparse >> +import os >> +import socket >> +import ssl >> +import sys >> +import time >> +import xmlrpc.client >> +import syslog >> + >> + >> +class ArbitratorPlugin(object): >> + """ This class represents a TCP Plugin """ >> + >> + def __init__(self): >> + # Replace with actual URL >> + self.uri = "https://test:test@arbitrator:6666/" >> + # Warning: certificate verification is disabled >> + self.proxy = xmlrpc.client.ServerProxy( >> + self.uri, context=ssl._create_unverified_context()) >> + >> + # Get tipc link tolerance timeout if OpenSAF in used with >> TIPC >> + link_tolerance = self.get_tipc_tolerance() >> + self.timeout = link_tolerance >> + >> + # TODO make configurable? default to 1s >> + self.socket_timeout = 1 >> + socket.setdefaulttimeout(self.socket_timeout) >> + >> + # TODO make configurable? 
default to 100ms >> + self.heartbeat_interval = 0.1 >> + >> + # constants >> + self.keyname = 'opensaf_consensus_lock' >> + self.takeover_request = 'takeover_request' >> + self.testkey = "opensaf_write_test" >> + >> + # location where opensaf node_name is defined >> + self.node_name_file = '/etc/opensaf/node_name' >> + self.hostname = self.get_node_name() >> + >> + def run(self): >> + """ Main method to handle input and output """ >> + result = self.parse_cmd() >> + if 'output' in result: >> + print(result['output']) >> + sys.exit(result['code']) >> + >> + def parse_cmd(self): >> + """ Handle input parameters from user """ >> + parser = argparse.ArgumentParser(description="Arbitration >> Plugin") >> + sub_parser = parser.add_subparsers( >> + dest="command", >> + help="Execute script with this command" >> + ) >> + >> + parser_get = sub_parser.add_parser( >> + "get", >> + help="Retrieve <value> of <key> in tcp database" >> + ) >> + parser_get.add_argument("key", metavar="<key>") >> + >> + parser_set = sub_parser.add_parser( >> + "set", >> + help="Set <key> to <value> in tcp database" >> + ) >> + parser_set.add_argument("key", metavar="<key>") >> + parser_set.add_argument("value", metavar="<value>") >> + parser_set.add_argument("timeout", metavar="<timeout>", >> type=int) >> + >> + parser_create = sub_parser.add_parser( >> + "create", >> + help="Create <key> and set to <value>" >> + " for <timeout> second in tcp database" >> + ) >> + parser_create.add_argument("key", metavar="<key>") >> + parser_create.add_argument("value", metavar="<value>") >> + parser_create.add_argument("timeout", metavar="<timeout>", >> type=int) >> + >> + parser_set_if_prev = sub_parser.add_parser( >> + "set_if_prev", >> + help="Set <key> to <value> if the existing value matches >> <prev>" >> + ) >> + parser_set_if_prev.add_argument("key", metavar="<key>") >> + parser_set_if_prev.add_argument("value", metavar="<value>") >> + parser_set_if_prev.add_argument("prev", metavar="<prev>") >> + 
parser_set_if_prev.add_argument( >> + "timeout", metavar="<timeout>", type=int) >> + >> + parser_lock = sub_parser.add_parser( >> + "lock", >> + help="Set <owner> of the lock," >> + " automatically unlock after <timeout>" >> + ) >> + parser_lock.add_argument("owner", metavar="<owner>") >> + parser_lock.add_argument("timeout", metavar="<timeout>", >> type=int) >> + >> + parser_unlock = sub_parser.add_parser( >> + "unlock", >> + help="Unlock the lock from <owner>" >> + ) >> + parser_unlock.add_argument("owner", metavar="<owner>") >> + parser_unlock.add_argument( >> + "--force", >> + help="force to unlock the lock even if lock isn't held >> by <owner>", >> + action='store_true' >> + ) >> + >> + sub_parser.add_parser( >> + "lock_owner", >> + help="Retrieve owner of lock" >> + ) >> + >> + parser_watch = sub_parser.add_parser( >> + "watch", >> + help="Watch events stream on <key>" >> + ) >> + parser_watch.add_argument("key", metavar="<key>") >> + >> + sub_parser.add_parser( >> + "watch_lock", >> + help="Watch events stream on lock" >> + ) >> + >> + parser_erase = sub_parser.add_parser( >> + "erase", >> + help="Remove the <key> from tcp database" >> + ) >> + parser_erase.add_argument("key", metavar="<key>") >> + >> + args = vars(parser.parse_args()) >> + command = args.pop('command') >> + params = [] >> + if args: >> + params.append(args) >> + return getattr(self, command)(*params) >> + >> + def get_node_name(self): >> + node_file = open(self.node_name_file) >> + node_name = node_file.readline().strip() >> + node_file.close() >> + return node_name >> + >> + def get_tipc_tolerance(self): >> + """ Retrieve tipc link tolerance timeout """ >> + # Get bearer >> + bearer = os.popen('tipc bearer list').read().strip() >> + if bearer is not None and len(bearer) > 0: >> + tokens = bearer.split(":") >> + tol_to = os.popen('tipc bearer get tolerance media ' >> + '%s device %s' % >> + (tokens[0], tokens[1])).read() >> + return float(tol_to) / 1000 >> + >> + takeover_timeout = 
os.environ.get( >> + 'FMS_TAKEOVER_REQUEST_VALID_TIME') >> + if takeover_timeout is not None and int(takeover_timeout) > >> 0: >> + return int(takeover_timeout) / 2 >> + >> + # default to 5 seconds >> + return 5 >> + >> + def is_active(self): >> + """ Should this node be active according to RDE """ >> + state = os.popen('rdegetrole').read().strip() >> + return bool(state == 'ACTIVE') >> + >> + def get(self, params): >> + """ Retrieve value of key in tcp database >> + >> + Args: >> + params (dict): contains key from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <value of the key> >> + } >> + * code = 0: success >> + * code = 1: invalid param >> + * code = 2: other failure >> + """ >> + ret = {'code': 1, 'output': ''} >> + >> + # takeover_request will not be used >> + return ret >> + >> + def set(self, params): >> + """ Set key to value in tcp database >> + >> + Args: >> + params (dict): contains key, value from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + >> + ret = {'code': 1} >> + key = params['key'] >> + value = params['value'] >> + # params['timeout'] >> + >> + if key == self.testkey: >> + try: >> + result = self.proxy.set(key, value) >> + except socket.error: >> + result = False >> + if result: >> + ret['code'] = 0 >> + >> + return ret >> + >> + def create(self, params): >> + """ Create key and set to value for timeout second in tcp >> database >> + >> + Args: >> + params (dict): contains key, value, timeout >> + from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code> >> + } >> + * code = 0: success >> + * code = 1: already exist >> + * code = 2: invalid param >> + * code = 3: other failure >> + """ >> + ret = {'code': 2} >> + key = params['key'] >> + >> + if key == self.takeover_request: >> + # takeover_request will not be used >> 
+ ret['code'] = 2 >> + return ret >> + >> + def set_if_prev(self, params): >> + """ Set key to value if the existing value matches previous >> value >> + >> + Args: >> + params (dict): contains key (str), >> + value (str), prev (str) from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + ret = {'code': 1} >> + key = params['key'] >> + # params['value'] >> + # params['timeout'] >> + >> + if key == self.takeover_request: >> + # takeover_request will not be used >> + # pretend it worked >> + ret['code'] = 0 >> + >> + return ret >> + >> + def lock(self, params): >> + """ Set owner of the lock, automatically unlock after >> timeout >> + >> + Args: >> + params (dict): contains owner (str), >> + timeout (integer) from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <current owner of the lock> >> + } >> + * code = 0: success >> + * code = 1: lock is owned by someone else >> + * code = 2 or above: other failure >> + >> + NOTE: if lock is already acquired by owner, then timeout is >> extended >> + TODO: timeout not yet implemented >> + """ >> + ret = {'code': 0} >> + owner = params['owner'] >> + owner_up = False >> + >> + try: >> + result = self.proxy.create(self.keyname, owner) >> + if result is False: >> + current_owner = self.proxy.get(self.keyname) >> + if current_owner == owner: >> + syslog.syslog("lock is already held by this >> node") >> + return ret >> + original_timestamp = self.proxy.get(current_owner) >> + if original_timestamp: >> + original_timestamp = int(original_timestamp) >> + else: >> + original_timestamp = 0 >> + # check if the owner is updating the arbitrator >> + while True: >> + # get current time from arbitrator >> + time_at_arb = >> self.proxy.heartbeat(self.hostname) >> + # last time the current owner heartbeated >> + last_timestamp = self.proxy.get(current_owner) >> + 
if last_timestamp > original_timestamp: >> + # owner is updating arbitrator OK >> + syslog.syslog("current active is >> heartbeating") >> + owner_up = True >> + break >> + elif time_at_arb > (original_timestamp + >> self.timeout): >> + syslog.syslog("current active is not >> heartbeating") >> + # more than 'timeout' seconds have elapsed >> since >> + # the last update, we can assume the old >> + # active is not heartbeating >> + owner_up = False >> + break >> + time.sleep(self.heartbeat_interval) >> + if owner_up: >> + ret['code'] = 1 >> + else: >> + # owner is not updating arbitrator >> + self.proxy.delete(self.keyname) >> + return self.lock(params) >> + else: >> + syslog.syslog("obtained lock at arbitrator") >> + except socket.error: >> + syslog.syslog("socket error") >> + ret['code'] = 2 >> + >> + return ret >> + >> + def unlock(self, params): >> + """ Unlock the lock from owner >> + >> + Args: >> + params (dict): contains owner (str), >> + force (boolean) from user input >> + * force = True: force to unlock the lock even if >> + owner don't match with current owner >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <current owner of the lock> >> + } >> + * code = 0: success >> + * code = 1: lock is owned by someone else >> + * code = 2 or above: other failure >> + """ >> + ret = {'code': 0} >> + owner = params['owner'] >> + >> + # remove 'owner.lock' >> + try: >> + current_owner = self.proxy.get(self.keyname) >> + if current_owner == owner: >> + result = self.proxy.delete(self.keyname) >> + if result is False: >> + ret['code'] = 2 >> + else: >> + ret['code'] = 1 >> + except socket.error: >> + ret['code'] = 2 >> + >> + return ret >> + >> + def lock_owner(self): >> + """ Retrieve owner of lock >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <current owner of the lock> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + ret = {'code': 0, 
'output': 'unknown'} >> + >> + try: >> + current_owner = self.proxy.get(self.keyname) >> + ret['output'] = current_owner >> + except socket.error: >> + ret['code'] = 1 >> + >> + return ret >> + >> + def watch_lock(self): >> + """ Watch events stream on lock >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <new value of the lock> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + return self.watch({'key': self.keyname}) >> + >> + def erase(self, params): >> + """ Remove the key from tcp database >> + >> + Args: >> + params (dict): contains key from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + ret = {'code': 1} >> + key = params['key'] >> + >> + if key == self.takeover_request: >> + # takeover_request will not be used >> + # pretend it worked >> + ret['code'] = 0 >> + >> + return ret >> + >> + def watch(self, params): >> + """ Watch events stream on key >> + >> + Args: >> + params (dict): contains key from user input >> + >> + Returns: >> + dict: result with format >> + { >> + 'code': <return code>, >> + 'output': <new value of the key> >> + } >> + * code = 0: success >> + * code = non-zero: failure >> + """ >> + ret = {'code': 0, 'output': 'unknown'} >> + key = params['key'] >> + >> + if key != self.keyname and key != self.takeover_request: >> + # no other key is supported >> + ret['code'] = 2 >> + return ret >> + >> + last_arb_timestamp = 0 >> + while True: >> + if key == self.takeover_request: >> + if self.is_active() is False: >> + # maybe a controller swap occurred >> + break >> + while True: >> + try: >> + time_at_arb = >> self.proxy.heartbeat(self.hostname) >> + if last_arb_timestamp == 0: >> + last_arb_timestamp = time_at_arb >> + break >> + elif (time_at_arb - last_arb_timestamp) > >> self.timeout: >> + # VM was frozen? 
>> + syslog.syslog('VM was frozen!') >> + ret['code'] = 126 >> + return ret >> + else: >> + last_arb_timestamp = time_at_arb >> + break >> + except socket.error: >> + # can't heartbeat, need to self-fence (if >> peer down) >> + syslog.syslog('cannot heartbeat, inform >> rded') >> + ret['output'] = self.hostname + \ >> + ' SC-0 10000000 UNDEFINED' >> + return ret >> + elif key == self.keyname: >> + try: >> + current_owner = self.proxy.get(self.keyname) >> + if not current_owner: >> + if self.is_active() is False: >> + # a switchover occurred >> + break >> + # maybe the arbitrator restarted >> + self.proxy.create(self.keyname, >> self.hostname) >> + elif current_owner != self.hostname: >> + ret['output'] = current_owner >> + break >> + except socket.error: >> + pass >> + time.sleep(self.heartbeat_interval) >> + return ret >> + >> + >> +if __name__ == '__main__': >> + ArbitratorPlugin().run() >> diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py >> b/src/osaf/consensus/plugins/tcp/tcp_server.py >> new file mode 100755 >> index 0000000..a7f22f2 >> --- /dev/null >> +++ b/src/osaf/consensus/plugins/tcp/tcp_server.py >> @@ -0,0 +1,157 @@ >> +#!/usr/bin/env python3 >> +""" >> + -*- OpenSAF -*- >> + >> +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved. >> + >> +This program is distributed in the hope that it will be useful, but >> +WITHOUT ANY WARRANTY; without even the implied warranty of >> MERCHANTABILITY >> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are >> licensed >> +under the GNU Lesser General Public License Version 2.1, February >> 1999. >> +The complete license can be accessed from the following location: >> + >> https://protect2.fireeye.com/url?k=3aa9040c-6620de4b-3aa94497-0cc47ad93e96-88a20cd595459a91&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php >> +See the Copying file included with the OpenSAF distribution for full >> +licensing terms. 
>> + >> +Simple TCP arbitrator >> +""" >> + >> +import os >> +import socket >> +from base64 import b64decode >> +from socketserver import ThreadingMixIn >> +from xmlrpc.server import SimpleXMLRPCServer, >> SimpleXMLRPCRequestHandler >> +import ssl >> +import sys >> +import threading >> +import time >> + >> +# Warning: example only, replace with signed certificate >> +# Assumes pem files are located in the same directory as this file >> +DIRPATH = os.path.dirname(os.path.realpath(__file__)) >> +CERTFILE = DIRPATH + '/certificate.pem' >> +KEYFILE = DIRPATH + '/key.pem' >> +USERNAME = 'test' >> +PASSWORD = 'test' >> + >> + >> +class RequestHandler(SimpleXMLRPCRequestHandler): >> + """ Allow keep alives """ >> + protocol_version = "HTTP/1.1" >> + >> + def parse_request(self): >> + if SimpleXMLRPCRequestHandler.parse_request(self): >> + if self.auth(self.headers): >> + return True >> + else: >> + self.send_error(401, 'Unauthorized') >> + return False >> + >> + def auth(self, headers): >> + (basic, _, encoded) = >> headers.get('Authorization').partition(' ') >> + assert basic == 'Basic', 'Only basic authentication >> supported' >> + decoded_bytes = b64decode(encoded) >> + decoded_string = decoded_bytes.decode() >> + (username, _, password) = decoded_string.partition(':') >> + if username == USERNAME and password == PASSWORD: >> + return True >> + return False >> + >> + >> +class ThreadedRPCServer(ThreadingMixIn, >> + SimpleXMLRPCServer): >> + """ Add thread support """ >> + def __init__(self, bind_address, bind_port, >> + requestHandler): >> + # IPv6 is supported >> + self.address_family = socket.getaddrinfo(bind_address, >> bind_port)[0][0] >> + SimpleXMLRPCServer.__init__(self, (bind_address, bind_port), >> + requestHandler, >> logRequests=False) >> + self.socket = ssl.wrap_socket( >> + socket.socket(self.address_family, self.socket_type), >> + server_side=True, >> + certfile=CERTFILE, >> + keyfile=KEYFILE, >> + cert_reqs=ssl.CERT_NONE, >> + 
ssl_version=ssl.PROTOCOL_TLSv1_2) >> + self.server_bind() >> + self.server_activate() >> + >> + >> +class Arbitrator(object): >> + """ Implementation of a simple arbitrator """ >> + >> + def __init__(self): >> + self.port = 6666 >> + self.kv_pairs = {} >> + self.mutex = threading.Lock() >> + >> + def heartbeat(self, key): >> + timestamp = int(time.time()) >> + self.mutex.acquire() >> + self.kv_pairs[key] = timestamp >> + self.mutex.release() >> + return timestamp >> + >> + def set(self, key, value): >> + self.mutex.acquire() >> + self.kv_pairs[key] = value >> + self.mutex.release() >> + return True >> + >> + def get(self, key): >> + value = "" >> + self.mutex.acquire() >> + if key in self.kv_pairs: >> + value = self.kv_pairs[key] >> + self.mutex.release() >> + return value >> + >> + def set_if_prev(self, key, prev_value, new_value): >> + result = False >> + self.mutex.acquire() >> + if key in self.kv_pairs: >> + value = self.kv_pairs[key] >> + if value == prev_value: >> + self.kv_pairs[key] = new_value >> + result = True >> + self.mutex.release() >> + return result >> + >> + def create(self, key, value): >> + result = False >> + self.mutex.acquire() >> + if key not in self.kv_pairs: >> + self.kv_pairs[key] = value >> + result = True >> + self.mutex.release() >> + return result >> + >> + def delete(self, key): >> + result = False >> + self.mutex.acquire() >> + if key in self.kv_pairs: >> + del self.kv_pairs[key] >> + result = True >> + self.mutex.release() >> + return result >> + >> + def run(self): >> + hostname = "::0.0.0.0" >> + server = ThreadedRPCServer(hostname, self.port, >> + requestHandler=RequestHandler) >> + print("Listening on port %d" % self.port) >> + server.register_multicall_functions() >> + server.register_function(self.heartbeat, 'heartbeat') >> + server.register_function(self.set, 'set') >> + server.register_function(self.get, 'get') >> + server.register_function(self.set_if_prev, 'set_if_prev') >> + server.register_function(self.create, 'create') 
>> + server.register_function(self.delete, 'delete') >> + server.serve_forever() >> + >> + sys.exit(0) >> + >> + >> +if __name__ == '__main__': >> + Arbitrator().run() _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel