Signed-off-by: Balazs Lecz <[email protected]>
---
 daemons/ganeti-nld |   50 +++++++++++++++++++++------------
 lib/constants.py   |    5 +++
 lib/nld_nld.py     |   78 ++++++++++++++++++----------------------------------
 3 files changed, 64 insertions(+), 69 deletions(-)

diff --git a/daemons/ganeti-nld b/daemons/ganeti-nld
index 6317663..d28c123 100755
--- a/daemons/ganeti-nld
+++ b/daemons/ganeti-nld
@@ -65,9 +65,11 @@ class MisroutedPacketHandler(object):
   """Callback called when a packet is received via the NFLOG target.
 
   """
-  def __init__(self, nld_server, instance_node_maps):
+  def __init__(self, nld_server, instance_node_maps, endpoints, updaters):
     self.nld_server = nld_server
     self.instance_node_maps = instance_node_maps
+    self.endpoints = endpoints
+    self.updaters = updaters
 
   def __call__(self, i, nflog_payload):
     # Look up the source IP in the instance->node maps. If found, it means the
@@ -92,18 +94,30 @@ class MisroutedPacketHandler(object):
                     " [cluster: %s] [node: %s] [link: %s] [source: %s]",
                     source_cluster, source_node, source_link,
                     ip_packet.src)
-      # TODO: send NLD route invalidation request to the source node
-      request = nld_nld.NLDClientRequest(type=constants.NLD_REQ_PING)
+      # Update the instance IP list on this node
+      self.updaters[source_cluster].UpdateInstances()
+      # Send NLD route invalidation request to the source node
+      request = nld_nld.NLDClientRequest(
+          type=constants.NLD_REQ_ROUTE_INVALIDATE,
+          query=ip_packet.dst)
       self.nld_server.SendRequest(request, source_cluster, source_node)
     else:
       logging.debug("misrouted packet detected. [source: %s]",
                     ip_packet.src)
-      # TODO: remove this test code
-      request = nld_nld.NLDClientRequest(type=constants.NLD_REQ_PING)
-      self.nld_server.SendRequest(request, "default", ip_packet.src)
-
-    # TODO: notify the endpoint(s) via an NLD request (preferably by iterating
-    #       over the private IPs of the endpoints)
+      # Update the instance IP lists on this node
+      for _, updater in self.updaters.iteritems():
+        updater.UpdateInstances()
+
+    # Notify the endpoint(s)
+    # TODO: this uses the "externa" IPs of the endpoints.
+    # Maybe we should be using their private IPs here.
+    logging.debug("notifying the endpoints about a misrouted packet...")
+    for endpoint in self.endpoints:
+      logging.debug("notifying endpoint: %s", endpoint)
+      request = nld_nld.NLDClientRequest(
+          type=constants.NLD_REQ_ROUTE_INVALIDATE,
+          query=ip_packet.dst)
+      self.nld_server.SendRequest(request, "default", endpoint)
 
     return 1
 
@@ -164,23 +178,21 @@ class NetworkLookupDaemon(object):
     instance_node_maps = {}
 
     # Instantiate one periodic updater per cluster
-    self.updaters = []
+    self.updaters = {}
     self.cluster_keys = {}
     for cluster_name, cluster_options in self.config.clusters.iteritems():
       hmac_key = utils.ReadFile(cluster_options["hmac_key_file"])
       self.cluster_keys[cluster_name] = hmac_key
       mc_list = utils.ReadFile(cluster_options["mc_list_file"]).splitlines()
       instance_node_maps[cluster_name] = {}
-      self.updaters.append(
-          nld_confd.NLDPeriodicUpdater(
-            cluster_name, mainloop, self.config,
-            hmac_key, mc_list, peer_set_manager,
-            instance_node_maps[cluster_name])
-          )
+      self.updaters[cluster_name] = nld_confd.NLDPeriodicUpdater(
+          cluster_name, mainloop, self.config, hmac_key, mc_list,
+          peer_set_manager, instance_node_maps[cluster_name])
 
     # Instantiate NLD network request and response processers
     # and the async UDP server
-    nld_request_processor = nld_nld.NLDRequestProcessor(self.cluster_keys)
+    nld_request_processor = nld_nld.NLDRequestProcessor(self.cluster_keys,
+                                                        self.updaters)
     nld_response_callback = nld_nld.NLDResponseCallback()
     nld_server = nld_nld.NLDAsyncUDPServer(options.bind_address,
                                            options.port,
@@ -190,7 +202,9 @@ class NetworkLookupDaemon(object):
 
     # Instantiate the misrouted packet handler and its async dispatcher
     misrouted_packet_callback = MisroutedPacketHandler(nld_server,
-                                                       instance_node_maps)
+                                                       instance_node_maps,
+                                                       self.config.endpoints,
+                                                       self.updaters)
     nflog_dispatcher.AsyncNFLog(misrouted_packet_callback,
                                 log_group=self.config.nflog_queue)
 
