Smalyshev has uploaded a new change for review. https://gerrit.wikimedia.org/r/284989
Change subject: Switch to pymemcache for finer control and add retries on network problems ...................................................................... Switch to pymemcache for finer control and add retries on network problems Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b --- M handlers/Memcached.py M kafka-watcher.py M requirements.txt M watcher.yaml 4 files changed, 32 insertions(+), 11 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/kafka-watcher refs/changes/89/284989/1 diff --git a/handlers/Memcached.py b/handlers/Memcached.py index e6e231a..a0e7d71 100644 --- a/handlers/Memcached.py +++ b/handlers/Memcached.py @@ -1,17 +1,34 @@ -import memcache +from __future__ import print_function + +from pymemcache.client.base import Client +from pymemcache.exceptions import MemcacheUnexpectedCloseError import time class Memcached(object): - def __init__(self, hostname, **params): - self.mc = memcache.Client([hostname]) + def __init__(self, hostname, port, **params): + self.mc = Client((hostname, port)) def handle(self, topic, message): + """ + """ + if 'cmd' not in message: + raise Exception("Bad message: no command") cmd = message['cmd'] if not hasattr(self, cmd): raise Exception("Unknown command: " + cmd) - getattr(self, cmd)(message) + tryit = True + while tryit: + tryit = False + try: + getattr(self, cmd)(message) + except MemcacheUnexpectedCloseError: + # Server dropped dead - we'll retry + tryit = True + except IOError: + # Something network-related - retry + tryit = True def set(self, message): text = message['val'].encode('utf-8') diff --git a/kafka-watcher.py b/kafka-watcher.py index 0e49860..e78172c 100755 --- a/kafka-watcher.py +++ b/kafka-watcher.py @@ -1,10 +1,13 @@ #!/usr/bin/python +from __future__ import print_function + +from kafka import KafkaConsumer import yaml import argparse -from kafka import KafkaConsumer import imp import json import sys + parser = argparse.ArgumentParser(description='Process cache relay commands from Kafka') parser.add_argument('--config', required=True, help='YAML configuration file') @@ -28,17 +31,17 @@ consumer = KafkaConsumer(*topics) for msg in consumer: if msg.topic not in handlers: - print("Weird, unknown topic %s" % msg.topic) + print("Weird, unknown topic %s" % msg.topic, file=sys.stderr) continue try: data = json.loads(msg.value) except ValueError: data = None if not data: - print("Could not parse data, meh") + print("Could not parse data, meh", file=sys.stderr) continue try: handlers[msg.topic].handle(msg.topic, data) except: e = sys.exc_info() - print("Oops, something happened: " + str(e)) + print("Oops, something happened: " + str(e), file=sys.stderr) diff --git a/requirements.txt b/requirements.txt index 04741d5..97e23d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ yaml kafka -memcache +pymemcache diff --git a/watcher.yaml b/watcher.yaml index d0b5353..490d38a 100644 --- a/watcher.yaml +++ b/watcher.yaml @@ -2,5 +2,6 @@ - topic: wancache-purge handler: Memcached params: - hostname: localhost:11211 - + hostname: localhost + port: 11211 + -- To view, visit https://gerrit.wikimedia.org/r/284989 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/services/kafka-watcher Gerrit-Branch: master Gerrit-Owner: Smalyshev <smalys...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits