Hi Hans

OK that’s a good idea.

thanks

> On 4 Oct 2019, at 11:06 pm, Hans Nordebäck <hans.nordeb...@ericsson.com> 
> wrote:
> 
> Hi Gary, ack, review only. One comment/suggestion: can we provide a
> small script that generates the X.509 certificate (e.g. using openssl
> req -x509 ...) instead of including a self-signed cert? /BR Hans
>> On Tue, 2019-10-01 at 12:53 +1000, Gary Lee wrote:
>> ---
>> src/osaf/consensus/plugins/tcp/README          |  41 ++
>> src/osaf/consensus/plugins/tcp/certificate.pem |  20 +
>> src/osaf/consensus/plugins/tcp/key.pem         |  28 ++
>> src/osaf/consensus/plugins/tcp/tcp.plugin      | 520
>> +++++++++++++++++++++++++
>> src/osaf/consensus/plugins/tcp/tcp_server.py   | 157 ++++++++
>> 5 files changed, 766 insertions(+)
>> create mode 100644 src/osaf/consensus/plugins/tcp/README
>> create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem
>> create mode 100644 src/osaf/consensus/plugins/tcp/key.pem
>> create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin
>> create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py
>> 
>> diff --git a/src/osaf/consensus/plugins/tcp/README
>> b/src/osaf/consensus/plugins/tcp/README
>> new file mode 100644
>> index 0000000..6f739e8
>> --- /dev/null
>> +++ b/src/osaf/consensus/plugins/tcp/README
>> @@ -0,0 +1,41 @@
>> +TCP arbitrator
>> +
>> +The TCP arbitrator may be useful for deployments where deploying
>> etcd is not
>> +feasible. An example arbitrator is provided to help prevent split
>> brain in
>> +clusters that contain up to 2 system controllers.
>> +
>> +The example arbitrator is a simple Python-based program that can be
>> deployed on
>> +a single payload or a node external to the cluster.
>> +
>> +Two main pieces of information are stored on the arbitrator: the
>> hostname of the
>> +current active controller and a heartbeat timestamp.
>> +
>> +An active controller sends a heartbeat to the arbitrator every 100ms
>> using TLS
>> +over a persistent TCP connection. It should self-fence if it is
>> unable to
>> +heartbeat, as it is likely to be separated from the arbitrator.
>> +
>> +A candidate active controller must check that the existing active controller is
>> not
>> +heartbeating before promoting itself active. On a cluster using
>> TIPC,
>> +the timeout value is the TIPC link tolerance timeout. On a TCP-based
>> cluster,
>> +the timeout is calculated from FMS_TAKEOVER_REQUEST_VALID_TIME.
>> +
>> +Suggested fmd.conf configuration:
>> +
>> +export FMS_SPLIT_BRAIN_PREVENTION=1
>> +export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin
>> +export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0 (any other setting
>> is ignored)
>> +export FMS_RELAXED_NODE_PROMOTION=1
>> +
>> +The above settings will allow a controller to be elected active
>> during
>> +cluster startup, even if the arbitrator is not yet running.
>> +If the arbitrator becomes temporarily unavailable, the controllers
>> will
>> +remain running if they can see each other. If an active controller
>> becomes
>> +isolated from the standby *and* the arbitrator, it will self-fence
>> and the
>> +standby will become active (if located in the same network partition
>> as
>> +the arbitrator).
>> +
>> +The provided self-signed certificate and private key are examples only, and were
>> generated using:
>> +
>> +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days
>> 100000 -out certificate.pem
>> +
>> +Both must be replaced in an actual deployment!
>> diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem
>> b/src/osaf/consensus/plugins/tcp/certificate.pem
>> new file mode 100644
>> index 0000000..e0b4993
>> --- /dev/null
>> +++ b/src/osaf/consensus/plugins/tcp/certificate.pem
>> @@ -0,0 +1,20 @@
>> +-----BEGIN CERTIFICATE-----
>> +MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV
>> +BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH
>> +T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG
>> +A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK
>> +DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS
>> ++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf
>> +gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT
>> +lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ
>> +WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar
>> +hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y
>> +K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9
>> +HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB
>> +/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1
>> +hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz
>> +eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB
>> +tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH
>> +ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ
>> +R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg==
>> +-----END CERTIFICATE-----
>> diff --git a/src/osaf/consensus/plugins/tcp/key.pem
>> b/src/osaf/consensus/plugins/tcp/key.pem
>> new file mode 100644
>> index 0000000..66b8bcb
>> --- /dev/null
>> +++ b/src/osaf/consensus/plugins/tcp/key.pem
>> @@ -0,0 +1,28 @@
>> +-----BEGIN PRIVATE KEY-----
>> +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD
>> +OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh
>> +ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9
>> +Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3
>> +J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA
>> +ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy
>> +8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7
>> +4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE
>> +KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI
>> +5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR
>> +oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz
>> +/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX
>> ++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin
>> +3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4
>> +5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu
>> +z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi
>> +l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39
>> +OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z
>> +BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz
>> +Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF
>> +hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE
>> +B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP
>> +EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD
>> +WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0
>> +6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv
>> +0ZCcum4WK037YSkrI1IFBUlKXA==
>> +-----END PRIVATE KEY-----
>> diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin
>> b/src/osaf/consensus/plugins/tcp/tcp.plugin
>> new file mode 100755
>> index 0000000..1b5ddf5
>> --- /dev/null
>> +++ b/src/osaf/consensus/plugins/tcp/tcp.plugin
>> @@ -0,0 +1,520 @@
>> +#!/usr/bin/env python3
>> +"""
>> +     -*- OpenSAF  -*-
>> +
>> +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
>> +
>> +This program is distributed in the hope that it will be useful, but
>> +WITHOUT ANY WARRANTY; without even the implied warranty of
>> MERCHANTABILITY
>> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
>> licensed
>> +under the GNU Lesser General Public License Version 2.1, February
>> 1999.
>> +The complete license can be accessed from the following location:
>> +
>> http://opensource.org/licenses/lgpl-license.php
>> +See the Copying file included with the OpenSAF distribution for full
>> +licensing terms.
>> +
>> +"""
>> +
>> +import argparse
>> +import os
>> +import socket
>> +import ssl
>> +import sys
>> +import time
>> +import xmlrpc.client
>> +import syslog
>> +
>> +
>> +class ArbitratorPlugin(object):
>> +    """ This class represents a TCP Plugin """
>> +
>> +    def __init__(self):
>> +        # Replace with actual URL
>> +        self.uri = "https://test:test@arbitrator:6666/";
>> +        # Warning: certificate verification is disabled
>> +        self.proxy = xmlrpc.client.ServerProxy(
>> +            self.uri, context=ssl._create_unverified_context())
>> +
>> +        # Get tipc link tolerance timeout if OpenSAF in used with
>> TIPC
>> +        link_tolerance = self.get_tipc_tolerance()
>> +        self.timeout = link_tolerance
>> +
>> +        # TODO make configurable? default to 1s
>> +        self.socket_timeout = 1
>> +        socket.setdefaulttimeout(self.socket_timeout)
>> +
>> +        # TODO make configurable? default to 100ms
>> +        self.heartbeat_interval = 0.1
>> +
>> +        # constants
>> +        self.keyname = 'opensaf_consensus_lock'
>> +        self.takeover_request = 'takeover_request'
>> +        self.testkey = "opensaf_write_test"
>> +
>> +        # location where opensaf node_name is defined
>> +        self.node_name_file = '/etc/opensaf/node_name'
>> +        self.hostname = self.get_node_name()
>> +
>> +    def run(self):
>> +        """ Main method to handle input and output """
>> +        result = self.parse_cmd()
>> +        if 'output' in result:
>> +            print(result['output'])
>> +        sys.exit(result['code'])
>> +
>> +    def parse_cmd(self):
>> +        """ Handle input parameters from user """
>> +        parser = argparse.ArgumentParser(description="Arbitration
>> Plugin")
>> +        sub_parser = parser.add_subparsers(
>> +            dest="command",
>> +            help="Execute script with this command"
>> +        )
>> +
>> +        parser_get = sub_parser.add_parser(
>> +            "get",
>> +            help="Retrieve <value> of <key> in tcp database"
>> +        )
>> +        parser_get.add_argument("key", metavar="<key>")
>> +
>> +        parser_set = sub_parser.add_parser(
>> +            "set",
>> +            help="Set <key> to <value> in tcp database"
>> +        )
>> +        parser_set.add_argument("key", metavar="<key>")
>> +        parser_set.add_argument("value", metavar="<value>")
>> +        parser_set.add_argument("timeout", metavar="<timeout>",
>> type=int)
>> +
>> +        parser_create = sub_parser.add_parser(
>> +            "create",
>> +            help="Create <key> and set to <value>"
>> +                 " for <timeout> second in tcp database"
>> +        )
>> +        parser_create.add_argument("key", metavar="<key>")
>> +        parser_create.add_argument("value", metavar="<value>")
>> +        parser_create.add_argument("timeout", metavar="<timeout>",
>> type=int)
>> +
>> +        parser_set_if_prev = sub_parser.add_parser(
>> +            "set_if_prev",
>> +            help="Set <key> to <value> if the existing value matches
>> <prev>"
>> +        )
>> +        parser_set_if_prev.add_argument("key", metavar="<key>")
>> +        parser_set_if_prev.add_argument("value", metavar="<value>")
>> +        parser_set_if_prev.add_argument("prev", metavar="<prev>")
>> +        parser_set_if_prev.add_argument(
>> +            "timeout", metavar="<timeout>", type=int)
>> +
>> +        parser_lock = sub_parser.add_parser(
>> +            "lock",
>> +            help="Set <owner> of the lock,"
>> +                 " automatically unlock after <timeout>"
>> +        )
>> +        parser_lock.add_argument("owner", metavar="<owner>")
>> +        parser_lock.add_argument("timeout", metavar="<timeout>",
>> type=int)
>> +
>> +        parser_unlock = sub_parser.add_parser(
>> +            "unlock",
>> +            help="Unlock the lock from <owner>"
>> +        )
>> +        parser_unlock.add_argument("owner", metavar="<owner>")
>> +        parser_unlock.add_argument(
>> +            "--force",
>> +            help="force to unlock the lock even if lock isn't held
>> by <owner>",
>> +            action='store_true'
>> +        )
>> +
>> +        sub_parser.add_parser(
>> +            "lock_owner",
>> +            help="Retrieve owner of lock"
>> +        )
>> +
>> +        parser_watch = sub_parser.add_parser(
>> +            "watch",
>> +            help="Watch events stream on <key>"
>> +        )
>> +        parser_watch.add_argument("key", metavar="<key>")
>> +
>> +        sub_parser.add_parser(
>> +            "watch_lock",
>> +            help="Watch events stream on lock"
>> +        )
>> +
>> +        parser_erase = sub_parser.add_parser(
>> +            "erase",
>> +            help="Remove the <key> from tcp database"
>> +        )
>> +        parser_erase.add_argument("key", metavar="<key>")
>> +
>> +        args = vars(parser.parse_args())
>> +        command = args.pop('command')
>> +        params = []
>> +        if args:
>> +            params.append(args)
>> +        return getattr(self, command)(*params)
>> +
>> +    def get_node_name(self):
>> +        node_file = open(self.node_name_file)
>> +        node_name = node_file.readline().strip()
>> +        node_file.close()
>> +        return node_name
>> +
>> +    def get_tipc_tolerance(self):
>> +        """ Retrieve tipc link tolerance timeout """
>> +        # Get bearer
>> +        bearer = os.popen('tipc bearer list').read().strip()
>> +        if bearer is not None and len(bearer) > 0:
>> +            tokens = bearer.split(":")
>> +            tol_to = os.popen('tipc bearer get tolerance media '
>> +                              '%s device %s' %
>> +                              (tokens[0], tokens[1])).read()
>> +            return float(tol_to) / 1000
>> +
>> +        takeover_timeout = os.environ.get(
>> +            'FMS_TAKEOVER_REQUEST_VALID_TIME')
>> +        if takeover_timeout is not None and int(takeover_timeout) >
>> 0:
>> +            return int(takeover_timeout) / 2
>> +
>> +        # default to 5 seconds
>> +        return 5
>> +
>> +    def is_active(self):
>> +        """ Should this node be active according to RDE """
>> +        state = os.popen('rdegetrole').read().strip()
>> +        return bool(state == 'ACTIVE')
>> +
>> +    def get(self, params):
>> +        """ Retrieve value of key in tcp database
>> +
>> +        Args:
>> +            params (dict): contains key from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <value of the key>
>> +            }
>> +                * code = 0: success
>> +                * code = 1: invalid param
>> +                * code = 2: other failure
>> +        """
>> +        ret = {'code': 1, 'output': ''}
>> +
>> +        # takeover_request will not be used
>> +        return ret
>> +
>> +    def set(self, params):
>> +        """ Set key to value in tcp database
>> +
>> +        Args:
>> +            params (dict): contains key, value from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +
>> +        ret = {'code': 1}
>> +        key = params['key']
>> +        value = params['value']
>> +        # params['timeout']
>> +
>> +        if key == self.testkey:
>> +            try:
>> +                result = self.proxy.set(key, value)
>> +            except socket.error:
>> +                result = False
>> +            if result:
>> +                ret['code'] = 0
>> +
>> +        return ret
>> +
>> +    def create(self, params):
>> +        """ Create key and set to value for timeout second in tcp
>> database
>> +
>> +        Args:
>> +            params (dict): contains key, value, timeout
>> +             from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>
>> +            }
>> +                * code = 0: success
>> +                * code = 1: already exist
>> +                * code = 2: invalid param
>> +                * code = 3: other failure
>> +        """
>> +        ret = {'code': 2}
>> +        key = params['key']
>> +
>> +        if key == self.takeover_request:
>> +            # takeover_request will not be used
>> +            ret['code'] = 2
>> +        return ret
>> +
>> +    def set_if_prev(self, params):
>> +        """ Set key to value if the existing value matches previous
>> value
>> +
>> +        Args:
>> +            params (dict): contains key (str),
>> +             value (str), prev (str) from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +        ret = {'code': 1}
>> +        key = params['key']
>> +        # params['value']
>> +        # params['timeout']
>> +
>> +        if key == self.takeover_request:
>> +            # takeover_request will not be used
>> +            # pretend it worked
>> +            ret['code'] = 0
>> +
>> +        return ret
>> +
>> +    def lock(self, params):
>> +        """ Set owner of the lock, automatically unlock after
>> timeout
>> +
>> +        Args:
>> +            params (dict): contains owner (str),
>> +             timeout (integer) from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <current owner of the lock>
>> +            }
>> +                * code = 0: success
>> +                * code = 1: lock is owned by someone else
>> +                * code = 2 or above: other failure
>> +
>> +        NOTE: if lock is already acquired by owner, then timeout is
>> extended
>> +        TODO: timeout not yet implemented
>> +        """
>> +        ret = {'code': 0}
>> +        owner = params['owner']
>> +        owner_up = False
>> +
>> +        try:
>> +            result = self.proxy.create(self.keyname, owner)
>> +            if result is False:
>> +                current_owner = self.proxy.get(self.keyname)
>> +                if current_owner == owner:
>> +                    syslog.syslog("lock is already held by this
>> node")
>> +                    return ret
>> +                original_timestamp = self.proxy.get(current_owner)
>> +                if original_timestamp:
>> +                    original_timestamp = int(original_timestamp)
>> +                else:
>> +                    original_timestamp = 0
>> +                # check if the owner is updating the arbitrator
>> +                while True:
>> +                    # get current time from arbitrator
>> +                    time_at_arb =
>> self.proxy.heartbeat(self.hostname)
>> +                    # last time the current owner heartbeated
>> +                    last_timestamp = self.proxy.get(current_owner)
>> +                    if last_timestamp > original_timestamp:
>> +                        # owner is updating arbitrator OK
>> +                        syslog.syslog("current active is
>> heartbeating")
>> +                        owner_up = True
>> +                        break
>> +                    elif time_at_arb > (original_timestamp +
>> self.timeout):
>> +                        syslog.syslog("current active is not
>> heartbeating")
>> +                        # more than 'timeout' seconds have elapsed
>> since
>> +                        # the last update, we can assume the old
>> +                        # active is not heartbeating
>> +                        owner_up = False
>> +                        break
>> +                    time.sleep(self.heartbeat_interval)
>> +                if owner_up:
>> +                    ret['code'] = 1
>> +                else:
>> +                    # owner is not updating arbitrator
>> +                    self.proxy.delete(self.keyname)
>> +                    return self.lock(params)
>> +            else:
>> +                syslog.syslog("obtained lock at arbitrator")
>> +        except socket.error:
>> +            syslog.syslog("socket error")
>> +            ret['code'] = 2
>> +
>> +        return ret
>> +
>> +    def unlock(self, params):
>> +        """ Unlock the lock from owner
>> +
>> +        Args:
>> +            params (dict): contains owner (str),
>> +             force (boolean) from user input
>> +                * force = True: force to unlock the lock even if
>> +                    owner don't match with current owner
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <current owner of the lock>
>> +            }
>> +                * code = 0: success
>> +                * code = 1: lock is owned by someone else
>> +                * code = 2 or above: other failure
>> +        """
>> +        ret = {'code': 0}
>> +        owner = params['owner']
>> +
>> +        # remove 'owner.lock'
>> +        try:
>> +            current_owner = self.proxy.get(self.keyname)
>> +            if current_owner == owner:
>> +                result = self.proxy.delete(self.keyname)
>> +                if result is False:
>> +                    ret['code'] = 2
>> +            else:
>> +                ret['code'] = 1
>> +        except socket.error:
>> +            ret['code'] = 2
>> +
>> +        return ret
>> +
>> +    def lock_owner(self):
>> +        """ Retrieve owner of lock
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <current owner of the lock>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +        ret = {'code': 0, 'output': 'unknown'}
>> +
>> +        try:
>> +            current_owner = self.proxy.get(self.keyname)
>> +            ret['output'] = current_owner
>> +        except socket.error:
>> +            ret['code'] = 1
>> +
>> +        return ret
>> +
>> +    def watch_lock(self):
>> +        """ Watch events stream on lock
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <new value of the lock>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +        return self.watch({'key': self.keyname})
>> +
>> +    def erase(self, params):
>> +        """ Remove the key from tcp database
>> +
>> +        Args:
>> +            params (dict): contains key from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +        ret = {'code': 1}
>> +        key = params['key']
>> +
>> +        if key == self.takeover_request:
>> +            # takeover_request will not be used
>> +            # pretend it worked
>> +            ret['code'] = 0
>> +
>> +        return ret
>> +
>> +    def watch(self, params):
>> +        """ Watch events stream on key
>> +
>> +        Args:
>> +            params (dict): contains key from user input
>> +
>> +        Returns:
>> +            dict: result with format
>> +            {
>> +                'code': <return code>,
>> +                'output': <new value of the key>
>> +            }
>> +                * code = 0: success
>> +                * code = non-zero: failure
>> +        """
>> +        ret = {'code': 0, 'output': 'unknown'}
>> +        key = params['key']
>> +
>> +        if key != self.keyname and key != self.takeover_request:
>> +            # no other key is supported
>> +            ret['code'] = 2
>> +            return ret
>> +
>> +        last_arb_timestamp = 0
>> +        while True:
>> +            if key == self.takeover_request:
>> +                if self.is_active() is False:
>> +                    # maybe a controller swap occurred
>> +                    break
>> +                while True:
>> +                    try:
>> +                        time_at_arb =
>> self.proxy.heartbeat(self.hostname)
>> +                        if last_arb_timestamp == 0:
>> +                            last_arb_timestamp = time_at_arb
>> +                            break
>> +                        elif (time_at_arb - last_arb_timestamp) >
>> self.timeout:
>> +                            # VM was frozen?
>> +                            syslog.syslog('VM was frozen!')
>> +                            ret['code'] = 126
>> +                            return ret
>> +                        else:
>> +                            last_arb_timestamp = time_at_arb
>> +                            break
>> +                    except socket.error:
>> +                        # can't heartbeat, need to self-fence (if
>> peer down)
>> +                        syslog.syslog('cannot heartbeat, inform
>> rded')
>> +                        ret['output'] = self.hostname + \
>> +                            ' SC-0 10000000 UNDEFINED'
>> +                        return ret
>> +            elif key == self.keyname:
>> +                try:
>> +                    current_owner = self.proxy.get(self.keyname)
>> +                    if not current_owner:
>> +                        if self.is_active() is False:
>> +                            # a switchover occurred
>> +                            break
>> +                        # maybe the arbitrator restarted
>> +                        self.proxy.create(self.keyname,
>> self.hostname)
>> +                    elif current_owner != self.hostname:
>> +                        ret['output'] = current_owner
>> +                        break
>> +                except socket.error:
>> +                    pass
>> +            time.sleep(self.heartbeat_interval)
>> +        return ret
>> +
>> +
>> +if __name__ == '__main__':
>> +    ArbitratorPlugin().run()
>> diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py
>> b/src/osaf/consensus/plugins/tcp/tcp_server.py
>> new file mode 100755
>> index 0000000..a7f22f2
>> --- /dev/null
>> +++ b/src/osaf/consensus/plugins/tcp/tcp_server.py
>> @@ -0,0 +1,157 @@
>> +#!/usr/bin/env python3
>> +"""
>> +     -*- OpenSAF  -*-
>> +
>> +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
>> +
>> +This program is distributed in the hope that it will be useful, but
>> +WITHOUT ANY WARRANTY; without even the implied warranty of
>> MERCHANTABILITY
>> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
>> licensed
>> +under the GNU Lesser General Public License Version 2.1, February
>> 1999.
>> +The complete license can be accessed from the following location:
>> +
>> http://opensource.org/licenses/lgpl-license.php
>> +See the Copying file included with the OpenSAF distribution for full
>> +licensing terms.
>> +
>> +Simple TCP arbitrator
>> +"""
>> +
>> +import os
>> +import socket
>> +from base64 import b64decode
>> +from socketserver import ThreadingMixIn
>> +from xmlrpc.server import SimpleXMLRPCServer,
>> SimpleXMLRPCRequestHandler
>> +import ssl
>> +import sys
>> +import threading
>> +import time
>> +
>> +# Warning: example only, replace with signed certificate
>> +# Assumes pem files are located in the same directory as this file
>> +DIRPATH = os.path.dirname(os.path.realpath(__file__))
>> +CERTFILE = DIRPATH + '/certificate.pem'
>> +KEYFILE = DIRPATH + '/key.pem'
>> +USERNAME = 'test'
>> +PASSWORD = 'test'
>> +
>> +
>> +class RequestHandler(SimpleXMLRPCRequestHandler):
>> +    """ Allow keep alives """
>> +    protocol_version = "HTTP/1.1"
>> +
>> +    def parse_request(self):
>> +        if SimpleXMLRPCRequestHandler.parse_request(self):
>> +            if self.auth(self.headers):
>> +                return True
>> +            else:
>> +                self.send_error(401, 'Unauthorized')
>> +        return False
>> +
>> +    def auth(self, headers):
>> +        (basic, _, encoded) =
>> headers.get('Authorization').partition(' ')
>> +        assert basic == 'Basic', 'Only basic authentication
>> supported'
>> +        decoded_bytes = b64decode(encoded)
>> +        decoded_string = decoded_bytes.decode()
>> +        (username, _, password) = decoded_string.partition(':')
>> +        if username == USERNAME and password == PASSWORD:
>> +            return True
>> +        return False
>> +
>> +
>> +class ThreadedRPCServer(ThreadingMixIn,
>> +                        SimpleXMLRPCServer):
>> +    """ Add thread support """
>> +    def __init__(self, bind_address, bind_port,
>> +                 requestHandler):
>> +        # IPv6 is supported
>> +        self.address_family = socket.getaddrinfo(bind_address,
>> bind_port)[0][0]
>> +        SimpleXMLRPCServer.__init__(self, (bind_address, bind_port),
>> +                                    requestHandler,
>> logRequests=False)
>> +        self.socket = ssl.wrap_socket(
>> +            socket.socket(self.address_family, self.socket_type),
>> +            server_side=True,
>> +            certfile=CERTFILE,
>> +            keyfile=KEYFILE,
>> +            cert_reqs=ssl.CERT_NONE,
>> +            ssl_version=ssl.PROTOCOL_TLSv1_2)
>> +        self.server_bind()
>> +        self.server_activate()
>> +
>> +
>> +class Arbitrator(object):
>> +    """ Implementation of a simple arbitrator """
>> +
>> +    def __init__(self):
>> +        self.port = 6666
>> +        self.kv_pairs = {}
>> +        self.mutex = threading.Lock()
>> +
>> +    def heartbeat(self, key):
>> +        timestamp = int(time.time())
>> +        self.mutex.acquire()
>> +        self.kv_pairs[key] = timestamp
>> +        self.mutex.release()
>> +        return timestamp
>> +
>> +    def set(self, key, value):
>> +        self.mutex.acquire()
>> +        self.kv_pairs[key] = value
>> +        self.mutex.release()
>> +        return True
>> +
>> +    def get(self, key):
>> +        value = ""
>> +        self.mutex.acquire()
>> +        if key in self.kv_pairs:
>> +            value = self.kv_pairs[key]
>> +        self.mutex.release()
>> +        return value
>> +
>> +    def set_if_prev(self, key, prev_value, new_value):
>> +        result = False
>> +        self.mutex.acquire()
>> +        if key in self.kv_pairs:
>> +            value = self.kv_pairs[key]
>> +            if value == prev_value:
>> +                self.kv_pairs[key] = new_value
>> +                result = True
>> +        self.mutex.release()
>> +        return result
>> +
>> +    def create(self, key, value):
>> +        result = False
>> +        self.mutex.acquire()
>> +        if key not in self.kv_pairs:
>> +            self.kv_pairs[key] = value
>> +            result = True
>> +        self.mutex.release()
>> +        return result
>> +
>> +    def delete(self, key):
>> +        result = False
>> +        self.mutex.acquire()
>> +        if key in self.kv_pairs:
>> +            del self.kv_pairs[key]
>> +            result = True
>> +        self.mutex.release()
>> +        return result
>> +
>> +    def run(self):
>> +        hostname = "::0.0.0.0"
>> +        server = ThreadedRPCServer(hostname, self.port,
>> +                                   requestHandler=RequestHandler)
>> +        print("Listening on port %d" % self.port)
>> +        server.register_multicall_functions()
>> +        server.register_function(self.heartbeat, 'heartbeat')
>> +        server.register_function(self.set, 'set')
>> +        server.register_function(self.get, 'get')
>> +        server.register_function(self.set_if_prev, 'set_if_prev')
>> +        server.register_function(self.create, 'create')
>> +        server.register_function(self.delete, 'delete')
>> +        server.serve_forever()
>> +
>> +        sys.exit(0)
>> +
>> +
>> +if __name__ == '__main__':
>> +    Arbitrator().run()


_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to