diff --git a/lib/constants.py b/lib/constants.py
index bbdb61d..c804608 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -36,6 +36,11 @@ DEFAULT_NFLOG_QUEUE = 0
 
 # NLD communication protocol related constants below
 
+# A few common errors for NLD
+NLD_ERROR_UNKNOWN_ENTRY = 1
+NLD_ERROR_INTERNAL = 2
+NLD_ERROR_ARGUMENT = 3
+
 # Each nld request is "salted" by the current timestamp.
 # This constants decides how many seconds of skew to accept.
 # TODO: make this a default and allow the value to be more configurable
diff --git a/lib/nld_nld.py b/lib/nld_nld.py
index bc8ac48..8d55338 100644
--- a/lib/nld_nld.py
+++ b/lib/nld_nld.py
@@ -68,38 +68,28 @@ def UnpackMagic(payload):
   return payload[_FOURCC_LEN:]
 
 
-class NLDQuery(object):
-  """NLD Query base class.
+class NLDRequestProcessor(object):
+  """A processor for NLD requests.
 
   """
-  def Exec(self, query): # pylint: disable-msg=R0201,W0613
-    """Process a single UDP request from a client.
-
-    Different queries should override this function, which by defaults returns
-    a "non-implemented" answer.
-
-    @type query: (undefined)
-    @param query: NLDRequest 'query' field
-    @rtype: (integer, undefined)
-    @return: status and answer to give to the client
+  def __init__(self, cluster_keys, updaters):
+    """Constructor for NLDRequestProcessor
 
     """
-    status = constants.NLD_REPL_STATUS_NOTIMPLEMENTED
-    answer = 'not implemented'
-    return status, answer
-
-
-class PingQuery(NLDQuery):
-  """An empty NLD query.
+    # TODO: make these private
+    self.cluster_keys = cluster_keys
+    self.updaters = updaters
 
-  It will return success on an empty argument, and an error on any other
-  argument.
+    self.dispatch_table = {
+      constants.NLD_REQ_PING: self._Ping,
+      constants.NLD_REQ_ROUTE_INVALIDATE: self._RouteInvalidate,
+      }
 
-  """
-  def Exec(self, query):
-    """PingQuery main execution.
+    assert \
+      not constants.NLD_REQS.symmetric_difference(self.dispatch_table), \
+      "dispatch_table is unaligned with NLD_REQS"
 
-    """
+  def _Ping(self, query):
     if query is None:
       status = constants.NLD_REPL_STATUS_OK
       answer = 'ok'
@@ -109,30 +99,17 @@ class PingQuery(NLDQuery):
 
     return status, answer
 
+  def _RouteInvalidate(self, query):
+    if not query:
+      logging.debug("missing body from route invalidation query")
+      return constants.NLD_REPL_STATUS_ERROR, constants.NLD_ERROR_ARGUMENT
 
-class RouteInvalidateQuery(NLDQuery):
-  # TODO: implement this
-  pass
-
-
-class NLDRequestProcessor(object):
-  """A processor for NLD requests.
-
-  """
-  DISPATCH_TABLE = {
-    constants.NLD_REQ_PING: PingQuery,
-    constants.NLD_REQ_ROUTE_INVALIDATE: RouteInvalidateQuery,
-    }
-
-  def __init__(self, cluster_keys):
-    """Constructor for NLDRequestProcessor
-
-    """
-    # TODO: make this private
-    self.cluster_keys = cluster_keys
-    assert \
-      not constants.NLD_REQS.symmetric_difference(self.DISPATCH_TABLE), \
-      "DISPATCH_TABLE is unaligned with NLD_REQS"
+    logging.debug("executing route invalidation query: [%s]", query)
+    # TODO: make it cluster-aware (don't do a mass-update like this)
+    for _, updater in self.updaters.iteritems():
+      updater.UpdateInstances()
+    answer = 'done'
+    return constants.NLD_REPL_STATUS_OK, answer
 
   def ExecQuery(self, payload, ip, port):
     """Process a single NLD request.
@@ -213,8 +190,7 @@ class NLDRequestProcessor(object):
       msg = "missing requested salt"
       raise errors.NLDRequestError(msg)
 
-    query_object = self.DISPATCH_TABLE[request.type]()
-    status, answer = query_object.Exec(request.query)
+    status, answer = self.dispatch_table[request.type](request.query)
     reply = objects.NLDReply(
       protocol=constants.NLD_PROTOCOL_VERSION,
       is_request=False, # TODO: move this obvious initialization into the
@@ -512,7 +488,7 @@ class NLDResponseCallback(object):
     # TODO: implement this
     # We don't really expect an answer to a route invalidate request,
     # maybe a confirmation that it was received and accepted (or rejected)
-    logging.debug("Got a reply to a route invalidate request")
+    logging.debug("Got a reply to a route invalidate request: %s", up)
 
   def __call__(self, up):
     """NLD response callback.
-- 
1.7.0.1

Reply via email to