On Wed, Feb 17, 2010 at 4:59 PM, Balazs Lecz <[email protected]> wrote:
>
> Signed-off-by: Balazs Lecz <[email protected]>
> ---
>  Makefile.am        |    3 +
>  daemons/ganeti-nld |   26 ++-
>  lib/constants.py   |   53 ++++++-
>  lib/errors.py      |   49 +++++
>  lib/nld_nld.py     |  499 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  lib/objects.py     |   62 +++++++
>  6 files changed, 684 insertions(+), 8 deletions(-)
>  create mode 100644 lib/errors.py
>  create mode 100644 lib/nld_nld.py
>  create mode 100644 lib/objects.py
>
> diff --git a/Makefile.am b/Makefile.am
> index e08d3fc..da4e57e 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -28,10 +28,13 @@ pkgpython_PYTHON = \
>        lib/__init__.py \
>        lib/constants.py \
>        lib/config.py \
> +       lib/errors.py \
>        lib/iptables.py \
>        lib/networktables.py \
>        lib/nflog_dispatcher.py \
>        lib/nld_confd.py \
> +       lib/nld_nld.py

Are you missing a \ here?

> +       lib/objects.py \
>        lib/server.py
>
>  nodist_pkgpython_PYTHON = \
> diff --git a/daemons/ganeti-nld b/daemons/ganeti-nld
> index 2d332a8..07d49d2 100755
> --- a/daemons/ganeti-nld
> +++ b/daemons/ganeti-nld
> @@ -44,6 +44,7 @@ from ganeti_nbma import constants
>  from ganeti_nbma import config
>  from ganeti_nbma import server
>  from ganeti_nbma import nflog_dispatcher
> +from ganeti_nbma import nld_nld
>  from ganeti_nbma import nld_confd
>
>  from ganeti import constants as gnt_constants
> @@ -54,16 +55,13 @@ from ganeti import errors
>  # pylint: disable-msg=W0611
>  import ganeti.confd.client
>
> -# Injecting ourselves in the ganeti constants
> -gnt_constants.NLD = constants.NLD
> -gnt_constants.DAEMONS_LOGFILES[constants.NLD] = gnt_constants.LOG_DIR + "nl-daemon.log"
> -

Actually removing these lines might be better in a separate patch?
And perhaps with a verification that we're running ganeti 2.1.1 (or at
least documentation saying we need that version)

