Hi Gary, ack, review only. One comment/suggestion: can we provide a small script that generates the X.509 certificate (e.g. using openssl x509 ...) instead of including a self-signed cert? /BR Hans On Tue, 2019-10-01 at 12:53 +1000, Gary Lee wrote: > --- > src/osaf/consensus/plugins/tcp/README | 41 ++ > src/osaf/consensus/plugins/tcp/certificate.pem | 20 + > src/osaf/consensus/plugins/tcp/key.pem | 28 ++ > src/osaf/consensus/plugins/tcp/tcp.plugin | 520 > +++++++++++++++++++++++++ > src/osaf/consensus/plugins/tcp/tcp_server.py | 157 ++++++++ > 5 files changed, 766 insertions(+) > create mode 100644 src/osaf/consensus/plugins/tcp/README > create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem > create mode 100644 src/osaf/consensus/plugins/tcp/key.pem > create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin > create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py > > diff --git a/src/osaf/consensus/plugins/tcp/README > b/src/osaf/consensus/plugins/tcp/README > new file mode 100644 > index 0000000..6f739e8 > --- /dev/null > +++ b/src/osaf/consensus/plugins/tcp/README > @@ -0,0 +1,41 @@ > +TCP arbitrator > + > +The TCP arbitrator may be useful for deployments where deploying > etcd is not > +feasible. An example arbitrator is provided to help prevent split > brain in > +clusters that contain up to 2 system controllers. > + > +The example arbitrator is a simple python based program that can be > deployed on > +a single payload or a node external to the cluster. > + > +Two main pieces of information are stored on the arbitrator: the > hostname of the > +current active controller and a heartbeat timestamp. > + > +An active controller sends a heartbeat to the arbitrator every 100ms > using TLS > +over a persistent TCP connection. It should self-fence if it is > unable to > +heartbeat, as it is likely to be separated from the arbitrator. 
> + > +A candidate active controller must check the existing controller is > not > +heartbeating before promoting itself active. On a cluster using > TIPC, > +the timeout value is the TIPC link tolerance timeout. On a TCP based > cluster, > +the timeout is calculated from FMS_TAKEOVER_REQUEST_VALID_TIME. > + > +Suggested fmd.conf configuration: > + > +export FMS_SPLIT_BRAIN_PREVENTION=1 > +export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin > +export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0 (any other setting > is ignored) > +export FMS_RELAXED_NODE_PROMOTION=1 > + > +The above settings will allow a controller to be elected active > during > +cluster startup, even if the arbitrator is not yet running. > +If the arbitrator becomes temporarily unavailable, the controllers > will > +remain running if they can see each other. If an active controller > becomes > +isolated from the standby *and* the arbitrator, it will self-fence > and the > +standby will become active (if located in the same network partition > as > +the arbitrator). > + > +The provided self-signed certificate is an example only, and was > generated using: > + > +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days > 100000 -out certificate.pem > + > +It must be replaced in an actual deployment!! 
> diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem > b/src/osaf/consensus/plugins/tcp/certificate.pem > new file mode 100644 > index 0000000..e0b4993 > --- /dev/null > +++ b/src/osaf/consensus/plugins/tcp/certificate.pem > @@ -0,0 +1,20 @@ > +-----BEGIN CERTIFICATE----- > +MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV > +BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH > +T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG > +A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK > +DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS > ++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf > +gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT > +lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ > +WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar > +hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y > +K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9 > +HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB > +/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1 > +hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz > +eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB > +tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH > +ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ > +R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg== > +-----END CERTIFICATE----- > diff --git a/src/osaf/consensus/plugins/tcp/key.pem > b/src/osaf/consensus/plugins/tcp/key.pem > new file mode 100644 > index 0000000..66b8bcb > --- /dev/null > +++ b/src/osaf/consensus/plugins/tcp/key.pem > @@ -0,0 +1,28 @@ > +-----BEGIN PRIVATE KEY----- > +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD > +OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh > +ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9 > 
+Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3 > +J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA > +ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy > +8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7 > +4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE > +KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI > +5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR > +oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz > +/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX > ++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin > +3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4 > +5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu > +z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi > +l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39 > +OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z > +BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz > +Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF > +hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE > +B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP > +EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD > +WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0 > +6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv > +0ZCcum4WK037YSkrI1IFBUlKXA== > +-----END PRIVATE KEY----- > diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin > b/src/osaf/consensus/plugins/tcp/tcp.plugin > new file mode 100755 > index 0000000..1b5ddf5 > --- /dev/null > +++ b/src/osaf/consensus/plugins/tcp/tcp.plugin > @@ -0,0 +1,520 @@ > +#!/usr/bin/env python3 > +""" > + -*- OpenSAF -*- > + > +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved. 
> + > +This program is distributed in the hope that it will be useful, but > +WITHOUT ANY WARRANTY; without even the implied warranty of > MERCHANTABILITY > +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are > licensed > +under the GNU Lesser General Public License Version 2.1, February > 1999. > +The complete license can be accessed from the following location: > + > https://protect2.fireeye.com/url?k=e5558a57-b9dc5010-e555cacc-0cc47ad93e96-95f7c31077941b95&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php > +See the Copying file included with the OpenSAF distribution for full > +licensing terms. > + > +""" > + > +import argparse > +import os > +import socket > +import ssl > +import sys > +import time > +import xmlrpc.client > +import syslog > + > + > +class ArbitratorPlugin(object): > + """ This class represents a TCP Plugin """ > + > + def __init__(self): > + # Replace with actual URL > + self.uri = "https://test:test@arbitrator:6666/" > + # Warning: certificate verification is disabled > + self.proxy = xmlrpc.client.ServerProxy( > + self.uri, context=ssl._create_unverified_context()) > + > + # Get tipc link tolerance timeout if OpenSAF in used with > TIPC > + link_tolerance = self.get_tipc_tolerance() > + self.timeout = link_tolerance > + > + # TODO make configurable? default to 1s > + self.socket_timeout = 1 > + socket.setdefaulttimeout(self.socket_timeout) > + > + # TODO make configurable? 
default to 100ms > + self.heartbeat_interval = 0.1 > + > + # constants > + self.keyname = 'opensaf_consensus_lock' > + self.takeover_request = 'takeover_request' > + self.testkey = "opensaf_write_test" > + > + # location where opensaf node_name is defined > + self.node_name_file = '/etc/opensaf/node_name' > + self.hostname = self.get_node_name() > + > + def run(self): > + """ Main method to handle input and output """ > + result = self.parse_cmd() > + if 'output' in result: > + print(result['output']) > + sys.exit(result['code']) > + > + def parse_cmd(self): > + """ Handle input parameters from user """ > + parser = argparse.ArgumentParser(description="Arbitration > Plugin") > + sub_parser = parser.add_subparsers( > + dest="command", > + help="Execute script with this command" > + ) > + > + parser_get = sub_parser.add_parser( > + "get", > + help="Retrieve <value> of <key> in tcp database" > + ) > + parser_get.add_argument("key", metavar="<key>") > + > + parser_set = sub_parser.add_parser( > + "set", > + help="Set <key> to <value> in tcp database" > + ) > + parser_set.add_argument("key", metavar="<key>") > + parser_set.add_argument("value", metavar="<value>") > + parser_set.add_argument("timeout", metavar="<timeout>", > type=int) > + > + parser_create = sub_parser.add_parser( > + "create", > + help="Create <key> and set to <value>" > + " for <timeout> second in tcp database" > + ) > + parser_create.add_argument("key", metavar="<key>") > + parser_create.add_argument("value", metavar="<value>") > + parser_create.add_argument("timeout", metavar="<timeout>", > type=int) > + > + parser_set_if_prev = sub_parser.add_parser( > + "set_if_prev", > + help="Set <key> to <value> if the existing value matches > <prev>" > + ) > + parser_set_if_prev.add_argument("key", metavar="<key>") > + parser_set_if_prev.add_argument("value", metavar="<value>") > + parser_set_if_prev.add_argument("prev", metavar="<prev>") > + parser_set_if_prev.add_argument( > + "timeout", metavar="<timeout>", 
type=int) > + > + parser_lock = sub_parser.add_parser( > + "lock", > + help="Set <owner> of the lock," > + " automatically unlock after <timeout>" > + ) > + parser_lock.add_argument("owner", metavar="<owner>") > + parser_lock.add_argument("timeout", metavar="<timeout>", > type=int) > + > + parser_unlock = sub_parser.add_parser( > + "unlock", > + help="Unlock the lock from <owner>" > + ) > + parser_unlock.add_argument("owner", metavar="<owner>") > + parser_unlock.add_argument( > + "--force", > + help="force to unlock the lock even if lock isn't held > by <owner>", > + action='store_true' > + ) > + > + sub_parser.add_parser( > + "lock_owner", > + help="Retrieve owner of lock" > + ) > + > + parser_watch = sub_parser.add_parser( > + "watch", > + help="Watch events stream on <key>" > + ) > + parser_watch.add_argument("key", metavar="<key>") > + > + sub_parser.add_parser( > + "watch_lock", > + help="Watch events stream on lock" > + ) > + > + parser_erase = sub_parser.add_parser( > + "erase", > + help="Remove the <key> from tcp database" > + ) > + parser_erase.add_argument("key", metavar="<key>") > + > + args = vars(parser.parse_args()) > + command = args.pop('command') > + params = [] > + if args: > + params.append(args) > + return getattr(self, command)(*params) > + > + def get_node_name(self): > + node_file = open(self.node_name_file) > + node_name = node_file.readline().strip() > + node_file.close() > + return node_name > + > + def get_tipc_tolerance(self): > + """ Retrieve tipc link tolerance timeout """ > + # Get bearer > + bearer = os.popen('tipc bearer list').read().strip() > + if bearer is not None and len(bearer) > 0: > + tokens = bearer.split(":") > + tol_to = os.popen('tipc bearer get tolerance media ' > + '%s device %s' % > + (tokens[0], tokens[1])).read() > + return float(tol_to) / 1000 > + > + takeover_timeout = os.environ.get( > + 'FMS_TAKEOVER_REQUEST_VALID_TIME') > + if takeover_timeout is not None and int(takeover_timeout) > > 0: > + return 
int(takeover_timeout) / 2 > + > + # default to 5 seconds > + return 5 > + > + def is_active(self): > + """ Should this node be active according to RDE """ > + state = os.popen('rdegetrole').read().strip() > + return bool(state == 'ACTIVE') > + > + def get(self, params): > + """ Retrieve value of key in tcp database > + > + Args: > + params (dict): contains key from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <value of the key> > + } > + * code = 0: success > + * code = 1: invalid param > + * code = 2: other failure > + """ > + ret = {'code': 1, 'output': ''} > + > + # takeover_request will not be used > + return ret > + > + def set(self, params): > + """ Set key to value in tcp database > + > + Args: > + params (dict): contains key, value from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code> > + } > + * code = 0: success > + * code = non-zero: failure > + """ > + > + ret = {'code': 1} > + key = params['key'] > + value = params['value'] > + # params['timeout'] > + > + if key == self.testkey: > + try: > + result = self.proxy.set(key, value) > + except socket.error: > + result = False > + if result: > + ret['code'] = 0 > + > + return ret > + > + def create(self, params): > + """ Create key and set to value for timeout second in tcp > database > + > + Args: > + params (dict): contains key, value, timeout > + from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code> > + } > + * code = 0: success > + * code = 1: already exist > + * code = 2: invalid param > + * code = 3: other failure > + """ > + ret = {'code': 2} > + key = params['key'] > + > + if key == self.takeover_request: > + # takeover_request will not be used > + ret['code'] = 2 > + return ret > + > + def set_if_prev(self, params): > + """ Set key to value if the existing value matches previous > value > + > + Args: > + params (dict): contains key (str), > + value (str), prev 
(str) from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code> > + } > + * code = 0: success > + * code = non-zero: failure > + """ > + ret = {'code': 1} > + key = params['key'] > + # params['value'] > + # params['timeout'] > + > + if key == self.takeover_request: > + # takeover_request will not be used > + # pretend it worked > + ret['code'] = 0 > + > + return ret > + > + def lock(self, params): > + """ Set owner of the lock, automatically unlock after > timeout > + > + Args: > + params (dict): contains owner (str), > + timeout (integer) from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <current owner of the lock> > + } > + * code = 0: success > + * code = 1: lock is owned by someone else > + * code = 2 or above: other failure > + > + NOTE: if lock is already acquired by owner, then timeout is > extended > + TODO: timeout not yet implemented > + """ > + ret = {'code': 0} > + owner = params['owner'] > + owner_up = False > + > + try: > + result = self.proxy.create(self.keyname, owner) > + if result is False: > + current_owner = self.proxy.get(self.keyname) > + if current_owner == owner: > + syslog.syslog("lock is already held by this > node") > + return ret > + original_timestamp = self.proxy.get(current_owner) > + if original_timestamp: > + original_timestamp = int(original_timestamp) > + else: > + original_timestamp = 0 > + # check if the owner is updating the arbitrator > + while True: > + # get current time from arbitrator > + time_at_arb = > self.proxy.heartbeat(self.hostname) > + # last time the current owner heartbeated > + last_timestamp = self.proxy.get(current_owner) > + if last_timestamp > original_timestamp: > + # owner is updating arbitrator OK > + syslog.syslog("current active is > heartbeating") > + owner_up = True > + break > + elif time_at_arb > (original_timestamp + > self.timeout): > + syslog.syslog("current active is not > heartbeating") > + # more 
than 'timeout' seconds have elapsed > since > + # the last update, we can assume the old > + # active is not heartbeating > + owner_up = False > + break > + time.sleep(self.heartbeat_interval) > + if owner_up: > + ret['code'] = 1 > + else: > + # owner is not updating arbitrator > + self.proxy.delete(self.keyname) > + return self.lock(params) > + else: > + syslog.syslog("obtained lock at arbitrator") > + except socket.error: > + syslog.syslog("socket error") > + ret['code'] = 2 > + > + return ret > + > + def unlock(self, params): > + """ Unlock the lock from owner > + > + Args: > + params (dict): contains owner (str), > + force (boolean) from user input > + * force = True: force to unlock the lock even if > + owner don't match with current owner > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <current owner of the lock> > + } > + * code = 0: success > + * code = 1: lock is owned by someone else > + * code = 2 or above: other failure > + """ > + ret = {'code': 0} > + owner = params['owner'] > + > + # remove 'owner.lock' > + try: > + current_owner = self.proxy.get(self.keyname) > + if current_owner == owner: > + result = self.proxy.delete(self.keyname) > + if result is False: > + ret['code'] = 2 > + else: > + ret['code'] = 1 > + except socket.error: > + ret['code'] = 2 > + > + return ret > + > + def lock_owner(self): > + """ Retrieve owner of lock > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <current owner of the lock> > + } > + * code = 0: success > + * code = non-zero: failure > + """ > + ret = {'code': 0, 'output': 'unknown'} > + > + try: > + current_owner = self.proxy.get(self.keyname) > + ret['output'] = current_owner > + except socket.error: > + ret['code'] = 1 > + > + return ret > + > + def watch_lock(self): > + """ Watch events stream on lock > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <new value of the lock> > + } > + * 
code = 0: success > + * code = non-zero: failure > + """ > + return self.watch({'key': self.keyname}) > + > + def erase(self, params): > + """ Remove the key from tcp database > + > + Args: > + params (dict): contains key from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code> > + } > + * code = 0: success > + * code = non-zero: failure > + """ > + ret = {'code': 1} > + key = params['key'] > + > + if key == self.takeover_request: > + # takeover_request will not be used > + # pretend it worked > + ret['code'] = 0 > + > + return ret > + > + def watch(self, params): > + """ Watch events stream on key > + > + Args: > + params (dict): contains key from user input > + > + Returns: > + dict: result with format > + { > + 'code': <return code>, > + 'output': <new value of the key> > + } > + * code = 0: success > + * code = non-zero: failure > + """ > + ret = {'code': 0, 'output': 'unknown'} > + key = params['key'] > + > + if key != self.keyname and key != self.takeover_request: > + # no other key is supported > + ret['code'] = 2 > + return ret > + > + last_arb_timestamp = 0 > + while True: > + if key == self.takeover_request: > + if self.is_active() is False: > + # maybe a controller swap occurred > + break > + while True: > + try: > + time_at_arb = > self.proxy.heartbeat(self.hostname) > + if last_arb_timestamp == 0: > + last_arb_timestamp = time_at_arb > + break > + elif (time_at_arb - last_arb_timestamp) > > self.timeout: > + # VM was frozen? 
> + syslog.syslog('VM was frozen!') > + ret['code'] = 126 > + return ret > + else: > + last_arb_timestamp = time_at_arb > + break > + except socket.error: > + # can't heartbeat, need to self-fence (if > peer down) > + syslog.syslog('cannot heartbeat, inform > rded') > + ret['output'] = self.hostname + \ > + ' SC-0 10000000 UNDEFINED' > + return ret > + elif key == self.keyname: > + try: > + current_owner = self.proxy.get(self.keyname) > + if not current_owner: > + if self.is_active() is False: > + # a switchover occurred > + break > + # maybe the arbitrator restarted > + self.proxy.create(self.keyname, > self.hostname) > + elif current_owner != self.hostname: > + ret['output'] = current_owner > + break > + except socket.error: > + pass > + time.sleep(self.heartbeat_interval) > + return ret > + > + > +if __name__ == '__main__': > + ArbitratorPlugin().run() > diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py > b/src/osaf/consensus/plugins/tcp/tcp_server.py > new file mode 100755 > index 0000000..a7f22f2 > --- /dev/null > +++ b/src/osaf/consensus/plugins/tcp/tcp_server.py > @@ -0,0 +1,157 @@ > +#!/usr/bin/env python3 > +""" > + -*- OpenSAF -*- > + > +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved. > + > +This program is distributed in the hope that it will be useful, but > +WITHOUT ANY WARRANTY; without even the implied warranty of > MERCHANTABILITY > +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are > licensed > +under the GNU Lesser General Public License Version 2.1, February > 1999. > +The complete license can be accessed from the following location: > + > https://protect2.fireeye.com/url?k=3aa9040c-6620de4b-3aa94497-0cc47ad93e96-88a20cd595459a91&q=1&u=http%3A%2F%2Fopensource.org%2Flicenses%2Flgpl-license.php > +See the Copying file included with the OpenSAF distribution for full > +licensing terms. 
> + > +Simple TCP arbitrator > +""" > + > +import os > +import socket > +from base64 import b64decode > +from socketserver import ThreadingMixIn > +from xmlrpc.server import SimpleXMLRPCServer, > SimpleXMLRPCRequestHandler > +import ssl > +import sys > +import threading > +import time > + > +# Warning: example only, replace with signed certificate > +# Assumes pem files are located in the same directory as this file > +DIRPATH = os.path.dirname(os.path.realpath(__file__)) > +CERTFILE = DIRPATH + '/certificate.pem' > +KEYFILE = DIRPATH + '/key.pem' > +USERNAME = 'test' > +PASSWORD = 'test' > + > + > +class RequestHandler(SimpleXMLRPCRequestHandler): > + """ Allow keep alives """ > + protocol_version = "HTTP/1.1" > + > + def parse_request(self): > + if SimpleXMLRPCRequestHandler.parse_request(self): > + if self.auth(self.headers): > + return True > + else: > + self.send_error(401, 'Unauthorized') > + return False > + > + def auth(self, headers): > + (basic, _, encoded) = > headers.get('Authorization').partition(' ') > + assert basic == 'Basic', 'Only basic authentication > supported' > + decoded_bytes = b64decode(encoded) > + decoded_string = decoded_bytes.decode() > + (username, _, password) = decoded_string.partition(':') > + if username == USERNAME and password == PASSWORD: > + return True > + return False > + > + > +class ThreadedRPCServer(ThreadingMixIn, > + SimpleXMLRPCServer): > + """ Add thread support """ > + def __init__(self, bind_address, bind_port, > + requestHandler): > + # IPv6 is supported > + self.address_family = socket.getaddrinfo(bind_address, > bind_port)[0][0] > + SimpleXMLRPCServer.__init__(self, (bind_address, bind_port), > + requestHandler, > logRequests=False) > + self.socket = ssl.wrap_socket( > + socket.socket(self.address_family, self.socket_type), > + server_side=True, > + certfile=CERTFILE, > + keyfile=KEYFILE, > + cert_reqs=ssl.CERT_NONE, > + ssl_version=ssl.PROTOCOL_TLSv1_2) > + self.server_bind() > + self.server_activate() > + > + > 
+class Arbitrator(object): > + """ Implementation of a simple arbitrator """ > + > + def __init__(self): > + self.port = 6666 > + self.kv_pairs = {} > + self.mutex = threading.Lock() > + > + def heartbeat(self, key): > + timestamp = int(time.time()) > + self.mutex.acquire() > + self.kv_pairs[key] = timestamp > + self.mutex.release() > + return timestamp > + > + def set(self, key, value): > + self.mutex.acquire() > + self.kv_pairs[key] = value > + self.mutex.release() > + return True > + > + def get(self, key): > + value = "" > + self.mutex.acquire() > + if key in self.kv_pairs: > + value = self.kv_pairs[key] > + self.mutex.release() > + return value > + > + def set_if_prev(self, key, prev_value, new_value): > + result = False > + self.mutex.acquire() > + if key in self.kv_pairs: > + value = self.kv_pairs[key] > + if value == prev_value: > + self.kv_pairs[key] = new_value > + result = True > + self.mutex.release() > + return result > + > + def create(self, key, value): > + result = False > + self.mutex.acquire() > + if key not in self.kv_pairs: > + self.kv_pairs[key] = value > + result = True > + self.mutex.release() > + return result > + > + def delete(self, key): > + result = False > + self.mutex.acquire() > + if key in self.kv_pairs: > + del self.kv_pairs[key] > + result = True > + self.mutex.release() > + return result > + > + def run(self): > + hostname = "::0.0.0.0" > + server = ThreadedRPCServer(hostname, self.port, > + requestHandler=RequestHandler) > + print("Listening on port %d" % self.port) > + server.register_multicall_functions() > + server.register_function(self.heartbeat, 'heartbeat') > + server.register_function(self.set, 'set') > + server.register_function(self.get, 'get') > + server.register_function(self.set_if_prev, 'set_if_prev') > + server.register_function(self.create, 'create') > + server.register_function(self.delete, 'delete') > + server.serve_forever() > + > + sys.exit(0) > + > + > +if __name__ == '__main__': > + Arbitrator().run()
_______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel