Signed-off-by: Balazs Lecz <[email protected]>
---
daemons/ganeti-nld | 50 +++++++++++++++++++++------------
lib/constants.py | 5 +++
lib/nld_nld.py | 78 ++++++++++++++++++----------------------------------
3 files changed, 64 insertions(+), 69 deletions(-)
diff --git a/daemons/ganeti-nld b/daemons/ganeti-nld
index 6317663..d28c123 100755
--- a/daemons/ganeti-nld
+++ b/daemons/ganeti-nld
@@ -65,9 +65,11 @@ class MisroutedPacketHandler(object):
"""Callback called when a packet is received via the NFLOG target.
"""
- def __init__(self, nld_server, instance_node_maps):
+ def __init__(self, nld_server, instance_node_maps, endpoints, updaters):
self.nld_server = nld_server
self.instance_node_maps = instance_node_maps
+ self.endpoints = endpoints
+ self.updaters = updaters
def __call__(self, i, nflog_payload):
# Look up the source IP in the instance->node maps. If found, it means the
@@ -92,18 +94,30 @@ class MisroutedPacketHandler(object):
" [cluster: %s] [node: %s] [link: %s] [source: %s]",
source_cluster, source_node, source_link,
ip_packet.src)
- # TODO: send NLD route invalidation request to the source node
- request = nld_nld.NLDClientRequest(type=constants.NLD_REQ_PING)
+ # Update the instance IP list on this node
+ self.updaters[source_cluster].UpdateInstances()
+ # Send NLD route invalidation request to the source node
+ request = nld_nld.NLDClientRequest(
+ type=constants.NLD_REQ_ROUTE_INVALIDATE,
+ query=ip_packet.dst)
self.nld_server.SendRequest(request, source_cluster, source_node)
else:
logging.debug("misrouted packet detected. [source: %s]",
ip_packet.src)
- # TODO: remove this test code
- request = nld_nld.NLDClientRequest(type=constants.NLD_REQ_PING)
- self.nld_server.SendRequest(request, "default", ip_packet.src)
-
- # TODO: notify the endpoint(s) via an NLD request (preferably by iterating
- # over the private IPs of the endpoints)
+ # Update the instance IP lists on this node
+ for _, updater in self.updaters.iteritems():
+ updater.UpdateInstances()
+
+ # Notify the endpoint(s)
+ # TODO: this uses the "external" IPs of the endpoints.
+ # Maybe we should be using their private IPs here.
+ logging.debug("notifying the endpoints about a misrouted packet...")
+ for endpoint in self.endpoints:
+ logging.debug("notifying endpoint: %s", endpoint)
+ request = nld_nld.NLDClientRequest(
+ type=constants.NLD_REQ_ROUTE_INVALIDATE,
+ query=ip_packet.dst)
+ self.nld_server.SendRequest(request, "default", endpoint)
return 1
@@ -164,23 +178,21 @@ class NetworkLookupDaemon(object):
instance_node_maps = {}
# Instantiate one periodic updater per cluster
- self.updaters = []
+ self.updaters = {}
self.cluster_keys = {}
for cluster_name, cluster_options in self.config.clusters.iteritems():
hmac_key = utils.ReadFile(cluster_options["hmac_key_file"])
self.cluster_keys[cluster_name] = hmac_key
mc_list = utils.ReadFile(cluster_options["mc_list_file"]).splitlines()
instance_node_maps[cluster_name] = {}
- self.updaters.append(
- nld_confd.NLDPeriodicUpdater(
- cluster_name, mainloop, self.config,
- hmac_key, mc_list, peer_set_manager,
- instance_node_maps[cluster_name])
- )
+ self.updaters[cluster_name] = nld_confd.NLDPeriodicUpdater(
+ cluster_name, mainloop, self.config, hmac_key, mc_list,
+ peer_set_manager, instance_node_maps[cluster_name])
# Instantiate NLD network request and response processers
# and the async UDP server
- nld_request_processor = nld_nld.NLDRequestProcessor(self.cluster_keys)
+ nld_request_processor = nld_nld.NLDRequestProcessor(self.cluster_keys,
+ self.updaters)
nld_response_callback = nld_nld.NLDResponseCallback()
nld_server = nld_nld.NLDAsyncUDPServer(options.bind_address,
options.port,
@@ -190,7 +202,9 @@ class NetworkLookupDaemon(object):
# Instantiate the misrouted packet handler and its async dispatcher
misrouted_packet_callback = MisroutedPacketHandler(nld_server,
- instance_node_maps)
+ instance_node_maps,
+ self.config.endpoints,
+ self.updaters)
nflog_dispatcher.AsyncNFLog(misrouted_packet_callback,
log_group=self.config.nflog_queue)
diff --git a/lib/constants.py b/lib/constants.py
index bbdb61d..c804608 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -36,6 +36,11 @@ DEFAULT_NFLOG_QUEUE = 0
# NLD communication protocol related constants below
+# A few common errors for NLD
+NLD_ERROR_UNKNOWN_ENTRY = 1
+NLD_ERROR_INTERNAL = 2
+NLD_ERROR_ARGUMENT = 3
+
# Each nld request is "salted" by the current timestamp.
# This constants decides how many seconds of skew to accept.
# TODO: make this a default and allow the value to be more configurable
diff --git a/lib/nld_nld.py b/lib/nld_nld.py
index bc8ac48..8d55338 100644
--- a/lib/nld_nld.py
+++ b/lib/nld_nld.py
@@ -68,38 +68,28 @@ def UnpackMagic(payload):
return payload[_FOURCC_LEN:]
-class NLDQuery(object):
- """NLD Query base class.
+class NLDRequestProcessor(object):
+ """A processor for NLD requests.
"""
- def Exec(self, query): # pylint: disable-msg=R0201,W0613
- """Process a single UDP request from a client.
-
- Different queries should override this function, which by defaults returns
- a "non-implemented" answer.
-
- @type query: (undefined)
- @param query: NLDRequest 'query' field
- @rtype: (integer, undefined)
- @return: status and answer to give to the client
+ def __init__(self, cluster_keys, updaters):
+ """Constructor for NLDRequestProcessor
"""
- status = constants.NLD_REPL_STATUS_NOTIMPLEMENTED
- answer = 'not implemented'
- return status, answer
-
-
-class PingQuery(NLDQuery):
- """An empty NLD query.
+ # TODO: make these private
+ self.cluster_keys = cluster_keys
+ self.updaters = updaters
- It will return success on an empty argument, and an error on any other
- argument.
+ self.dispatch_table = {
+ constants.NLD_REQ_PING: self._Ping,
+ constants.NLD_REQ_ROUTE_INVALIDATE: self._RouteInvalidate,
+ }
- """
- def Exec(self, query):
- """PingQuery main execution.
+ assert \
+ not constants.NLD_REQS.symmetric_difference(self.dispatch_table), \
+ "dispatch_table is unaligned with NLD_REQS"
- """
+ def _Ping(self, query):
if query is None:
status = constants.NLD_REPL_STATUS_OK
answer = 'ok'
@@ -109,30 +99,17 @@ class PingQuery(NLDQuery):
return status, answer
+ def _RouteInvalidate(self, query):
+ if not query:
+ logging.debug("missing body from route invalidation query")
+ return constants.NLD_REPL_STATUS_ERROR, constants.NLD_ERROR_ARGUMENT
-class RouteInvalidateQuery(NLDQuery):
- # TODO: implement this
- pass
-
-
-class NLDRequestProcessor(object):
- """A processor for NLD requests.
-
- """
- DISPATCH_TABLE = {
- constants.NLD_REQ_PING: PingQuery,
- constants.NLD_REQ_ROUTE_INVALIDATE: RouteInvalidateQuery,
- }
-
- def __init__(self, cluster_keys):
- """Constructor for NLDRequestProcessor
-
- """
- # TODO: make this private
- self.cluster_keys = cluster_keys
- assert \
- not constants.NLD_REQS.symmetric_difference(self.DISPATCH_TABLE), \
- "DISPATCH_TABLE is unaligned with NLD_REQS"
+ logging.debug("executing route invalidation query: [%s]", query)
+ # TODO: make it cluster-aware (don't do a mass-update like this)
+ for _, updater in self.updaters.iteritems():
+ updater.UpdateInstances()
+ answer = 'done'
+ return constants.NLD_REPL_STATUS_OK, answer
def ExecQuery(self, payload, ip, port):
"""Process a single NLD request.
@@ -213,8 +190,7 @@ class NLDRequestProcessor(object):
msg = "missing requested salt"
raise errors.NLDRequestError(msg)
- query_object = self.DISPATCH_TABLE[request.type]()
- status, answer = query_object.Exec(request.query)
+ status, answer = self.dispatch_table[request.type](request.query)
reply = objects.NLDReply(
protocol=constants.NLD_PROTOCOL_VERSION,
is_request=False, # TODO: move this obvious initialization into the
@@ -512,7 +488,7 @@ class NLDResponseCallback(object):
# TODO: implement this
# We don't really expect an answer to a route invalidate request,
# maybe a confirmation that it was received and accepted (or rejected)
- logging.debug("Got a reply to a route invalidate request")
+ logging.debug("Got a reply to a route invalidate request: %s", up)
def __call__(self, up):
"""NLD response callback.
--
1.7.0.1