This callback can be stacked with another one, and will filter duplicate
results.

Signed-off-by: Guido Trotter <[email protected]>
---
 lib/confd/client.py |   66 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 66 insertions(+), 0 deletions(-)

diff --git a/lib/confd/client.py b/lib/confd/client.py
index 53d1673..672a984 100644
--- a/lib/confd/client.py
+++ b/lib/confd/client.py
@@ -271,3 +271,69 @@ class ConfdClientRequest(objects.ConfdRequest):
     if self.type not in constants.CONFD_REQS:
       raise errors.ConfdClientError("Invalid request type")
 
+
+class ConfdFilterCallback:
+  """Callback that calls another callback, but filters duplicate results.
+
+  """
+  def __init__(self, callback, logger=None):
+    """Constructor for ConfdFilterCallback
+
+    @type callback: f(L{ConfdUpcallPayload})
+    @param callback: function to call when getting answers
+    @type logger: L{logging.Logger}
+    @keyword logger: optional logger for internal conditions
+    @raise errors.ProgrammerError: if callback is not callable
+
+    """
+    if not callable(callback):
+      raise errors.ProgrammerError("callback must be callable")
+
+    self._callback = callback
+    self._logger = logger
+    # answers contains a dict of salt -> answer
+    self._answers = {}
+
+  def __call__(self, up):
+    """Filtering callback
+
+    @type up: L{ConfdUpcallPayload}
+    @param up: upcall to filter; passed to the wrapped callback if kept
+    @return: the wrapped callback's result, or None if filtered
+
+    """
+    salt = up.salt
+    filtered = False
+    if up.type == UPCALL_REPLY:
+      if salt in self._answers:
+        old = self._answers[salt]
+        new = up.server_reply
+        if new.serial > old.serial:
+          self._answers[salt] = new
+          filtered = new.answer == old.answer
+          if filtered and self._logger:
+            self._logger.debug("Filtering confirming answer, with newer"
+              " serial for query %s" % salt)
+        elif new.serial == old.serial:
+          filtered = True
+          if self._logger:
+            if new.answer != old.answer:
+              self._logger.debug("Got incoherent answers for query %s"
+                " (serial: %s)" % (salt, new.serial))
+            else:
+              self._logger.debug("Filtering confirming answer, with same"
+                " serial for query %s" % salt)
+        else:
+          filtered = True
+          if self._logger:
+            # outdated means new < old: keep the arguments in that order
+            self._logger.debug("Filtering outdated answer for query %s"
+              " serial: (%d < %d)" % (salt, new.serial, old.serial))
+      else:
+        self._answers[salt] = up.server_reply
+    elif up.type == UPCALL_EXPIRE:
+      # the query expired: forget its answer, if any
+      self._answers.pop(salt, None)
+
+    return None if filtered else self._callback(up)
+
-- 
1.6.3.3

Reply via email to