Initial confd client library implementation.  This initial version uses
asyncore, and supports answers via a callback.

Signed-off-by: Guido Trotter <[email protected]>
---
 Makefile.am         |    1 +
 lib/confd/client.py |  206 +++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/constants.py    |   10 +++
 lib/errors.py       |    8 ++
 4 files changed, 225 insertions(+), 0 deletions(-)
 create mode 100644 lib/confd/client.py

diff --git a/Makefile.am b/Makefile.am
index 08725a4..6816012 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -130,6 +130,7 @@ http_PYTHON = \
 
 confd_PYTHON = \
        lib/confd/__init__.py \
+       lib/confd/client.py \
        lib/confd/server.py \
        lib/confd/querylib.py
 
diff --git a/lib/confd/client.py b/lib/confd/client.py
new file mode 100644
index 0000000..601103e
--- /dev/null
+++ b/lib/confd/client.py
@@ -0,0 +1,206 @@
+#
+#
+
+# Copyright (C) 2009 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Ganeti confd client
+
+"""
+import socket
+import time
+import random
+
+from ganeti import utils
+from ganeti import constants
+from ganeti import objects
+from ganeti import serializer
+from ganeti import daemon # contains AsyncUDPSocket
+from ganeti import errors
+from ganeti.confd import PackMagic, UnpackMagic
+
+
+class ConfdAsyncUDPClient(daemon.AsyncUDPSocket):
+  """Confd UDP asyncore client
+
+  This is kept separate from the main ConfdClient to make sure it's easy to
+  implement a non-asyncore based client library.
+
+  """
+  def __init__(self, client):
+    """Constructor for ConfdAsyncUDPClient
+
+    @type client: L{ConfdClient}
+    @param client: client library, to pass the datagrams to
+
+    """
+    daemon.AsyncUDPSocket.__init__(self)
+    self.client = client
+
+  # Overrides a daemon.AsyncUDPSocket method; forwards datagrams to the client
+  def handle_datagram(self, payload, ip, port):
+    self.client.HandleResponse(payload, ip, port)
+
+
+class ConfdClient:
+  """Send queries to confd, and get back answers.
+
+  Since the confd model works by querying multiple master candidates, and
+  getting back answers, this is an asynchronous library. It can either work
+  through asyncore or with your own handling.
+
+  """
+
+  def __init__(self, hmac_key, mc_list):
+    """Constructor for ConfdClient
+
+    @type hmac_key: string
+    @param hmac_key: hmac key to talk to confd
+    @type mc_list: list
+    @param mc_list: list of peer nodes (master candidates) to send queries to
+
+    """
+    if not isinstance(mc_list, list):
+      raise errors.ConfdClientError("mc_list must be a list")
+
+    self._mc_list = mc_list
+    self._hmac_key = hmac_key
+    self._socket = ConfdAsyncUDPClient(self)
+    self._callbacks = {}  # rsalt -> (callback, type, query, args)
+    self._expire_callbacks = []  # (expire_time, rsalt), in send order
+    self._confd_port = utils.GetDaemonPort(constants.CONFD)
+
+  def _PackRequest(self, request, now=None):
+    """Prepare a request to be sent on the wire.
+
+    This function serializes and signs a confd request with the hmac key,
+    using the current timestamp as salt, and adds the correct magic number.
+
+    """
+    if now is None:
+      now = time.time()
+    tstamp = '%s' % int(now)
+    req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp)
+    return PackMagic(req)
+
+  def _UnpackReply(self, payload):
+    in_payload = UnpackMagic(payload)  # presumably undoes PackMagic
+    (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
+    return answer, salt  # HandleResponse matches salt to a request rsalt
+
+  def _ExpireCallbacks(self):
+    """Delete all the expired callbacks.
+
+    """
+    now = time.time()
+    while self._expire_callbacks:
+      expire_time, rsalt = self._expire_callbacks[0]  # oldest pending entry
+      if now >= expire_time:
+        self._expire_callbacks.pop(0)  # remove the head we just examined
+        del self._callbacks[rsalt]
+      else:
+        break  # list is in send order: later entries expire later
+
+  def SendRequest(self, request, callback, args, coverage=None):
+    """Send a confd request to some MCs
+
+    @type request: L{objects.ConfdRequest}
+    @param request: the request to send
+    @type callback: f(answer, req_type, req_query, salt, ip, port, args)
+    @param callback: answer callback
+    @type args: tuple
+    @param args: additional callback arguments
+    @type coverage: integer
+    @keyword coverage: number of remote nodes to contact
+
+    """
+    if coverage is None:
+      coverage = min(len(self._mc_list), constants.CONFD_DEFAULT_REQ_COVERAGE)
+
+    if not callable(callback):
+      raise errors.ConfdClientError("callback must be callable")
+
+    if coverage > len(self._mc_list):
+      raise errors.ConfdClientError("Not enough MCs known to provide the"
+                                    " desired coverage")
+
+    if not request.rsalt:
+      raise errors.ConfdClientError("Missing request rsalt")
+
+    self._ExpireCallbacks()  # so an expired rsalt does not count as duplicate
+    if request.rsalt in self._callbacks:
+      raise errors.ConfdClientError("Duplicate request rsalt")
+
+    if request.type not in constants.CONFD_REQS:
+      raise errors.ConfdClientError("Invalid request type")
+
+    # sample() rather than shuffle(): the caller's mc_list is left untouched
+    targets = random.sample(self._mc_list, coverage)
+
+    now = time.time()
+    payload = self._PackRequest(request, now=now)
+
+    for target in targets:
+      try:
+        self._socket.enqueue_send(target, self._confd_port, payload)
+      except errors.UdpDataSizeError:
+        raise errors.ConfdClientError("Request too big")
+
+    self._callbacks[request.rsalt] = (callback, request.type,
+                                      request.query, args)
+    expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
+    self._expire_callbacks.append((expire_time, request.rsalt))
+
+  def HandleResponse(self, payload, ip, port):
+    """Asynchronous handler for a confd reply
+
+    Call the relevant callback associated to the current request.
+
+    """
+    try:
+      try:
+        answer, salt = self._UnpackReply(payload)
+      except (errors.SignatureError, errors.ConfdMagicError):
+        # Deliberately ignore datagrams whose signature or magic number
+        # does not validate; no callback is invoked for them.
+        return
+
+      if salt in self._callbacks:
+        callback, req_type, req_query, args = self._callbacks[salt]
+        callback(answer, req_type, req_query, salt, ip, port, args)
+
+    finally:
+      self._ExpireCallbacks()
+
+
+class ConfdClientRequest(objects.ConfdRequest):
+  """This is the client-side version of ConfdRequest.
+
+  This version of the class helps to create requests, on the client side, by
+  filling in some default values.
+
+  """
+  def __init__(self, **kwargs):
+    objects.ConfdRequest.__init__(self, **kwargs)
+    if not self.rsalt:
+      self.rsalt = utils.NewUUID()  # ConfdClient keys its callbacks on rsalt
+    if not self.protocol:
+      self.protocol = constants.CONFD_PROTOCOL_VERSION
+    if self.type not in constants.CONFD_REQS:  # type has no default
+      raise errors.ConfdClientError("Invalid request type")
+
diff --git a/lib/constants.py b/lib/constants.py
index f233f47..ee03bb2 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -688,6 +688,16 @@ CONFD_CONFIG_RELOAD_RATELIMIT = 2
 # compressed, or move away from json.
 CONFD_MAGIC_FOURCC = 'plj0'
 
+# By default a confd request is sent to the minimum between this number and all
+# MCs. 6 was chosen because even in the case of a disastrous 50% response rate,
+# we should have enough answers to be able to compare more than one.
+CONFD_DEFAULT_REQ_COVERAGE = 6
+
+# Timeout in seconds after which pending query requests are expired in the
+# confd client library. We don't actually expect any answer more than 10
+# seconds after we sent a request.
+CONFD_CLIENT_EXPIRE_TIMEOUT = 10
+
 # Maximum UDP datagram size.
 # On IPv4: 64K - 20 (ip header size) - 8 (udp header size) = 65507
 # On IPv6: 64K - 40 (ip6 header size) - 8 (udp header size) = 65487
diff --git a/lib/errors.py b/lib/errors.py
index 9136c5e..9fd9868 100644
--- a/lib/errors.py
+++ b/lib/errors.py
@@ -305,6 +305,14 @@ class ConfdMagicError(GenericError):
   """
 
 
+class ConfdClientError(GenericError):
+  """A confd client error.
+
+  Raised by the confd client library for invalid arguments or requests.
+
+  """
+
+
 class UdpDataSizeError(GenericError):
   """UDP payload too big.
 
-- 
1.5.6.5

Reply via email to