Hi Gary, ack, review only. One comment/suggestion: can we provide a
small script that generates the X.509 certificate (using e.g. openssl x509
...) instead of including a self-signed cert? /BR Hans
On Tue, 2019-10-01 at 12:53 +1000, Gary Lee wrote:
> ---
>  src/osaf/consensus/plugins/tcp/README          |  41 ++
>  src/osaf/consensus/plugins/tcp/certificate.pem |  20 +
>  src/osaf/consensus/plugins/tcp/key.pem         |  28 ++
>  src/osaf/consensus/plugins/tcp/tcp.plugin      | 520
> +++++++++++++++++++++++++
>  src/osaf/consensus/plugins/tcp/tcp_server.py   | 157 ++++++++
>  5 files changed, 766 insertions(+)
>  create mode 100644 src/osaf/consensus/plugins/tcp/README
>  create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem
>  create mode 100644 src/osaf/consensus/plugins/tcp/key.pem
>  create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin
>  create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py
> 
> diff --git a/src/osaf/consensus/plugins/tcp/README
> b/src/osaf/consensus/plugins/tcp/README
> new file mode 100644
> index 0000000..6f739e8
> --- /dev/null
> +++ b/src/osaf/consensus/plugins/tcp/README
> @@ -0,0 +1,41 @@
> +TCP arbitrator
> +
> +The TCP arbitrator may be useful for deployments where deploying
> etcd is not
> +feasible. An example arbitrator is provided to help prevent split
> brain in
> +clusters that contain up to 2 system controllers.
> +
> +The example arbitrator is a simple python based program that can be
> deployed on
> +a single payload or a node external to the cluster.
> +
> +Two main pieces of information are stored on the arbitrator: the
> hostname of the
> +current active controller and a heartbeat timestamp.
> +
> +An active controller sends a heartbeat to the arbitrator every 100ms
> using TLS
> +over a persistent TCP connection. It should self-fence if it is
> unable to
> +heartbeat, as it is likely to be separated from the arbitrator.
> +
> +A candidate active controller must check the existing controller is
> not
> +heartbeating before promoting itself active. On a cluster using
> TIPC,
> +the timeout value is the TIPC link tolerance timeout. On a TCP based
> cluster,
> +the timeout is calculated from FMS_TAKEOVER_REQUEST_VALID_TIME.
> +
> +Suggested fmd.conf configuration:
> +
> +export FMS_SPLIT_BRAIN_PREVENTION=1
> +export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin
> +export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0 (any other setting
> is ignored)
> +export FMS_RELAXED_NODE_PROMOTION=1
> +
> +The above settings will allow a controller to be elected active
> during
> +cluster startup, even if the arbitrator is not yet running.
> +If the arbitrator becomes temporarily unavailable, the controllers
> will
> +remain running if they can see each other. If an active controller
> becomes
> +isolated from the standby *and* the arbitrator, it will self-fence
> and the
> +standby will become active (if located in the same network partition
> as
> +the arbitrator).
> +
> +The provided self-signed certificate is an example only, and was
> generated using:
> +
> +openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days
> 100000 -out certificate.pem
> +
> +It must be replaced in an actual deployment!!
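To make the suggestion at the top concrete, here is a minimal sketch of such a generator script. It only wraps the `openssl req` command quoted in the README above; the function names, the 365-day default and the `/CN=arbitrator` subject are assumptions for illustration and are not part of the patch.

```python
#!/usr/bin/env python3
"""Generate an example self-signed certificate for the TCP arbitrator."""
import subprocess


def openssl_command(key_path="key.pem", cert_path="certificate.pem",
                    days=365, subject="/CN=arbitrator"):
    """Build the openssl argument list (same options as in the README).

    -subj is added so that openssl does not prompt interactively;
    the subject value itself is a placeholder.
    """
    return [
        "openssl", "req", "-newkey", "rsa:2048", "-nodes",
        "-keyout", key_path,
        "-x509", "-days", str(days),
        "-subj", subject,
        "-out", cert_path,
    ]


def generate(**kwargs):
    """Run openssl; raises CalledProcessError if the command fails."""
    subprocess.run(openssl_command(**kwargs), check=True)
```

Calling `generate()` from the plugin directory would then write key.pem and certificate.pem at install time, so no private key needs to live in the repository.
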
> diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem
> b/src/osaf/consensus/plugins/tcp/certificate.pem
> new file mode 100644
> index 0000000..e0b4993
> --- /dev/null
> +++ b/src/osaf/consensus/plugins/tcp/certificate.pem
> @@ -0,0 +1,20 @@
> +-----BEGIN CERTIFICATE-----
> +MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV
> +BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH
> +T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG
> +A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK
> +DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS
> ++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf
> +gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT
> +lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ
> +WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar
> +hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y
> +K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9
> +HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB
> +/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1
> +hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz
> +eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB
> +tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH
> +ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ
> +R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg==
> +-----END CERTIFICATE-----
> diff --git a/src/osaf/consensus/plugins/tcp/key.pem
> b/src/osaf/consensus/plugins/tcp/key.pem
> new file mode 100644
> index 0000000..66b8bcb
> --- /dev/null
> +++ b/src/osaf/consensus/plugins/tcp/key.pem
> @@ -0,0 +1,28 @@
> +-----BEGIN PRIVATE KEY-----
> +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD
> +OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh
> +ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9
> +Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3
> +J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA
> +ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy
> +8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7
> +4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE
> +KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI
> +5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR
> +oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz
> +/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX
> ++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin
> +3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4
> +5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu
> +z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi
> +l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39
> +OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z
> +BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz
> +Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF
> +hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE
> +B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP
> +EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD
> +WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0
> +6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv
> +0ZCcum4WK037YSkrI1IFBUlKXA==
> +-----END PRIVATE KEY-----
> diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin
> b/src/osaf/consensus/plugins/tcp/tcp.plugin
> new file mode 100755
> index 0000000..1b5ddf5
> --- /dev/null
> +++ b/src/osaf/consensus/plugins/tcp/tcp.plugin
> @@ -0,0 +1,520 @@
> +#!/usr/bin/env python3
> +"""
> +     -*- OpenSAF  -*-
> +
> +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
> +
> +This program is distributed in the hope that it will be useful, but
> +WITHOUT ANY WARRANTY; without even the implied warranty of
> MERCHANTABILITY
> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
> licensed
> +under the GNU Lesser General Public License Version 2.1, February
> 1999.
> +The complete license can be accessed from the following location:
> +http://opensource.org/licenses/lgpl-license.php
> +See the Copying file included with the OpenSAF distribution for full
> +licensing terms.
> +
> +"""
> +
> +import argparse
> +import os
> +import socket
> +import ssl
> +import sys
> +import time
> +import xmlrpc.client
> +import syslog
> +
> +
> +class ArbitratorPlugin(object):
> +    """ This class represents a TCP Plugin """
> +
> +    def __init__(self):
> +        # Replace with actual URL
> +        self.uri = "https://test:test@arbitrator:6666/"
> +        # Warning: certificate verification is disabled
> +        self.proxy = xmlrpc.client.ServerProxy(
> +            self.uri, context=ssl._create_unverified_context())
> +
> +        # Get tipc link tolerance timeout if OpenSAF is used with
> TIPC
> +        link_tolerance = self.get_tipc_tolerance()
> +        self.timeout = link_tolerance
> +
> +        # TODO make configurable? default to 1s
> +        self.socket_timeout = 1
> +        socket.setdefaulttimeout(self.socket_timeout)
> +
> +        # TODO make configurable? default to 100ms
> +        self.heartbeat_interval = 0.1
> +
> +        # constants
> +        self.keyname = 'opensaf_consensus_lock'
> +        self.takeover_request = 'takeover_request'
> +        self.testkey = "opensaf_write_test"
> +
> +        # location where opensaf node_name is defined
> +        self.node_name_file = '/etc/opensaf/node_name'
> +        self.hostname = self.get_node_name()
> +
> +    def run(self):
> +        """ Main method to handle input and output """
> +        result = self.parse_cmd()
> +        if 'output' in result:
> +            print(result['output'])
> +        sys.exit(result['code'])
> +
> +    def parse_cmd(self):
> +        """ Handle input parameters from user """
> +        parser = argparse.ArgumentParser(description="Arbitration
> Plugin")
> +        sub_parser = parser.add_subparsers(
> +            dest="command",
> +            help="Execute script with this command"
> +        )
> +
> +        parser_get = sub_parser.add_parser(
> +            "get",
> +            help="Retrieve <value> of <key> in tcp database"
> +        )
> +        parser_get.add_argument("key", metavar="<key>")
> +
> +        parser_set = sub_parser.add_parser(
> +            "set",
> +            help="Set <key> to <value> in tcp database"
> +        )
> +        parser_set.add_argument("key", metavar="<key>")
> +        parser_set.add_argument("value", metavar="<value>")
> +        parser_set.add_argument("timeout", metavar="<timeout>",
> type=int)
> +
> +        parser_create = sub_parser.add_parser(
> +            "create",
> +            help="Create <key> and set to <value>"
> +                 " for <timeout> second in tcp database"
> +        )
> +        parser_create.add_argument("key", metavar="<key>")
> +        parser_create.add_argument("value", metavar="<value>")
> +        parser_create.add_argument("timeout", metavar="<timeout>",
> type=int)
> +
> +        parser_set_if_prev = sub_parser.add_parser(
> +            "set_if_prev",
> +            help="Set <key> to <value> if the existing value matches
> <prev>"
> +        )
> +        parser_set_if_prev.add_argument("key", metavar="<key>")
> +        parser_set_if_prev.add_argument("value", metavar="<value>")
> +        parser_set_if_prev.add_argument("prev", metavar="<prev>")
> +        parser_set_if_prev.add_argument(
> +            "timeout", metavar="<timeout>", type=int)
> +
> +        parser_lock = sub_parser.add_parser(
> +            "lock",
> +            help="Set <owner> of the lock,"
> +                 " automatically unlock after <timeout>"
> +        )
> +        parser_lock.add_argument("owner", metavar="<owner>")
> +        parser_lock.add_argument("timeout", metavar="<timeout>",
> type=int)
> +
> +        parser_unlock = sub_parser.add_parser(
> +            "unlock",
> +            help="Unlock the lock from <owner>"
> +        )
> +        parser_unlock.add_argument("owner", metavar="<owner>")
> +        parser_unlock.add_argument(
> +            "--force",
> +            help="force to unlock the lock even if lock isn't held
> by <owner>",
> +            action='store_true'
> +        )
> +
> +        sub_parser.add_parser(
> +            "lock_owner",
> +            help="Retrieve owner of lock"
> +        )
> +
> +        parser_watch = sub_parser.add_parser(
> +            "watch",
> +            help="Watch events stream on <key>"
> +        )
> +        parser_watch.add_argument("key", metavar="<key>")
> +
> +        sub_parser.add_parser(
> +            "watch_lock",
> +            help="Watch events stream on lock"
> +        )
> +
> +        parser_erase = sub_parser.add_parser(
> +            "erase",
> +            help="Remove the <key> from tcp database"
> +        )
> +        parser_erase.add_argument("key", metavar="<key>")
> +
> +        args = vars(parser.parse_args())
> +        command = args.pop('command')
> +        params = []
> +        if args:
> +            params.append(args)
> +        return getattr(self, command)(*params)
> +
> +    def get_node_name(self):
> +        node_file = open(self.node_name_file)
> +        node_name = node_file.readline().strip()
> +        node_file.close()
> +        return node_name
> +
> +    def get_tipc_tolerance(self):
> +        """ Retrieve tipc link tolerance timeout """
> +        # Get bearer
> +        bearer = os.popen('tipc bearer list').read().strip()
> +        if bearer is not None and len(bearer) > 0:
> +            tokens = bearer.split(":")
> +            tol_to = os.popen('tipc bearer get tolerance media '
> +                              '%s device %s' %
> +                              (tokens[0], tokens[1])).read()
> +            return float(tol_to) / 1000
> +
> +        takeover_timeout = os.environ.get(
> +            'FMS_TAKEOVER_REQUEST_VALID_TIME')
> +        if takeover_timeout is not None and int(takeover_timeout) >
> 0:
> +            return int(takeover_timeout) / 2
> +
> +        # default to 5 seconds
> +        return 5
> +
> +    def is_active(self):
> +        """ Should this node be active according to RDE """
> +        state = os.popen('rdegetrole').read().strip()
> +        return bool(state == 'ACTIVE')
> +
> +    def get(self, params):
> +        """ Retrieve value of key in tcp database
> +
> +        Args:
> +            params (dict): contains key from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <value of the key>
> +            }
> +                * code = 0: success
> +                * code = 1: invalid param
> +                * code = 2: other failure
> +        """
> +        ret = {'code': 1, 'output': ''}
> +
> +        # takeover_request will not be used
> +        return ret
> +
> +    def set(self, params):
> +        """ Set key to value in tcp database
> +
> +        Args:
> +            params (dict): contains key, value from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +
> +        ret = {'code': 1}
> +        key = params['key']
> +        value = params['value']
> +        # params['timeout']
> +
> +        if key == self.testkey:
> +            try:
> +                result = self.proxy.set(key, value)
> +            except socket.error:
> +                result = False
> +            if result:
> +                ret['code'] = 0
> +
> +        return ret
> +
> +    def create(self, params):
> +        """ Create key and set to value for timeout second in tcp
> database
> +
> +        Args:
> +            params (dict): contains key, value, timeout
> +             from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>
> +            }
> +                * code = 0: success
> +                * code = 1: already exist
> +                * code = 2: invalid param
> +                * code = 3: other failure
> +        """
> +        ret = {'code': 2}
> +        key = params['key']
> +
> +        if key == self.takeover_request:
> +            # takeover_request will not be used
> +            ret['code'] = 2
> +        return ret
> +
> +    def set_if_prev(self, params):
> +        """ Set key to value if the existing value matches previous
> value
> +
> +        Args:
> +            params (dict): contains key (str),
> +             value (str), prev (str) from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +        ret = {'code': 1}
> +        key = params['key']
> +        # params['value']
> +        # params['timeout']
> +
> +        if key == self.takeover_request:
> +            # takeover_request will not be used
> +            # pretend it worked
> +            ret['code'] = 0
> +
> +        return ret
> +
> +    def lock(self, params):
> +        """ Set owner of the lock, automatically unlock after
> timeout
> +
> +        Args:
> +            params (dict): contains owner (str),
> +             timeout (integer) from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <current owner of the lock>
> +            }
> +                * code = 0: success
> +                * code = 1: lock is owned by someone else
> +                * code = 2 or above: other failure
> +
> +        NOTE: if lock is already acquired by owner, then timeout is
> extended
> +        TODO: timeout not yet implemented
> +        """
> +        ret = {'code': 0}
> +        owner = params['owner']
> +        owner_up = False
> +
> +        try:
> +            result = self.proxy.create(self.keyname, owner)
> +            if result is False:
> +                current_owner = self.proxy.get(self.keyname)
> +                if current_owner == owner:
> +                    syslog.syslog("lock is already held by this
> node")
> +                    return ret
> +                original_timestamp = self.proxy.get(current_owner)
> +                if original_timestamp:
> +                    original_timestamp = int(original_timestamp)
> +                else:
> +                    original_timestamp = 0
> +                # check if the owner is updating the arbitrator
> +                while True:
> +                    # get current time from arbitrator
> +                    time_at_arb =
> self.proxy.heartbeat(self.hostname)
> +                    # last time the current owner heartbeated
> +                    last_timestamp = self.proxy.get(current_owner)
> +                    if last_timestamp > original_timestamp:
> +                        # owner is updating arbitrator OK
> +                        syslog.syslog("current active is
> heartbeating")
> +                        owner_up = True
> +                        break
> +                    elif time_at_arb > (original_timestamp +
> self.timeout):
> +                        syslog.syslog("current active is not
> heartbeating")
> +                        # more than 'timeout' seconds have elapsed
> since
> +                        # the last update, we can assume the old
> +                        # active is not heartbeating
> +                        owner_up = False
> +                        break
> +                    time.sleep(self.heartbeat_interval)
> +                if owner_up:
> +                    ret['code'] = 1
> +                else:
> +                    # owner is not updating arbitrator
> +                    self.proxy.delete(self.keyname)
> +                    return self.lock(params)
> +            else:
> +                syslog.syslog("obtained lock at arbitrator")
> +        except socket.error:
> +            syslog.syslog("socket error")
> +            ret['code'] = 2
> +
> +        return ret
> +
> +    def unlock(self, params):
> +        """ Unlock the lock from owner
> +
> +        Args:
> +            params (dict): contains owner (str),
> +             force (boolean) from user input
> +                * force = True: force to unlock the lock even if
> +                    owner doesn't match the current owner
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <current owner of the lock>
> +            }
> +                * code = 0: success
> +                * code = 1: lock is owned by someone else
> +                * code = 2 or above: other failure
> +        """
> +        ret = {'code': 0}
> +        owner = params['owner']
> +
> +        # remove 'owner.lock'
> +        try:
> +            current_owner = self.proxy.get(self.keyname)
> +            if current_owner == owner:
> +                result = self.proxy.delete(self.keyname)
> +                if result is False:
> +                    ret['code'] = 2
> +            else:
> +                ret['code'] = 1
> +        except socket.error:
> +            ret['code'] = 2
> +
> +        return ret
> +
> +    def lock_owner(self):
> +        """ Retrieve owner of lock
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <current owner of the lock>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +        ret = {'code': 0, 'output': 'unknown'}
> +
> +        try:
> +            current_owner = self.proxy.get(self.keyname)
> +            ret['output'] = current_owner
> +        except socket.error:
> +            ret['code'] = 1
> +
> +        return ret
> +
> +    def watch_lock(self):
> +        """ Watch events stream on lock
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <new value of the lock>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +        return self.watch({'key': self.keyname})
> +
> +    def erase(self, params):
> +        """ Remove the key from tcp database
> +
> +        Args:
> +            params (dict): contains key from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +        ret = {'code': 1}
> +        key = params['key']
> +
> +        if key == self.takeover_request:
> +            # takeover_request will not be used
> +            # pretend it worked
> +            ret['code'] = 0
> +
> +        return ret
> +
> +    def watch(self, params):
> +        """ Watch events stream on key
> +
> +        Args:
> +            params (dict): contains key from user input
> +
> +        Returns:
> +            dict: result with format
> +            {
> +                'code': <return code>,
> +                'output': <new value of the key>
> +            }
> +                * code = 0: success
> +                * code = non-zero: failure
> +        """
> +        ret = {'code': 0, 'output': 'unknown'}
> +        key = params['key']
> +
> +        if key != self.keyname and key != self.takeover_request:
> +            # no other key is supported
> +            ret['code'] = 2
> +            return ret
> +
> +        last_arb_timestamp = 0
> +        while True:
> +            if key == self.takeover_request:
> +                if self.is_active() is False:
> +                    # maybe a controller swap occurred
> +                    break
> +                while True:
> +                    try:
> +                        time_at_arb =
> self.proxy.heartbeat(self.hostname)
> +                        if last_arb_timestamp == 0:
> +                            last_arb_timestamp = time_at_arb
> +                            break
> +                        elif (time_at_arb - last_arb_timestamp) >
> self.timeout:
> +                            # VM was frozen?
> +                            syslog.syslog('VM was frozen!')
> +                            ret['code'] = 126
> +                            return ret
> +                        else:
> +                            last_arb_timestamp = time_at_arb
> +                            break
> +                    except socket.error:
> +                        # can't heartbeat, need to self-fence (if
> peer down)
> +                        syslog.syslog('cannot heartbeat, inform
> rded')
> +                        ret['output'] = self.hostname + \
> +                            ' SC-0 10000000 UNDEFINED'
> +                        return ret
> +            elif key == self.keyname:
> +                try:
> +                    current_owner = self.proxy.get(self.keyname)
> +                    if not current_owner:
> +                        if self.is_active() is False:
> +                            # a switchover occurred
> +                            break
> +                        # maybe the arbitrator restarted
> +                        self.proxy.create(self.keyname,
> self.hostname)
> +                    elif current_owner != self.hostname:
> +                        ret['output'] = current_owner
> +                        break
> +                except socket.error:
> +                    pass
> +            time.sleep(self.heartbeat_interval)
> +        return ret
> +
> +
> +if __name__ == '__main__':
> +    ArbitratorPlugin().run()
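One thing that may be worth checking in lock(): the arbitrator's get() returns "" for a key it has never seen, and in Python 3 comparing "" with an int raises TypeError, which the `except socket.error` would not catch. Below is a minimal sketch of the same liveness decision as a pure function that normalises the value first. The names `to_timestamp` and `owner_state` are hypothetical and not part of the patch.

```python
def to_timestamp(value):
    """Treat the arbitrator's "" (key not present) as timestamp 0."""
    return int(value) if value else 0


def owner_state(original, last, now, timeout):
    """Decide whether the current lock owner is still heartbeating.

    original: owner's timestamp when the check started
    last:     owner's most recent timestamp at the arbitrator
    now:      current time as reported by the arbitrator
    timeout:  seconds without progress before the owner is considered down
    Returns "up", "down" or "unknown" (keep polling).
    """
    original = to_timestamp(original)
    last = to_timestamp(last)
    if last > original:
        return "up"
    if now > original + timeout:
        return "down"
    return "unknown"
```

The polling loop in lock() could then call `owner_state(...)` once per heartbeat interval and break on "up" or "down".
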
> diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py
> b/src/osaf/consensus/plugins/tcp/tcp_server.py
> new file mode 100755
> index 0000000..a7f22f2
> --- /dev/null
> +++ b/src/osaf/consensus/plugins/tcp/tcp_server.py
> @@ -0,0 +1,157 @@
> +#!/usr/bin/env python3
> +"""
> +     -*- OpenSAF  -*-
> +
> +(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
> +
> +This program is distributed in the hope that it will be useful, but
> +WITHOUT ANY WARRANTY; without even the implied warranty of
> MERCHANTABILITY
> +or FITNESS FOR A PARTICULAR PURPOSE. This file and program are
> licensed
> +under the GNU Lesser General Public License Version 2.1, February
> 1999.
> +The complete license can be accessed from the following location:
> +http://opensource.org/licenses/lgpl-license.php
> +See the Copying file included with the OpenSAF distribution for full
> +licensing terms.
> +
> +Simple TCP arbitrator
> +"""
> +
> +import os
> +import socket
> +from base64 import b64decode
> +from socketserver import ThreadingMixIn
> +from xmlrpc.server import SimpleXMLRPCServer,
> SimpleXMLRPCRequestHandler
> +import ssl
> +import sys
> +import threading
> +import time
> +
> +# Warning: example only, replace with signed certificate
> +# Assumes pem files are located in the same directory as this file
> +DIRPATH = os.path.dirname(os.path.realpath(__file__))
> +CERTFILE = DIRPATH + '/certificate.pem'
> +KEYFILE = DIRPATH + '/key.pem'
> +USERNAME = 'test'
> +PASSWORD = 'test'
> +
> +
> +class RequestHandler(SimpleXMLRPCRequestHandler):
> +    """ Allow keep alives """
> +    protocol_version = "HTTP/1.1"
> +
> +    def parse_request(self):
> +        if SimpleXMLRPCRequestHandler.parse_request(self):
> +            if self.auth(self.headers):
> +                return True
> +            else:
> +                self.send_error(401, 'Unauthorized')
> +        return False
> +
> +    def auth(self, headers):
> +        (basic, _, encoded) =
> headers.get('Authorization').partition(' ')
> +        assert basic == 'Basic', 'Only basic authentication
> supported'
> +        decoded_bytes = b64decode(encoded)
> +        decoded_string = decoded_bytes.decode()
> +        (username, _, password) = decoded_string.partition(':')
> +        if username == USERNAME and password == PASSWORD:
> +            return True
> +        return False
> +
> +
> +class ThreadedRPCServer(ThreadingMixIn,
> +                        SimpleXMLRPCServer):
> +    """ Add thread support """
> +    def __init__(self, bind_address, bind_port,
> +                 requestHandler):
> +        # IPv6 is supported
> +        self.address_family = socket.getaddrinfo(bind_address,
> bind_port)[0][0]
> +        SimpleXMLRPCServer.__init__(self, (bind_address, bind_port),
> +                                    requestHandler,
> logRequests=False)
> +        self.socket = ssl.wrap_socket(
> +            socket.socket(self.address_family, self.socket_type),
> +            server_side=True,
> +            certfile=CERTFILE,
> +            keyfile=KEYFILE,
> +            cert_reqs=ssl.CERT_NONE,
> +            ssl_version=ssl.PROTOCOL_TLSv1_2)
> +        self.server_bind()
> +        self.server_activate()
> +
> +
> +class Arbitrator(object):
> +    """ Implementation of a simple arbitrator """
> +
> +    def __init__(self):
> +        self.port = 6666
> +        self.kv_pairs = {}
> +        self.mutex = threading.Lock()
> +
> +    def heartbeat(self, key):
> +        timestamp = int(time.time())
> +        self.mutex.acquire()
> +        self.kv_pairs[key] = timestamp
> +        self.mutex.release()
> +        return timestamp
> +
> +    def set(self, key, value):
> +        self.mutex.acquire()
> +        self.kv_pairs[key] = value
> +        self.mutex.release()
> +        return True
> +
> +    def get(self, key):
> +        value = ""
> +        self.mutex.acquire()
> +        if key in self.kv_pairs:
> +            value = self.kv_pairs[key]
> +        self.mutex.release()
> +        return value
> +
> +    def set_if_prev(self, key, prev_value, new_value):
> +        result = False
> +        self.mutex.acquire()
> +        if key in self.kv_pairs:
> +            value = self.kv_pairs[key]
> +            if value == prev_value:
> +                self.kv_pairs[key] = new_value
> +                result = True
> +        self.mutex.release()
> +        return result
> +
> +    def create(self, key, value):
> +        result = False
> +        self.mutex.acquire()
> +        if key not in self.kv_pairs:
> +            self.kv_pairs[key] = value
> +            result = True
> +        self.mutex.release()
> +        return result
> +
> +    def delete(self, key):
> +        result = False
> +        self.mutex.acquire()
> +        if key in self.kv_pairs:
> +            del self.kv_pairs[key]
> +            result = True
> +        self.mutex.release()
> +        return result
> +
> +    def run(self):
> +        hostname = "::0.0.0.0"
> +        server = ThreadedRPCServer(hostname, self.port,
> +                                   requestHandler=RequestHandler)
> +        print("Listening on port %d" % self.port)
> +        server.register_multicall_functions()
> +        server.register_function(self.heartbeat, 'heartbeat')
> +        server.register_function(self.set, 'set')
> +        server.register_function(self.get, 'get')
> +        server.register_function(self.set_if_prev, 'set_if_prev')
> +        server.register_function(self.create, 'create')
> +        server.register_function(self.delete, 'delete')
> +        server.serve_forever()
> +
> +        sys.exit(0)
> +
> +
> +if __name__ == '__main__':
> +    Arbitrator().run()
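
A note on ssl.wrap_socket(): it has been deprecated since Python 3.7 and is removed in Python 3.12, so the server may not start on newer interpreters. A minimal sketch of the SSLContext-based equivalent follows. The function names are hypothetical, and unlike the patch (which pins TLS 1.2 exactly) this version accepts TLS 1.2 or newer.

```python
import ssl


def new_server_context():
    """Server-side TLS context that refuses anything older than TLS 1.2.

    Client certificates are not requested (the PROTOCOL_TLS_SERVER
    default), matching cert_reqs=ssl.CERT_NONE in the patch.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx


def make_server_context(certfile, keyfile):
    """Same as above, with the server certificate and key loaded."""
    ctx = new_server_context()
    ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx
```

In ThreadedRPCServer.__init__ the socket could then be wrapped with `make_server_context(CERTFILE, KEYFILE).wrap_socket(sock, server_side=True)`.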

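Also in RequestHandler.auth(): headers.get('Authorization') returns None when the client sends no Authorization header, so .partition() would raise AttributeError, and the assert is skipped when Python runs with -O. A minimal sketch of a more defensive check is below. The function name `check_basic_auth` is hypothetical, and the constant-time comparison is a suggestion, not something the patch does.

```python
import hmac
from base64 import b64decode


def check_basic_auth(header, username, password):
    """Return True only for a well-formed, matching Basic auth header."""
    if not header:
        return False
    scheme, _, encoded = header.partition(" ")
    if scheme != "Basic":
        return False
    try:
        decoded = b64decode(encoded).decode()
    except ValueError:
        # covers malformed base64 (binascii.Error) and bad UTF-8
        return False
    user, _, passwd = decoded.partition(":")
    # compare_digest avoids leaking how many characters matched
    user_ok = hmac.compare_digest(user.encode(), username.encode())
    pass_ok = hmac.compare_digest(passwd.encode(), password.encode())
    return user_ok and pass_ok
```

auth() could then reduce to `return check_basic_auth(headers.get('Authorization'), USERNAME, PASSWORD)`, and parse_request() would send the 401 for every failure case.
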
