Hi Gary,

ACK from me.

Best Regards,
ThuanTr

-----Original Message-----
From: Gary Lee <gary....@dektech.com.au> 
Sent: Tuesday, October 1, 2019 9:54 AM
To: hans.nordeb...@ericsson.com; minh.c...@dektech.com.au;
anders.wid...@ericsson.com
Cc: opensaf-devel@lists.sourceforge.net
Subject: [devel] [PATCH 1/1] osaf: add tcp arbitrator [#3064]

---
 src/osaf/consensus/plugins/tcp/README          |  41 ++
 src/osaf/consensus/plugins/tcp/certificate.pem |  20 +
 src/osaf/consensus/plugins/tcp/key.pem         |  28 ++
 src/osaf/consensus/plugins/tcp/tcp.plugin      | 520 +++++++++++++++++++++++++
 src/osaf/consensus/plugins/tcp/tcp_server.py   | 157 ++++++++
 5 files changed, 766 insertions(+)
 create mode 100644 src/osaf/consensus/plugins/tcp/README
 create mode 100644 src/osaf/consensus/plugins/tcp/certificate.pem
 create mode 100644 src/osaf/consensus/plugins/tcp/key.pem
 create mode 100755 src/osaf/consensus/plugins/tcp/tcp.plugin
 create mode 100755 src/osaf/consensus/plugins/tcp/tcp_server.py

diff --git a/src/osaf/consensus/plugins/tcp/README b/src/osaf/consensus/plugins/tcp/README
new file mode 100644
index 0000000..6f739e8
--- /dev/null
+++ b/src/osaf/consensus/plugins/tcp/README
@@ -0,0 +1,41 @@
+TCP arbitrator
+
+The TCP arbitrator may be useful for deployments where running etcd
+is not feasible. An example arbitrator is provided to help prevent
+split-brain in clusters that contain up to 2 system controllers.
+
+The example arbitrator is a simple Python-based program that can be
+deployed on a single payload or on a node external to the cluster.
+
+Two main pieces of information are stored on the arbitrator: the 
+hostname of the current active controller and a heartbeat timestamp.
+
+An active controller sends a heartbeat to the arbitrator every 100ms
+using TLS over a persistent TCP connection. It should self-fence if it
+is unable to heartbeat, as it is likely to be separated from the arbitrator.
+
+A candidate active controller must check that the current active controller
+is not heartbeating before promoting itself to active. On a cluster using
+TIPC, the timeout is the TIPC link tolerance. On a TCP based cluster, it is
+half of FMS_TAKEOVER_REQUEST_VALID_TIME (5 seconds if that is not set).
+
+Suggested fmd.conf configuration:
+
+export FMS_SPLIT_BRAIN_PREVENTION=1
+export FMS_KEYVALUE_STORE_PLUGIN_CMD=/full/path/to/tcp.plugin
+export FMS_TAKEOVER_PRIORITISE_PARTITION_SIZE=0  # any other setting is ignored
+export FMS_RELAXED_NODE_PROMOTION=1
+
+The above settings will allow a controller to be elected active during 
+cluster startup, even if the arbitrator is not yet running.
+If the arbitrator becomes temporarily unavailable, the controllers will 
+remain running if they can see each other. If an active controller 
+becomes isolated from the standby *and* the arbitrator, it will 
+self-fence and the standby will become active (if located in the same 
+network partition as the arbitrator).
+
+The provided self-signed certificate and key are examples only, created with:
+
+openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days 100000 -out certificate.pem
+
+They must be replaced in an actual deployment!
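The candidate's check described in the README can be sketched as two pure functions. This is only an illustration: `takeover_timeout` and `active_is_alive` are hypothetical names, not part of the patch. They mirror the timeout selection in `get_tipc_tolerance()` and one iteration of the polling loop in `lock()` in tcp.plugin, with no network access.

```python
from typing import Optional


def takeover_timeout(tipc_tolerance_ms: Optional[float],
                     takeover_valid_time: Optional[int]) -> float:
    """Seconds a candidate waits for the active controller to heartbeat.

    Mirrors get_tipc_tolerance(): the TIPC link tolerance when a TIPC bearer
    exists, otherwise half of FMS_TAKEOVER_REQUEST_VALID_TIME, otherwise 5.
    """
    if tipc_tolerance_ms:
        return tipc_tolerance_ms / 1000
    if takeover_valid_time is not None and takeover_valid_time > 0:
        return takeover_valid_time / 2
    return 5.0


def active_is_alive(original_ts: int, latest_ts: int, time_at_arb: int,
                    timeout: float) -> Optional[bool]:
    """One polling step of the candidate's check.

    original_ts: the active's heartbeat timestamp when polling started
    latest_ts:   the active's most recent heartbeat timestamp
    time_at_arb: the arbitrator's current time

    Returns True if the active has heartbeated since polling started,
    False if more than `timeout` seconds passed without a heartbeat,
    or None if the candidate should poll again.
    """
    if latest_ts > original_ts:
        return True
    if time_at_arb > original_ts + timeout:
        return False
    return None
```

A candidate would call `active_is_alive()` once per heartbeat interval (100ms in the plugin) until it returns `True` (back off) or `False` (take over).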
diff --git a/src/osaf/consensus/plugins/tcp/certificate.pem b/src/osaf/consensus/plugins/tcp/certificate.pem
new file mode 100644
index 0000000..e0b4993
--- /dev/null
+++ b/src/osaf/consensus/plugins/tcp/certificate.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDUTCCAjmgAwIBAgIJANrPYThNMllvMA0GCSqGSIb3DQEBCwUAMD4xCzAJBgNV
+BAYTAkFVMQ4wDAYDVQQIDAVTdGF0ZTENMAsGA1UEBwwEQ2l0eTEQMA4GA1UECgwH
+T3BlblNBRjAgFw0xOTA5MzAwMDMxNTRaGA8yMjkzMDcxNTAwMzE1NFowPjELMAkG
+A1UEBhMCQVUxDjAMBgNVBAgMBVN0YXRlMQ0wCwYDVQQHDARDaXR5MRAwDgYDVQQK
+DAdPcGVuU0FGMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5pCFKYnS
++pi0gzrRWPRYg1sak9VpNK+MkKbj+m0bptRt/8JvosV62js4q5Da3ldq2AAcEJyf
+gd02YZ4HUDdCMgMtlWT1CAx89rNpozRwyj5g+4cfmOqiz7ApeZ9yqltInjG720DT
+lam2/R4/00zmFGAqD2ZGPiOY93bjYx+GhtiHcDvpJuZS2Z2vQ/Dd09v6Omhus0rZ
+WMrENyfavc7HwFv2z/qi4Hsb/Aa9ZuAXUKp1Q2cvC0XWdRJMdZaZfGUlTfY6X8ar
+hSnswHJJKIjBq/0jYpztntOubceOuGVyezxPVXPw5qiBLO7ZyYNgN9IMoF6Rbu9y
+K1O1MvPw3ShlDQIDAQABo1AwTjAdBgNVHQ4EFgQU7UCcR6MgV5c5JXjCHpwcUC+9
+HIAwHwYDVR0jBBgwFoAU7UCcR6MgV5c5JXjCHpwcUC+9HIAwDAYDVR0TBAUwAwEB
+/zANBgkqhkiG9w0BAQsFAAOCAQEAAOP3iMgjCx8JNKevOSq24mGcWAqlX0iHP0/1
+hl7Dd/xRQywM90NfrMmiNTgO9Yyw1rOEKoeM4BFM/qs854iEHpAa7vlcW1ZidvHz
+eMQZA2Y6+AZ9zyt41bRJGqkqW7YdKVl9yuqWHcFBqBKf1pUsvt0bkab5EZFOBPuB
+tmKsODrU7cN1qeA1wjINZiOa88Kkh2YxkRoi7tL8NIMp2E40NLS3M5+xLEE8LKTH
+ouhReM4eEfGfzE171NPe/kzRRp+ujNZwmyQ8xmWp6jPjfD7Mfqdf1WYjspiGzziQ
+R/cdEHHAWq+wZrfG1aB5/yU4iA0h8xR8PNfVHjAjuUn4N6tSFg==
+-----END CERTIFICATE-----
diff --git a/src/osaf/consensus/plugins/tcp/key.pem b/src/osaf/consensus/plugins/tcp/key.pem
new file mode 100644
index 0000000..66b8bcb
--- /dev/null
+++ b/src/osaf/consensus/plugins/tcp/key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDmkIUpidL6mLSD
+OtFY9FiDWxqT1Wk0r4yQpuP6bRum1G3/wm+ixXraOzirkNreV2rYABwQnJ+B3TZh
+ngdQN0IyAy2VZPUIDHz2s2mjNHDKPmD7hx+Y6qLPsCl5n3KqW0ieMbvbQNOVqbb9
+Hj/TTOYUYCoPZkY+I5j3duNjH4aG2IdwO+km5lLZna9D8N3T2/o6aG6zStlYysQ3
+J9q9zsfAW/bP+qLgexv8Br1m4BdQqnVDZy8LRdZ1Ekx1lpl8ZSVN9jpfxquFKezA
+ckkoiMGr/SNinO2e065tx464ZXJ7PE9Vc/DmqIEs7tnJg2A30gygXpFu73IrU7Uy
+8/DdKGUNAgMBAAECggEBAIPsr3T8Go8u7yKjdgPbFAZdC5EJLIBr7hcalxnEcmz7
+4dDU9UGCk2/pMNziLonIJSdgsK5En/QTmjkyzeZ1J9gr/1obASVQ1/Pk5o3uxJbE
+KIPjZg3+O5hwqNqhhF+7iYqS1HV421goWr7sHwMNXhzMhWR4VbrabX0zNpqSAEyI
+5LKVbekGbmHT8zPePH8a4wUAoSi0ExA1A+q58pljlE2TQlz3PDInO3UtPNGHaFYR
+oJD05bAGzXdnEmezBauKZN6v+sVzehOK2f5/HEA4ajTnDoOlOe1CjkOjhvsYKxxz
+/tauXrfUscftn0yX4bUxrT+z5b6ObTavaGoh/f5KjGECgYEA9c3XlOeoxT6GhWxX
++PyAwsWu29aBZKNcUTgWvIULr9hZkhWSRocWJZiruUxzw0qlsq16j88Z1m0Tsiin
+3hMFuQNSeGZDbTpJHBctAwykAkp5vQthHkA6umkjcptgsvSHvROWMBM1v0WTpmc4
+5y2I9/91Sp1XEpqIoUo9K1tnAgkCgYEA8CDaAcaNlp2Q/QSxQaZgvRseOkZljapu
+z2gGFWms3EXQMHJVIN78brHjCMD/KTdGSWQ1QVpNX4tVJeQB/rM0Zby7IvQ03hTi
+l93Est8/AJ940ndnTMu/qGjR1PIvldEVSQjjTrv7SI1gKYuEWwkhDT8DYuM8D+39
+OiR8sBUPu+UCgYEA8FQDHT7nP3sjsZ494V6YUldP4Pe46Xnj0DFj4Yy/4X1KDk/z
+BewcVkQQWosKgH4ixjFhrOvTmlhpsn6XqsS0irFZ2Ag8krYzNzjdtKaOUQMaRiCz
+Iw2vngUgOHT8tdvqP47AAF835LyVYVR5SWa3DasCtiJiOPlI71ITvqmObGkCgYAF
+hYvdzJYG55qk5s24p73Du3LnfiTprAieRlMVfPeXnRdbddWVSutdFEQXOHDlnrrE
+B0TeOthaoRaVJ3gQRkinKj7XX+wzSyGmwle6kT2eowwhMtOyRWEj2z0v+12ywTsP
+EeBAODxf/7g9XFLv0Pbsmg1W4cjIyP0wsBQZ7HIPLQKBgQDP35F9aSHz53D2+/fD
+WoTdLfkH7rIAKUtlffp8fQzbm9onRx7LC/iJ1GjQQyKsZaJSX6+C4kNlWUgxQHo0
+6uGPtbS6kKUOzYOAMKO3GPeqskwwLS4ocZeq4BnsrwOsXiDAkACcEZS6QZI6tgsv
+0ZCcum4WK037YSkrI1IFBUlKXA==
+-----END PRIVATE KEY-----
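The plugin's client skips certificate verification (`ssl._create_unverified_context()`). Once the example certificate is replaced, a client could verify it instead. Below is a minimal sketch using only the standard `ssl` and `xmlrpc.client` modules. `make_client_context` and `make_proxy` are hypothetical helpers, and `cafile` would point at the deployed certificate (or its CA). `check_hostname=False` is only appropriate if the certificate's subject does not name the arbitrator host; the certificate itself is still verified against `cafile`.

```python
import ssl
import xmlrpc.client
from typing import Optional


def make_client_context(cafile: Optional[str] = None,
                        check_hostname: bool = True) -> ssl.SSLContext:
    """Client TLS context that verifies the arbitrator's certificate.

    cafile: PEM file with the arbitrator certificate (or its CA);
    None falls back to the system trust store.
    """
    context = ssl.create_default_context(cafile=cafile)
    # Refuse anything older than TLS 1.2 (tcp_server.py uses TLS 1.2).
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # verify_mode stays CERT_REQUIRED even when hostname checks are off;
    # only the hostname-to-certificate match is skipped.
    context.check_hostname = check_hostname
    return context


def make_proxy(uri: str, cafile: Optional[str] = None,
               check_hostname: bool = True) -> xmlrpc.client.ServerProxy:
    """Hypothetical replacement for the ServerProxy built in __init__()."""
    context = make_client_context(cafile, check_hostname)
    # Creating the proxy does not connect; the TLS handshake happens on
    # the first RPC call.
    return xmlrpc.client.ServerProxy(uri, context=context)


# Usage: no network access is needed until a method is called.
proxy = make_proxy("https://test:test@arbitrator:6666/", check_hostname=False)
```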
diff --git a/src/osaf/consensus/plugins/tcp/tcp.plugin b/src/osaf/consensus/plugins/tcp/tcp.plugin
new file mode 100755
index 0000000..1b5ddf5
--- /dev/null
+++ b/src/osaf/consensus/plugins/tcp/tcp.plugin
@@ -0,0 +1,520 @@
+#!/usr/bin/env python3
+"""
+     -*- OpenSAF  -*-
+
+(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
+
+This program is distributed in the hope that it will be useful, but 
+WITHOUT ANY WARRANTY; without even the implied warranty of 
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. This file and 
+program are licensed under the GNU Lesser General Public License Version 2.1, February 1999.
+The complete license can be accessed from the following location:
+http://opensource.org/licenses/lgpl-license.php
+See the Copying file included with the OpenSAF distribution for full 
+licensing terms.
+
+"""
+
+import argparse
+import os
+import socket
+import ssl
+import sys
+import time
+import xmlrpc.client
+import syslog
+
+
+class ArbitratorPlugin(object):
+    """ This class represents a TCP Plugin """
+
+    def __init__(self):
+        # Replace with actual URL
+        self.uri = "https://test:test@arbitrator:6666/"
+        # Warning: certificate verification is disabled
+        self.proxy = xmlrpc.client.ServerProxy(
+            self.uri, context=ssl._create_unverified_context())
+
+        # TIPC link tolerance, or derived from FMS_TAKEOVER_REQUEST_VALID_TIME
+        link_tolerance = self.get_tipc_tolerance()
+        self.timeout = link_tolerance
+
+        # TODO make configurable? default to 1s
+        self.socket_timeout = 1
+        socket.setdefaulttimeout(self.socket_timeout)
+
+        # TODO make configurable? default to 100ms
+        self.heartbeat_interval = 0.1
+
+        # constants
+        self.keyname = 'opensaf_consensus_lock'
+        self.takeover_request = 'takeover_request'
+        self.testkey = "opensaf_write_test"
+
+        # location where opensaf node_name is defined
+        self.node_name_file = '/etc/opensaf/node_name'
+        self.hostname = self.get_node_name()
+
+    def run(self):
+        """ Main method to handle input and output """
+        result = self.parse_cmd()
+        if 'output' in result:
+            print(result['output'])
+        sys.exit(result['code'])
+
+    def parse_cmd(self):
+        """ Handle input parameters from user """
+        parser = argparse.ArgumentParser(description="Arbitration Plugin")
+        sub_parser = parser.add_subparsers(
+            dest="command",
+            help="Execute script with this command"
+        )
+
+        parser_get = sub_parser.add_parser(
+            "get",
+            help="Retrieve <value> of <key> in tcp database"
+        )
+        parser_get.add_argument("key", metavar="<key>")
+
+        parser_set = sub_parser.add_parser(
+            "set",
+            help="Set <key> to <value> in tcp database"
+        )
+        parser_set.add_argument("key", metavar="<key>")
+        parser_set.add_argument("value", metavar="<value>")
+        parser_set.add_argument("timeout", metavar="<timeout>", type=int)
+
+        parser_create = sub_parser.add_parser(
+            "create",
+            help="Create <key> and set to <value>"
+                 " for <timeout> seconds in tcp database"
+        )
+        parser_create.add_argument("key", metavar="<key>")
+        parser_create.add_argument("value", metavar="<value>")
+        parser_create.add_argument("timeout", metavar="<timeout>", type=int)
+
+        parser_set_if_prev = sub_parser.add_parser(
+            "set_if_prev",
+            help="Set <key> to <value> if the existing value matches <prev>"
+        )
+        parser_set_if_prev.add_argument("key", metavar="<key>")
+        parser_set_if_prev.add_argument("value", metavar="<value>")
+        parser_set_if_prev.add_argument("prev", metavar="<prev>")
+        parser_set_if_prev.add_argument(
+            "timeout", metavar="<timeout>", type=int)
+
+        parser_lock = sub_parser.add_parser(
+            "lock",
+            help="Set <owner> of the lock,"
+                 " automatically unlock after <timeout>"
+        )
+        parser_lock.add_argument("owner", metavar="<owner>")
+        parser_lock.add_argument("timeout", metavar="<timeout>", type=int)
+
+        parser_unlock = sub_parser.add_parser(
+            "unlock",
+            help="Unlock the lock from <owner>"
+        )
+        parser_unlock.add_argument("owner", metavar="<owner>")
+        parser_unlock.add_argument(
+            "--force",
+            help="force unlock even if the lock isn't held by <owner>",
+            action='store_true'
+        )
+
+        sub_parser.add_parser(
+            "lock_owner",
+            help="Retrieve owner of lock"
+        )
+
+        parser_watch = sub_parser.add_parser(
+            "watch",
+            help="Watch events stream on <key>"
+        )
+        parser_watch.add_argument("key", metavar="<key>")
+
+        sub_parser.add_parser(
+            "watch_lock",
+            help="Watch events stream on lock"
+        )
+
+        parser_erase = sub_parser.add_parser(
+            "erase",
+            help="Remove the <key> from tcp database"
+        )
+        parser_erase.add_argument("key", metavar="<key>")
+
+        args = vars(parser.parse_args())
+        command = args.pop('command')
+        params = []
+        if args:
+            params.append(args)
+        return getattr(self, command)(*params)
+
+    def get_node_name(self):
+        """ Read the OpenSAF node name from node_name_file """
+        with open(self.node_name_file) as node_file:
+            node_name = node_file.readline().strip()
+        return node_name
+
+    def get_tipc_tolerance(self):
+        """ Return the takeover timeout in seconds """
+        # Get the first bearer, formatted as <media>:<device>
+        bearer = os.popen('tipc bearer list').read().strip()
+        if bearer:
+            tokens = bearer.splitlines()[0].split(":")
+            tol_to = os.popen('tipc bearer get tolerance media '
+                              '%s device %s' %
+                              (tokens[0], tokens[1])).read()
+            return float(tol_to) / 1000
+
+        takeover_timeout = os.environ.get(
+            'FMS_TAKEOVER_REQUEST_VALID_TIME')
+        if takeover_timeout is not None and int(takeover_timeout) > 0:
+            return int(takeover_timeout) / 2
+
+        # default to 5 seconds
+        return 5
+
+    def is_active(self):
+        """ Should this node be active according to RDE """
+        state = os.popen('rdegetrole').read().strip()
+        return state == 'ACTIVE'
+
+    def get(self, params):
+        """ Retrieve value of key in tcp database
+
+        Args:
+            params (dict): contains key from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <value of the key>
+            }
+                * code = 0: success
+                * code = 1: invalid param
+                * code = 2: other failure
+        """
+        ret = {'code': 1, 'output': ''}
+
+        # takeover_request will not be used
+        return ret
+
+    def set(self, params):
+        """ Set key to value in tcp database
+
+        Args:
+            params (dict): contains key, value from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+
+        ret = {'code': 1}
+        key = params['key']
+        value = params['value']
+        # params['timeout'] is not used
+
+        if key == self.testkey:
+            try:
+                result = self.proxy.set(key, value)
+            except socket.error:
+                result = False
+            if result:
+                ret['code'] = 0
+
+        return ret
+
+    def create(self, params):
+        """ Create key and set to value for timeout seconds in tcp database
+
+        Args:
+            params (dict): contains key, value, timeout
+             from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>
+            }
+                * code = 0: success
+                * code = 1: already exist
+                * code = 2: invalid param
+                * code = 3: other failure
+        """
+        ret = {'code': 2}
+        key = params['key']
+
+        if key == self.takeover_request:
+            # takeover_request will not be used
+            ret['code'] = 2
+        return ret
+
+    def set_if_prev(self, params):
+        """ Set key to value if the existing value matches previous value
+
+        Args:
+            params (dict): contains key (str),
+             value (str), prev (str) from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+        ret = {'code': 1}
+        key = params['key']
+        # params['value'] is not used
+        # params['timeout'] is not used
+
+        if key == self.takeover_request:
+            # takeover_request will not be used
+            # pretend it worked
+            ret['code'] = 0
+
+        return ret
+
+    def lock(self, params):
+        """ Set owner of the lock, automatically unlock after timeout
+
+        Args:
+            params (dict): contains owner (str),
+             timeout (integer) from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <current owner of the lock>
+            }
+                * code = 0: success
+                * code = 1: lock is owned by someone else
+                * code = 2 or above: other failure
+
+        NOTE: if lock is already acquired by owner, then timeout is extended
+        TODO: timeout not yet implemented
+        """
+        ret = {'code': 0}
+        owner = params['owner']
+        owner_up = False
+
+        try:
+            result = self.proxy.create(self.keyname, owner)
+            if result is False:
+                current_owner = self.proxy.get(self.keyname)
+                if current_owner == owner:
+                    syslog.syslog("lock is already held by this node")
+                    return ret
+                original_timestamp = self.proxy.get(current_owner)
+                if original_timestamp:
+                    original_timestamp = int(original_timestamp)
+                else:
+                    original_timestamp = 0
+                # check if the owner is updating the arbitrator
+                while True:
+                    # get current time from arbitrator
+                    time_at_arb = self.proxy.heartbeat(self.hostname)
+                    # last time the current owner heartbeated ('' if never)
+                    last_timestamp = int(self.proxy.get(current_owner) or 0)
+                    if last_timestamp > original_timestamp:
+                        # owner is updating arbitrator OK
+                        syslog.syslog("current active is heartbeating")
+                        owner_up = True
+                        break
+                    elif time_at_arb > (original_timestamp + self.timeout):
+                        syslog.syslog("current active is not heartbeating")
+                        # more than 'timeout' seconds have elapsed since
+                        # the last update, we can assume the old
+                        # active is not heartbeating
+                        owner_up = False
+                        break
+                    time.sleep(self.heartbeat_interval)
+                if owner_up:
+                    ret['code'] = 1
+                else:
+                    # owner is not updating arbitrator
+                    self.proxy.delete(self.keyname)
+                    return self.lock(params)
+            else:
+                syslog.syslog("obtained lock at arbitrator")
+        except socket.error:
+            syslog.syslog("socket error")
+            ret['code'] = 2
+
+        return ret
+
+    def unlock(self, params):
+        """ Unlock the lock from owner
+
+        Args:
+            params (dict): contains owner (str),
+             force (boolean) from user input
+                * force = True: unlock the lock even if
+                    owner doesn't match the current owner
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <current owner of the lock>
+            }
+                * code = 0: success
+                * code = 1: lock is owned by someone else
+                * code = 2 or above: other failure
+        """
+        ret = {'code': 0}
+        owner = params['owner']
+
+        # remove the lock if held by owner (or unconditionally with --force)
+        try:
+            current_owner = self.proxy.get(self.keyname)
+            if current_owner == owner or params.get('force'):
+                result = self.proxy.delete(self.keyname)
+                if result is False:
+                    ret['code'] = 2
+            else:
+                ret['code'] = 1
+        except socket.error:
+            ret['code'] = 2
+
+        return ret
+
+    def lock_owner(self):
+        """ Retrieve owner of lock
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <current owner of the lock>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+        ret = {'code': 0, 'output': 'unknown'}
+
+        try:
+            current_owner = self.proxy.get(self.keyname)
+            ret['output'] = current_owner
+        except socket.error:
+            ret['code'] = 1
+
+        return ret
+
+    def watch_lock(self):
+        """ Watch events stream on lock
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <new value of the lock>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+        return self.watch({'key': self.keyname})
+
+    def erase(self, params):
+        """ Remove the key from tcp database
+
+        Args:
+            params (dict): contains key from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+        ret = {'code': 1}
+        key = params['key']
+
+        if key == self.takeover_request:
+            # takeover_request will not be used
+            # pretend it worked
+            ret['code'] = 0
+
+        return ret
+
+    def watch(self, params):
+        """ Watch events stream on key
+
+        Args:
+            params (dict): contains key from user input
+
+        Returns:
+            dict: result with format
+            {
+                'code': <return code>,
+                'output': <new value of the key>
+            }
+                * code = 0: success
+                * code = non-zero: failure
+        """
+        ret = {'code': 0, 'output': 'unknown'}
+        key = params['key']
+
+        if key != self.keyname and key != self.takeover_request:
+            # no other key is supported
+            ret['code'] = 2
+            return ret
+
+        last_arb_timestamp = 0
+        while True:
+            if key == self.takeover_request:
+                if self.is_active() is False:
+                    # maybe a controller swap occurred
+                    break
+                while True:
+                    try:
+                        time_at_arb = self.proxy.heartbeat(self.hostname)
+                        if last_arb_timestamp == 0:
+                            last_arb_timestamp = time_at_arb
+                            break
+                        elif (time_at_arb - last_arb_timestamp) > self.timeout:
+                            # VM was frozen?
+                            syslog.syslog('VM was frozen!')
+                            ret['code'] = 126
+                            return ret
+                        else:
+                            last_arb_timestamp = time_at_arb
+                            break
+                    except socket.error:
+                        # can't heartbeat, need to self-fence (if peer down)
+                        syslog.syslog('cannot heartbeat, inform rded')
+                        ret['output'] = self.hostname + \
+                            ' SC-0 10000000 UNDEFINED'
+                        return ret
+            elif key == self.keyname:
+                try:
+                    current_owner = self.proxy.get(self.keyname)
+                    if not current_owner:
+                        if self.is_active() is False:
+                            # a switchover occurred
+                            break
+                        # maybe the arbitrator restarted
+                        self.proxy.create(self.keyname, self.hostname)
+                    elif current_owner != self.hostname:
+                        ret['output'] = current_owner
+                        break
+                except socket.error:
+                    pass
+            time.sleep(self.heartbeat_interval)
+        return ret
+
+
+if __name__ == '__main__':
+    ArbitratorPlugin().run()
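The control flow of `lock()` above can be exercised without a network. Below is a sketch against a hypothetical in-memory `FakeArbitrator` that uses a simulated clock and no sleeps. `FakeArbitrator`, `LiveArbitrator` and `acquire` are illustrative names only. The sketch uses a loop where the plugin recurses. It also converts the owner's timestamp with `int(... or 0)`, because the server's `get()` returns `""` for a key that was never written.

```python
import itertools
from typing import Dict, Iterator, Union


class FakeArbitrator:
    """Hypothetical in-memory stand-in for the tcp_server.py proxy.

    The clock is an iterator of whole seconds, so the sketch runs instantly.
    """

    def __init__(self, clock: Iterator[int]) -> None:
        self.kv: Dict[str, Union[str, int]] = {}
        self.clock = clock

    def create(self, key: str, value: str) -> bool:
        if key in self.kv:
            return False
        self.kv[key] = value
        return True

    def get(self, key: str) -> Union[str, int]:
        return self.kv.get(key, "")  # "" for a missing key, like the server

    def delete(self, key: str) -> bool:
        return self.kv.pop(key, None) is not None

    def heartbeat(self, key: str) -> int:
        now = next(self.clock)
        self.kv[key] = now
        return now


def acquire(arb: FakeArbitrator, lock_key: str, me: str,
            timeout: float) -> int:
    """Return 0 if `me` ends up holding the lock, 1 if a live owner has it."""
    while True:
        if arb.create(lock_key, me):
            return 0                      # lock was free
        owner = arb.get(lock_key)
        if owner == me:
            return 0                      # already held by this node
        original = int(arb.get(owner) or 0)
        while True:                       # the plugin sleeps 100ms per poll
            now = arb.heartbeat(me)
            if int(arb.get(owner) or 0) > original:
                return 1                  # owner is still heartbeating
            if now > original + timeout:
                break                     # owner silent for > timeout
        arb.delete(lock_key)              # take over, then retry create()


# SC-1 holds the lock but last heartbeated at t=100, so SC-2 takes over.
arb = FakeArbitrator(itertools.count(100))
arb.kv.update({"lock": "SC-1", "SC-1": 100})
stolen = acquire(arb, "lock", "SC-2", timeout=2)


class LiveArbitrator(FakeArbitrator):
    """Variant where SC-1 heartbeats whenever anyone else does."""

    def heartbeat(self, key: str) -> int:
        now = super().heartbeat(key)
        self.kv["SC-1"] = now
        return now


live = LiveArbitrator(itertools.count(200))
live.kv.update({"lock": "SC-1", "SC-1": 199})
refused = acquire(live, "lock", "SC-2", timeout=2)
```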
diff --git a/src/osaf/consensus/plugins/tcp/tcp_server.py b/src/osaf/consensus/plugins/tcp/tcp_server.py
new file mode 100755
index 0000000..a7f22f2
--- /dev/null
+++ b/src/osaf/consensus/plugins/tcp/tcp_server.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+"""
+     -*- OpenSAF  -*-
+
+(C) Copyright 2019 Ericsson AB 2019 - All Rights Reserved.
+
+This program is distributed in the hope that it will be useful, but 
+WITHOUT ANY WARRANTY; without even the implied warranty of 
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. This file and 
+program are licensed under the GNU Lesser General Public License Version 2.1, February 1999.
+The complete license can be accessed from the following location:
+http://opensource.org/licenses/lgpl-license.php
+See the Copying file included with the OpenSAF distribution for full 
+licensing terms.
+
+Simple TCP arbitrator
+"""
+
+import os
+import socket
+from base64 import b64decode
+from socketserver import ThreadingMixIn
+from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
+import ssl
+import sys
+import threading
+import time
+
+# Warning: example only, replace with signed certificate
+# Assumes pem files are located in the same directory as this file
+DIRPATH = os.path.dirname(os.path.realpath(__file__))
+CERTFILE = DIRPATH + '/certificate.pem'
+KEYFILE = DIRPATH + '/key.pem'
+USERNAME = 'test'
+PASSWORD = 'test'
+
+
+class RequestHandler(SimpleXMLRPCRequestHandler):
+    """ Allow keep alives """
+    protocol_version = "HTTP/1.1"
+
+    def parse_request(self):
+        if SimpleXMLRPCRequestHandler.parse_request(self):
+            if self.auth(self.headers):
+                return True
+            else:
+                self.send_error(401, 'Unauthorized')
+        return False
+
+    def auth(self, headers):
+        """ Check HTTP basic auth; missing or malformed headers fail """
+        (basic, _, encoded) = headers.get('Authorization', '').partition(' ')
+        try:
+            (user, _, password) = b64decode(encoded).decode().partition(':')
+        except ValueError:
+            return False
+        return (basic == 'Basic' and
+                user == USERNAME and password == PASSWORD)
+
+
+class ThreadedRPCServer(ThreadingMixIn,
+                        SimpleXMLRPCServer):
+    """ Add thread support """
+    def __init__(self, bind_address, bind_port,
+                 requestHandler):
+        # IPv6 is supported
+        self.address_family = socket.getaddrinfo(bind_address, bind_port)[0][0]
+        # don't bind yet: the socket is wrapped with TLS first
+        SimpleXMLRPCServer.__init__(self, (bind_address, bind_port),
+                                    requestHandler, logRequests=False,
+                                    bind_and_activate=False)
+        # TLS 1.2 or later; client certificates are not requested
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+        context.minimum_version = ssl.TLSVersion.TLSv1_2
+        context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
+        self.socket = context.wrap_socket(self.socket, server_side=True)
+        self.server_bind()
+        self.server_activate()
+
+
+class Arbitrator(object):
+    """ Implementation of a simple arbitrator """
+
+    def __init__(self):
+        self.port = 6666
+        self.kv_pairs = {}
+        self.mutex = threading.Lock()
+
+    def heartbeat(self, key):
+        timestamp = int(time.time())
+        self.mutex.acquire()
+        self.kv_pairs[key] = timestamp
+        self.mutex.release()
+        return timestamp
+
+    def set(self, key, value):
+        self.mutex.acquire()
+        self.kv_pairs[key] = value
+        self.mutex.release()
+        return True
+
+    def get(self, key):
+        value = ""
+        self.mutex.acquire()
+        if key in self.kv_pairs:
+            value = self.kv_pairs[key]
+        self.mutex.release()
+        return value
+
+    def set_if_prev(self, key, prev_value, new_value):
+        result = False
+        self.mutex.acquire()
+        if key in self.kv_pairs:
+            value = self.kv_pairs[key]
+            if value == prev_value:
+                self.kv_pairs[key] = new_value
+                result = True
+        self.mutex.release()
+        return result
+
+    def create(self, key, value):
+        result = False
+        self.mutex.acquire()
+        if key not in self.kv_pairs:
+            self.kv_pairs[key] = value
+            result = True
+        self.mutex.release()
+        return result
+
+    def delete(self, key):
+        result = False
+        self.mutex.acquire()
+        if key in self.kv_pairs:
+            del self.kv_pairs[key]
+            result = True
+        self.mutex.release()
+        return result
+
+    def run(self):
+        hostname = "::0.0.0.0"
+        server = ThreadedRPCServer(hostname, self.port,
+                                   requestHandler=RequestHandler)
+        print("Listening on port %d" % self.port)
+        server.register_multicall_functions()
+        server.register_function(self.heartbeat, 'heartbeat')
+        server.register_function(self.set, 'set')
+        server.register_function(self.get, 'get')
+        server.register_function(self.set_if_prev, 'set_if_prev')
+        server.register_function(self.create, 'create')
+        server.register_function(self.delete, 'delete')
+        server.serve_forever()
+
+        sys.exit(0)
+
+
+if __name__ == '__main__':
+    Arbitrator().run()
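tcp_server.py calls `register_multicall_functions()`, but the polling loops in tcp.plugin appear to issue `heartbeat()` and `get()` as separate round-trips. The sketch below shows one way to batch both calls with the standard `xmlrpc.client.MultiCall`. It uses a throwaway plain-HTTP server on localhost with no TLS or authentication, unlike tcp_server.py, and hypothetical host names SC-1 and SC-2.

```python
import threading
import time
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCServer

store = {}


def heartbeat(key):
    """Record and return the arbitrator's current time for `key`."""
    store[key] = int(time.time())
    return store[key]


def get(key):
    """Return the stored value, or "" if the key is unknown."""
    return store.get(key, "")


# Port 0 lets the OS pick a free port; plain HTTP keeps the sketch short.
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_multicall_functions()
server.register_function(heartbeat, "heartbeat")
server.register_function(get, "get")
threading.Thread(target=server.serve_forever, daemon=True).start()

proxy = xmlrpc.client.ServerProxy(
    "http://127.0.0.1:%d/" % server.server_address[1])
batch = xmlrpc.client.MultiCall(proxy)
batch.heartbeat("SC-2")   # candidate's own heartbeat, returns arbitrator time
batch.get("SC-1")         # current owner's last heartbeat ("" if never)
time_at_arb, owner_ts = batch()   # one HTTP request for both calls

server.shutdown()
server.server_close()
```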
--
2.7.4



_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel



