This callback can be stacked with another one, and will filter duplicate results.
Signed-off-by: Guido Trotter <[email protected]> --- lib/confd/client.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 66 insertions(+), 0 deletions(-) diff --git a/lib/confd/client.py b/lib/confd/client.py index 53d1673..672a984 100644 --- a/lib/confd/client.py +++ b/lib/confd/client.py @@ -271,3 +271,69 @@ class ConfdClientRequest(objects.ConfdRequest): if self.type not in constants.CONFD_REQS: raise errors.ConfdClientError("Invalid request type") + +class ConfdFilterCallback: + """Callback that calls another callback, but filters duplicate results. + + """ + def __init__(self, callback, logger=None): + """Constructor for ConfdFilterCallback + + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers + @type logger: L{logging.Logger} + @keyword logger: optional logger for internal conditions + + """ + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") + + self._callback = callback + self._logger = logger + # answers contains a dict of salt -> answer + self._answers = {} + + def __call__(self, up): + """Filtering callback + + @type up: L{ConfdUpcallPayload} + @param up: upper callback + + """ + salt = up.salt + filter = False + if up.type == UPCALL_REPLY: + if salt in self._answers: + old_answer = self._answers[salt] + new_answer = up.server_reply + if new_answer.serial > old_answer.serial: + self._answers[salt] = up.server_reply + if new_answer.answer == old_answer.answer: + self._logger.debug("Filtering confirming answer, with newer" + " serial for query %s" % salt) + filter = True + elif new_answer.serial == old_answer.serial: + filter = True + if self._logger: + if new_answer.answer != old_answer.answer: + self._logger.debug("Got incoherent answers for query %s" + " (serial: %s)" % (salt, new_answer.serial)) + else: + self._logger.debug("Filtering confirming answer, with same" + " serial for query %s" % salt) + else: + filter = True + if self._logger: + self._logger.debug("Filtering outdated answer for query %s" + " serial: (%d < %d)" % (salt, old_answer.serial, new_answer.serial)) + else: + self._answers[salt] = up.server_reply + elif up.type == UPCALL_EXPIRE: + if salt in self._answers: + del self._answers[salt] + + if not filter: + return self._callback(up) + else: + return + -- 1.6.3.3