>
>  class MisroutedPacketHandler(object):
>   """Callback called when a packet is received via the NFLOG target.
>
>   """
> -  def __init__(self, instance_node_maps):
> +  def __init__(self, nld_server, instance_node_maps):
> +    self.nld_server = nld_server
>     self.instance_node_maps = instance_node_maps
>
>   def __call__(self, i, nflog_payload):
> @@ -90,11 +88,11 @@ class MisroutedPacketHandler(object):
>           break
>
>     if source_node:
> -      # TODO: send notification to this node
>       logging.debug("misrouted packet detected."
>                     " [cluster: %s] [node: %s] [link: %s] [source: %s]",
>                     source_cluster, source_node, source_link,
>                     ip_packet.src)
> +      # TODO: send NLD route invalidation request to the source node
>     else:
>       logging.debug("misrouted packet detected. [source: %s]",
>                     ip_packet.src)
> @@ -162,8 +160,10 @@ class NetworkLookupDaemon(object):
>
>     # Instantiate one periodic updater per cluster
>     self.updaters = []
> +    self.cluster_keys = {}
>     for cluster_name, cluster_options in self.config.clusters.iteritems():
>       hmac_key = utils.ReadFile(cluster_options["hmac_key_file"])
> +      self.cluster_keys[cluster_name] = hmac_key
>       mc_list = utils.ReadFile(cluster_options["mc_list_file"]).splitlines()
>       instance_node_maps[cluster_name] = {}
>       self.updaters.append(
> @@ -173,7 +173,19 @@ class NetworkLookupDaemon(object):
>             instance_node_maps[cluster_name])
>           )
>
> -    misrouted_packet_callback = MisroutedPacketHandler(instance_node_maps)
> +    # Instantiate NLD network request and response processers
> +    # and the async UDP server
> +    nld_request_processor = nld_nld.NLDRequestProcessor(self.cluster_keys)
> +    nld_response_callback = nld_nld.NLDResponseCallback()
> +    nld_server = nld_nld.NLDAsyncUDPServer(options.bind_address,
> +                                           options.port,
> +                                           nld_request_processor,
> +                                           nld_response_callback,
> +                                           self.cluster_keys)
> +
> +    # Instantiate the misrouted packet handler and its async dispatcher
> +    misrouted_packet_callback = MisroutedPacketHandler(nld_server,
> +                                                       instance_node_maps)
>     nflog_dispatcher.AsyncNFLog(misrouted_packet_callback,
>                                 log_group=self.config.nflog_queue)
>
> diff --git a/lib/constants.py b/lib/constants.py
> index c1277f6..c804608 100644
> --- a/lib/constants.py
> +++ b/lib/constants.py
> @@ -1,7 +1,7 @@
>  #
>  #
>
> -# Copyright (C) 2009 Google Inc.
> +# Copyright (C) 2009, 2010 Google Inc.
>  #
>  # This program is free software; you can redistribute it and/or modify
>  # it under the terms of the GNU General Public License as published by
> @@ -22,6 +22,7 @@
>
>
>  from ganeti_nbma import _autoconf
> +from ganeti import constants as gnt_constants
>
>  NLD = "ganeti-nld"
>
> @@ -32,3 +33,53 @@ DEFAULT_CONF_FILE = CONF_DIR + "/common.conf"
>  DEFAULT_ROUTING_TABLE = "100"
>  DEFAULT_NEIGHBOUR_INTERFACE = "gtun0"
>  DEFAULT_NFLOG_QUEUE = 0
> +
> +# NLD communication protocol related constants below
> +
> +# A few common errors for NLD
> +NLD_ERROR_UNKNOWN_ENTRY = 1
> +NLD_ERROR_INTERNAL = 2
> +NLD_ERROR_ARGUMENT = 3
> +
> +# Each nld request is "salted" by the current timestamp.
> +# This constants decides how many seconds of skew to accept.
> +# TODO: make this a default and allow the value to be more configurable
> +NLD_MAX_CLOCK_SKEW = 2 * gnt_constants.NODE_MAX_CLOCK_SKEW
> +
> +NLD_PROTOCOL_VERSION = 1
> +
> +NLD_REQ_PING = 0
> +NLD_REQ_ROUTE_INVALIDATE = 1
> +
> +# NLD request query fields. These are used to pass parameters.
> +# These must be strings rather than integers, because json-encoding
> +# converts them to strings anyway, as they're used as dict-keys.
> +NLD_REQQ_LINK = "0" # FIXME: rename or remove
> +
> +NLD_REQFIELD_NAME = "0" # FIXME: rename or remove
> +
> +NLD_REQS = frozenset([
> +  NLD_REQ_PING,
> +  NLD_REQ_ROUTE_INVALIDATE,
> +  ])
> +
> +NLD_REPL_STATUS_OK = 0
> +NLD_REPL_STATUS_ERROR = 1
> +NLD_REPL_STATUS_NOTIMPLEMENTED = 2
> +
> +NLD_REPL_STATUSES = frozenset([
> +  NLD_REPL_STATUS_OK,
> +  NLD_REPL_STATUS_ERROR,
> +  NLD_REPL_STATUS_NOTIMPLEMENTED,
> +  ])
> +
> +# Magic number prepended to all nld queries.
> +# This allows us to distinguish different types of nld protocols and handle
> +# them. For example by changing this we can move the whole payload to be
> +# compressed, or move away from json.
> +NLD_MAGIC_FOURCC = 'plj0'
> +
> +# Timeout in seconds to expire pending query request in the nld client
> +# library. We don't actually expect any answer more than 10 seconds after we
> +# sent a request.
> +NLD_CLIENT_EXPIRE_TIMEOUT = 10
> diff --git a/lib/errors.py b/lib/errors.py
> new file mode 100644
> index 0000000..06d7e93
> --- /dev/null
> +++ b/lib/errors.py
> @@ -0,0 +1,49 @@
> +#
> +#
> +
> +# Copyright (C) 20010 Google Inc.
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful, but
> +# WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +# General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> +# 02110-1301, USA.
> +
> +
> +"""Ganeti NLD exception handling"""
> +
> +from ganeti import errors as ganeti_errors
> +
> +
> +class NLDRequestError(ganeti_errors.GenericError):
> +  """A request error in Ganeti NLD.
> +
> +  Events that should make nld abort the current request and proceed serving
> +  different ones.
> +
> +  """
> +
> +
> +class NLDClientError(ganeti_errors.GenericError):
> +  """A magic fourcc error in Ganeti NLD.
> +
> +  Errors in the NLD client library.
> +
> +  """
> +
> +
> +class NLDMagicError(ganeti_errors.GenericError):
> +  """A magic fourcc error in Ganeti NLD.
> +
> +  Errors processing the fourcc in Ganeti NLD datagrams.
> +
> +  """
> diff --git a/lib/nld_nld.py b/lib/nld_nld.py
> new file mode 100644
> index 0000000..6688ed2
> --- /dev/null
> +++ b/lib/nld_nld.py
> @@ -0,0 +1,499 @@
> +#
> +#
> +
> +# Copyright (C) 2010 Google Inc.
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful, but
> +# WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +# General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> +# 02110-1301, USA.
> +
> +
> +"""Ganeti NLD->NLD communication related functions
> +
> +"""
> +
> +# pylint: disable-msg=E0203
> +
> +# E0203: Access to member %r before its definition, since we use
> +# objects.py which doesn't explicitely initialise its members
> +
> +
> +import logging
> +import time
> +
> +from ganeti_nbma import constants
> +from ganeti_nbma import errors
> +from ganeti_nbma import objects
> +
> +from ganeti import errors as gnt_errors
> +from ganeti import objects as gnt_objects
> +from ganeti import serializer
> +from ganeti import daemon
> +from ganeti import utils
> +
> +
> +_FOURCC_LEN = 4
> +
> +
> +def PackMagic(payload):
> +  """Prepend the NLD magic fourcc to a payload.
> +
> +  """
> +  return ''.join([constants.NLD_MAGIC_FOURCC, payload])
> +
> +
> +def UnpackMagic(payload):
> +  """Unpack and check the NLD magic fourcc from a payload.
> +
> +  """
> +  if len(payload) < _FOURCC_LEN:
> +    raise errors.NLDMagicError("UDP payload too short to contain the"
> +                                 " fourcc code")
> +
> +  magic_number = payload[:_FOURCC_LEN]
> +  if magic_number != constants.NLD_MAGIC_FOURCC:
> +    raise errors.NLDMagicError("UDP payload contains an unkown fourcc")
> +
> +  return payload[_FOURCC_LEN:]
> +
> +
> +class NLDRequestProcessor(object):
> +  """A processor for NLD requests.
> +
> +  """
> +  def __init__(self, cluster_keys, updaters):
> +    """Constructor for NLDRequestProcessor
> +
> +    """
> +    self.cluster_keys = cluster_keys
> +    self.updaters = updaters
> +
> +    self.dispatch_table = {
> +      constants.NLD_REQ_PING: self._Ping,
> +      constants.NLD_REQ_ROUTE_INVALIDATE: self._RouteInvalidate,
> +      }
> +
> +    assert \
> +      not constants.NLD_REQS.symmetric_difference(self.dispatch_table), \
> +      "dispatch_table is unaligned with NLD_REQS"
> +
> +  def _Ping(self, query):
> +    if query is None:
> +      status = constants.NLD_REPL_STATUS_OK
> +      answer = 'ok'
> +    else:
> +      status = constants.NLD_REPL_STATUS_ERROR
> +      answer = 'non-empty ping query'
> +
> +    return status, answer
> +
> +  def _RouteInvalidate(self, query):
> +    if not query:
> +      logging.debug("missing body from route invalidation query")
> +      return constants.NLD_REPL_STATUS_ERROR, constants.NLD_ERROR_ARGUMENT
> +
> +    logging.debug("executing route invalidation query: [%s]", query)
> +    # TODO: can we make it cluster-aware to avoid a mass-update like this?
> +    for _, updater in self.updaters.iteritems():
> +      updater.UpdateInstances()
> +    answer = 'done'
> +    return constants.NLD_REPL_STATUS_OK, answer
> +
> +  def ExecQuery(self, payload, ip, port):
> +    """Process a single NLD request.
> +
> +    @type payload: string
> +    @param payload: request raw data
> +    @type ip: string
> +    @param ip: source ip address
> +    @param port: integer
> +    @type port: source port
> +
> +    """
> +    try:
> +      cluster_name, request = self.ExtractRequest(payload)
> +      reply, rsalt = self.ProcessRequest(request)
> +      payload_out = self.PackReply(reply, rsalt, cluster_name)
> +      return payload_out
> +    except errors.NLDRequestError, err:
> +      logging.info('Ignoring broken query from %s:%d: %s', ip, port, err)
> +      return None
> +
> +  def ExtractRequest(self, payload):
> +    """Extracts an NLDRequest object from a serialized hmac signed string.
> +
> +    This function also performs signature/timestamp validation.
> +
> +    """
> +    current_time = time.time()
> +    logging.debug("Extracting request with size: %d", len(payload))
> +    try:
> +      (message, salt) = serializer.LoadSigned(payload,
> +                                              key=self.cluster_keys.get)
> +    except gnt_errors.SignatureError, err:
> +      msg = "invalid signature: %s" % err
> +      raise errors.NLDRequestError(msg)
> +    try:
> +      message_timestamp = int(salt)
> +    except (ValueError, TypeError):
> +      msg = "non-integer timestamp: %s" % salt
> +      raise errors.NLDRequestError(msg)
> +
> +    skew = abs(current_time - message_timestamp)
> +    if skew > constants.NLD_MAX_CLOCK_SKEW:
> +      msg = "outside time range (skew: %d)" % skew
> +      raise errors.NLDRequestError(msg)
> +
> +    try:
> +      cluster_name = message["cluster"]
> +    except KeyError:
> +      raise errors.NLDRequestError("Cluster name is missing from NLD request")
> +
> +    try:
> +      request = objects.NLDRequest.FromDict(message)
> +    except AttributeError, err:
> +      raise errors.NLDRequestError('%s' % err)
> +
> +    return cluster_name, request
> +
> +  def ProcessRequest(self, request):
> +    """Process one NLDRequest, and produce an answer
> +
> +    @type request: L{objects.NLDRequest}
> +    @rtype: (L{objects.NLDReply}, string)
> +    @return: tuple of reply and salt to add to the signature
> +
> +    """
> +    logging.debug("Processing request: %s", request)
> +    if request.protocol != constants.NLD_PROTOCOL_VERSION:
> +      msg = "wrong protocol version %d" % request.protocol
> +      raise errors.NLDRequestError(msg)
> +
> +    if request.type not in constants.NLD_REQS:
> +      msg = "wrong request type %d" % request.type
> +      raise errors.NLDRequestError(msg)
> +
> +    rsalt = request.rsalt
> +    if not rsalt:
> +      msg = "missing requested salt"
> +      raise errors.NLDRequestError(msg)
> +
> +    status, answer = self.dispatch_table[request.type](request.query)
> +    reply = objects.NLDReply(
> +      protocol=constants.NLD_PROTOCOL_VERSION,
> +      is_request=False,
> +      status=status,
> +      answer=answer,
> +      )
> +
> +    logging.debug("Sending reply: %s", reply)
> +
> +    return (reply, rsalt)
> +
> +  def PackReply(self, reply, rsalt, cluster_name):
> +    """Serialize and sign the given reply, with salt rsalt
> +
> +    @type reply: L{objects.NLDReply}
> +    @type rsalt: string
> +    @param cluster_name: name of the cluster
> +
> +    """
> +    message = reply.ToDict()
> +    message['cluster'] = cluster_name
> +    return serializer.DumpSigned(
> +      message,
> +      self.cluster_keys[cluster_name],
> +      salt=rsalt,
> +      key_selector=cluster_name
> +      )
> +
> +
> +class NLDAsyncUDPServer(daemon.AsyncUDPSocket):
> +  """The NLD UDP server, suitable for use with asyncore.
> +
> +  """
> +  def __init__(self, bind_address, port, processor, callback, cluster_keys):
> +    """Constructor for NLDAsyncUDPServer
> +
> +    @type bind_address: string
> +    @param bind_address: socket bind address ('' for all)
> +    @type port: int
> +    @param port: udp port
> +    @type processor: L{NLDRequestProcessor}
> +    @param processor: NLDRequestProcessor to use to handle queries
> +    @param callback: NLDResponseCallback to use to handle responses
> +    @param cluster_keys: dictinary with the cluster hmac keys
> +
> +    """
> +    daemon.AsyncUDPSocket.__init__(self)
> +    self.bind_address = bind_address
> +    self.port = port
> +    self.processor = processor
> +    self.bind((bind_address, port))
> +    self._callback = callback
> +    self._cluster_keys = cluster_keys
> +    self._requests = {}
> +    self._expire_requests = []
> +
> +    logging.debug("listening on ('%s':%d)", bind_address, port)
> +
> +  # this method is overriding the daemon.AsyncUDPSocket method
> +  def handle_datagram(self, payload_in, ip, port):
> +    try:
> +      payload = UnpackMagic(payload_in)
> +    except errors.NLDMagicError, err:
> +      logging.debug(err)
> +      return
> +
> +    signed_message = serializer.LoadJson(payload)
> +    message = serializer.LoadJson(signed_message['msg'])
> +
> +    message_is_request = message.get('is_request', None)
> +    if message_is_request is None:
> +      logging.error("Message request/response discriminator field is missing."
> +                    " Message: [%s]", message)
> +      return
> +
> +    if message_is_request:
> +      self.HandleRequest(payload, ip, port)
> +    else:
> +      self.HandleResponse(payload, ip, port)
> +
> +  def HandleRequest(self, payload, ip, port):
> +    answer =  self.processor.ExecQuery(payload, ip, port)
> +    if answer is not None:
> +      try:
> +        self.enqueue_send(ip, port, PackMagic(answer))
> +      except gnt_errors.UdpDataSizeError:
> +        logging.error("Reply too big to fit in an udp packet.")
> +
> +  def _PackRequest(self, request, cluster_name, timestamp=None):
> +    """Prepare a request to be sent on the wire.
> +
> +    This function puts a proper salt in an NLD request and adds the correct
> +    magic number.
> +
> +    """
> +    if timestamp is None:
> +      timestamp = time.time()
> +    tstamp = '%d' % timestamp
> +    req = serializer.DumpSignedJson(request.ToDict(),
> +                                    self._cluster_keys[cluster_name],
> +                                    tstamp,
> +                                    key_selector=cluster_name)
> +    return PackMagic(req)
> +
> +  def _UnpackReply(self, payload):
> +    (dict_answer, salt) = serializer.LoadSignedJson(payload,
> +                                                    key=self._cluster_keys.get)
> +    answer = objects.NLDReply.FromDict(dict_answer)
> +    return answer, salt
> +
> +  def ExpireRequests(self):
> +    """Delete all the expired requests.
> +
> +    """
> +    now = time.time()
> +    while self._expire_requests:
> +      expire_time, rsalt = self._expire_requests[0]
> +      if now >= expire_time:
> +        self._expire_requests.pop(0)
> +        (request, args) = self._requests[rsalt]
> +        del self._requests[rsalt]
> +        client_reply = NLDUpcallPayload(salt=rsalt,
> +                                        type=UPCALL_EXPIRE,
> +                                        orig_request=request,
> +                                        extra_args=args,
> +                                        client=self,
> +                                        )
> +        self._callback(client_reply)
> +      else:
> +        break
> +
> +  def SendRequest(self, request, cluster_name, destination, args=None):
> +    """Send an NLD request to another NLD instance
> +
> +    @type request: L{objects.NLDRequest}
> +    @param request: the request to send
> +    @param cluster_name: name of the cluster
> +    @param destination: the address of the target NLD instance
> +    @type args: tuple
> +    @keyword args: additional callback arguments
> +
> +    """
> +    request.cluster = cluster_name
> +
> +    if not request.rsalt:
> +      raise errors.NLDClientError("Missing request rsalt")
> +
> +    self.ExpireRequests()
> +    if request.rsalt in self._requests:
> +      raise errors.NLDClientError("Duplicate request rsalt")
> +
> +    if request.type not in constants.NLD_REQS:
> +      raise errors.NLDClientError("Invalid request type")
> +
> +    now = time.time()
> +    payload = self._PackRequest(request, cluster_name, timestamp=now)
> +
> +    try:
> +      self.enqueue_send(destination, self.port, payload)
> +    except gnt_errors.UdpDataSizeError:
> +      raise errors.NLDClientError("Request too big")
> +
> +    self._requests[request.rsalt] = (request, args)
> +    expire_time = now + constants.NLD_CLIENT_EXPIRE_TIMEOUT
> +    self._expire_requests.append((expire_time, request.rsalt))
> +
> +  def HandleResponse(self, payload, ip, port):
> +    """Asynchronous handler for an NLD reply
> +
> +    Call the relevant callback associated with the original request.
> +
> +    """
> +    try:
> +      try:
> +        answer, salt = self._UnpackReply(payload)
> +      except (gnt_errors.SignatureError, errors.NLDMagicError), err:
> +        if self._logger:
> +          self._logger.debug("Discarding broken package: %s" % err)
> +        return
> +
> +      try:
> +        (request, args) = self._requests[salt]
> +      except KeyError:
> +        if self._logger:
> +          self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
> +        return
> +
> +      client_reply = NLDUpcallPayload(salt=salt,
> +                                      type=UPCALL_REPLY,
> +                                      server_reply=answer,
> +                                      orig_request=request,
> +                                      server_ip=ip,
> +                                      server_port=port,
> +                                      extra_args=args,
> +                                      client=self,
> +                                      )
> +      self._callback(client_reply)
> +
> +    finally:
> +      self.ExpireRequests()
> +
> +
> +# UPCALL_REPLY: server reply upcall
> +# has all NLDUpcallPayload fields populated
> +UPCALL_REPLY = 1
> +# UPCALL_EXPIRE: internal library request expire
> +# has only salt, type, orig_request and extra_args
> +UPCALL_EXPIRE = 2
> +NLD_UPCALL_TYPES = frozenset([
> +  UPCALL_REPLY,
> +  UPCALL_EXPIRE,
> +  ])
> +
> +
> +class NLDUpcallPayload(gnt_objects.ConfigObject):
> +  """Callback argument for NLD replies
> +
> +  @type salt: string
> +  @ivar salt: salt associated with the query
> +  @type type: one of client.NLD_UPCALL_TYPES
> +  @ivar type: upcall type (server reply, expired request, ...)
> +  @type orig_request: L{objects.NLDRequest}
> +  @ivar orig_request: original request
> +  @type server_reply: L{objects.NLDReply}
> +  @ivar server_reply: server reply
> +  @type server_ip: string
> +  @ivar server_ip: answering server ip address
> +  @type server_port: int
> +  @ivar server_port: answering server port
> +  @type extra_args: any
> +  @ivar extra_args: 'args' argument of the SendRequest function
> +  @type client: L{NLDClient}
> +  @ivar client: current NLD client instance
> +
> +  """
> +  __slots__ = [
> +    "salt",
> +    "type",
> +    "orig_request",
> +    "server_reply",
> +    "server_ip",
> +    "server_port",
> +    "extra_args",
> +    "client",
> +    ]
> +
> +
> +class NLDClientRequest(objects.NLDRequest):
> +  """This is the client-side version of NLDRequest.
> +
> +  This version of the class helps creating requests, on the client side, by
> +  filling in some default values.
> +
> +  """
> +  def __init__(self, **kwargs):
> +    objects.NLDRequest.__init__(self, **kwargs)
> +    self.is_request = True
> +    if not self.rsalt:
> +      self.rsalt = utils.NewUUID()
> +    if not self.protocol:
> +      self.protocol = constants.NLD_PROTOCOL_VERSION
> +    if self.type not in constants.NLD_REQS:
> +      raise errors.NLDClientError("Invalid request type")
> +
> +
> +class NLDResponseCallback(object):
> +  """Callback for NLD responses.
> +
> +  """
> +  def __init__(self):
> +    self.dispatch_table = {
> +      constants.NLD_REQ_PING:
> +        self.HandlePingResponse,
> +      constants.NLD_REQ_ROUTE_INVALIDATE:
> +        self.HandleRouteInvalidateResponse,
> +    }
> +
> +  @staticmethod
> +  def HandlePingResponse(up):
> +    """Handle response to a ping request
> +
> +    """
> +    logging.debug("Received a ping response: [%s]", up.server_reply.answer)
> +
> +  @staticmethod
> +  def HandleRouteInvalidateResponse(up):
> +    logging.debug("Got a reply to a route invalidate request: %s", up)
> +
> +  def __call__(self, up):
> +    """NLD response callback.
> +
> +    @type up: L{NLDUpcallPayload}
> +    @param up: upper callback
> +
> +    """
> +    if up.type == UPCALL_REPLY:
> +      if up.server_reply.status != constants.NLD_REPL_STATUS_OK:
> +        logging.warning("Received error '%s' to NLD request %s",
> +                        up.server_reply.answer, up.orig_request)
> +        return
> +
> +      rtype = up.orig_request.type
> +      try:
> +        dispatcher = self.dispatch_table[rtype]
> +      except KeyError, err: # pylint: disable-msg=W0612
> +        logging.warning("Unhandled NLD response type: %s", rtype)
> +      dispatcher(up)
> diff --git a/lib/objects.py b/lib/objects.py
> new file mode 100644
> index 0000000..da0f5c3
> --- /dev/null
> +++ b/lib/objects.py
> @@ -0,0 +1,62 @@
> +#
> +#
> +
> +# Copyright (C) 2006, 2007 Google Inc.
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful, but
> +# WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> +# General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program; if not, write to the Free Software
> +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> +# 02110-1301, USA.
> +
> +
> +"""Transportable objects for Ganeti NLD.
> +
> +"""
> +
> +from ganeti import objects as gnt_objects
> +
> +
> +class NLDRequest(gnt_objects.ConfigObject):
> +  """Object holding an NLD request.
> +
> +  @ivar protocol: NLD protocol version
> +  @ivar type: NLD query type
> +  @ivar query: query request
> +  @ivar rsalt: requested reply salt
> +
> +  """
> +  __slots__ = [
> +    "protocol",
> +    "is_request",
> +    "cluster",
> +    "type",
> +    "query",
> +    "rsalt",
> +    ]
> +
> +
> +class NLDReply(gnt_objects.ConfigObject):
> +  """Object holding an NLD reply.
> +
> +  @ivar protocol: NLD protocol version
> +  @ivar status: reply status code (ok, error)
> +  @ivar answer: NLD query reply
> +
> +  """
> +  __slots__ = [
> +    "protocol",
> +    "is_request",
> +    "cluster",
> +    "status",
> +    "answer",
> +    ]
> --
> 1.7.0.1
>
>

LGTM for the rest


Thanks,

Guido

Reply via email to