Smalyshev has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/284989

Change subject: Switch to pymemcache for finer control and add retries on network problems
......................................................................

Switch to pymemcache for finer control and add retries on network problems

Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
---
M handlers/Memcached.py
M kafka-watcher.py
M requirements.txt
M watcher.yaml
4 files changed, 32 insertions(+), 11 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/services/kafka-watcher refs/changes/89/284989/1

diff --git a/handlers/Memcached.py b/handlers/Memcached.py
index e6e231a..a0e7d71 100644
--- a/handlers/Memcached.py
+++ b/handlers/Memcached.py
@@ -1,17 +1,34 @@
-import memcache
+from __future__ import print_function
+
+from pymemcache.client.base import Client
+from pymemcache.exceptions import MemcacheUnexpectedCloseError
 import time
 
 
 class Memcached(object):
 
-    def __init__(self, hostname, **params):
-        self.mc = memcache.Client([hostname])
+    def __init__(self, hostname, port, **params):
+        self.mc = Client((hostname, port))
 
     def handle(self, topic, message):
+        """
+        """
+        if 'cmd' not in message:
+            raise Exception("Bad message: no command")
         cmd = message['cmd']
         if not hasattr(self, cmd):
             raise Exception("Unknown command: " + cmd)
-        getattr(self, cmd)(message)
+        tryit = True
+        while tryit:
+            tryit = False
+            try:
+                getattr(self, cmd)(message)
+            except MemcacheUnexpectedCloseError:
+                # Server dropped dead - we'll retry
+                tryit = True
+            except IOError:
+                # Something network-related - retry
+                tryit = True
 
     def set(self, message):
         text = message['val'].encode('utf-8')
diff --git a/kafka-watcher.py b/kafka-watcher.py
index 0e49860..e78172c 100755
--- a/kafka-watcher.py
+++ b/kafka-watcher.py
@@ -1,10 +1,13 @@
 #!/usr/bin/python
+from __future__ import print_function
+
+from kafka import KafkaConsumer
 import yaml
 import argparse
-from kafka import KafkaConsumer
 import imp
 import json
 import sys
+
 
 parser = argparse.ArgumentParser(description='Process cache relay commands from Kafka')
 parser.add_argument('--config', required=True, help='YAML configuration file')
@@ -28,17 +31,17 @@
 consumer = KafkaConsumer(*topics)
 for msg in consumer:
     if msg.topic not in handlers:
-        print("Weird, unknown topic %s" % msg.topic)
+        print("Weird, unknown topic %s" % msg.topic, file=sys.stderr)
         continue
     try:
         data = json.loads(msg.value)
     except ValueError:
         data = None
     if not data:
-        print("Could not parse data, meh")
+        print("Could not parse data, meh", file=sys.stderr)
         continue
     try:
         handlers[msg.topic].handle(msg.topic, data)
     except:
         e = sys.exc_info()
-        print("Oops, something happened: " + str(e))
+        print("Oops, something happened: " + str(e), file=sys.stderr)
diff --git a/requirements.txt b/requirements.txt
index 04741d5..97e23d2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
 yaml
 kafka
-memcache
+pymemcache
diff --git a/watcher.yaml b/watcher.yaml
index d0b5353..490d38a 100644
--- a/watcher.yaml
+++ b/watcher.yaml
@@ -2,5 +2,6 @@
     - topic: wancache-purge
       handler: Memcached
       params: 
-        hostname: localhost:11211
-    
+        hostname: localhost
+        port: 11211
+

-- 
To view, visit https://gerrit.wikimedia.org/r/284989
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/services/kafka-watcher
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <smalys...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to