This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new e323899  Better handle when the connection to the DB drops (#205)
e323899 is described below

commit e32389994c4d0afd766db951098afef05820a257
Author: Justin Berstler <[email protected]>
AuthorDate: Wed Aug 2 14:29:25 2017 -0400

    Better handle when the connection to the DB drops (#205)
    
    Make use of a timeout value when constructing the changes feed. This
    ensures that if the connection to the DB is lost, an exception is thrown
    in a timely manner so that the changes feed can be restarted. Without this
    timeout set on the database client, the changes feed will simply hang when
    the connection is lost, and may not recover.
    Update to the latest python-cloudant package (cloudant 2.4.0 -> 2.5.0).
---
 Dockerfile           |   2 +-
 provider/database.py |  15 +++---
 provider/service.py  | 143 +++++++++++++++++++++++++--------------------------
 3 files changed, 79 insertions(+), 81 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 953170c..afdda12 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -24,7 +24,7 @@ RUN export LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
     && ldconfig
 
 RUN pip install gevent==1.1.2 flask==0.11.1 confluent-kafka==0.9.4 \
-        requests==2.10.0 cloudant==2.4.0 psutil==5.0.0
+        requests==2.10.0 cloudant==2.5.0 psutil==5.0.0
 
 # while I expect these will be overridden during deployment, we might as well
 # set reasonable defaults
diff --git a/provider/database.py b/provider/database.py
index ac75cf4..a9cef28 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -41,17 +41,20 @@ class Database:
     instance = os.getenv('INSTANCE', 'messageHubTrigger-0')
     canaryId = "canary-{}".format(instance)
 
-    def __init__(self):
-        client = CouchDB(self.username, self.password, url=self.url)
-        client.connect()
+    def __init__(self, timeout=None):
+        self.client = CouchDB(self.username, self.password, url=self.url, 
timeout=timeout)
+        self.client.connect()
 
-        if self.dbname in client.all_dbs():
+        if self.dbname in self.client.all_dbs():
             logging.info('Database exists - connecting to it.')
-            self.database = client[self.dbname]
+            self.database = self.client[self.dbname]
         else:
             logging.warn('Database does not exist - creating it.')
-            self.database = client.create_database(self.dbname)
+            self.database = self.client.create_database(self.dbname)
 
+    def destroy(self):
+        self.client.disconnect()
+        self.client = None
 
     def disableTrigger(self, triggerFQN, status_code):
         try:
diff --git a/provider/service.py b/provider/service.py
index 7183b7f..6a6e6ae 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -25,19 +25,14 @@ from consumer import Consumer
 from database import Database
 from datetime import datetime
 from datetimeutils import secondsSince
+from requests.exceptions import ConnectionError, ReadTimeout
 from threading import Thread
 
 # How often to produce canary documents
 canaryInterval = 60  # seconds
 
-# How long to wait between detecting carnary documents before restarting the
-# DB changes feed. Should be significantly larger than canaryInterval to allow 
for
-# the roundtrip to DB as well as to let the Service handle other work in the
-# meantime.
-canaryTimeout = 90  # seconds
-
 # How long the changes feed should poll before timing out
-changesFeedTimeout = 10  # seconds
+changesFeedTimeout = 30  # seconds
 
 
 class Service (Thread):
@@ -45,10 +40,7 @@ class Service (Thread):
         Thread.__init__(self)
         self.daemon = True
 
-        self.lastCanaryTime = datetime.now()
-
-        self.database = Database()
-        self.changes = self.database.changesFeed(timeout=changesFeedTimeout)
+        self.database = None
         self.lastSequence = None
         self.canaryGenerator = CanaryDocumentGenerator()
 
@@ -58,76 +50,79 @@ class Service (Thread):
         self.canaryGenerator.start()
 
         while True:
-            for change in self.changes:
-                # change could be None because the changes feed will timeout
-                # if it hasn't detected any changes. This timeout allows us to
-                # check whether or not the feed is capable of detecting canary
-                # documents
-                if change != None:
-                    # Record the sequence in case the changes feed needs to be
-                    # restarted. This way the new feed can pick up right where
-                    # the old one left off.
-                    self.lastSequence = change['seq']
-
-                    if "deleted" in change and change["deleted"] == True:
-                        logging.info('[changes] Found a delete')
-                        consumer = 
self.consumers.getConsumerForTrigger(change['id'])
-                        if consumer != None:
-                            if consumer.desiredState() == 
Consumer.State.Disabled:
-                                # just remove it from memory
-                                logging.info('[{}] Removing disabled 
trigger'.format(consumer.trigger))
-                                
self.consumers.removeConsumerForTrigger(consumer.trigger)
-                            else:
-                                logging.info('[{}] Shutting down running 
trigger'.format(consumer.trigger))
-                                consumer.shutdown()
-                    # since we can't use a filter function for the feed (then
-                    # you don't get deletes) we need to manually verify this
-                    # is a valid trigger doc that has changed
-                    elif 'triggerURL' in change['doc']:
-                        logging.info('[changes] Found a change in a trigger 
document')
-                        document = change['doc']
-
-                        if not 
self.consumers.hasConsumerForTrigger(change["id"]):
-                            logging.info('[{}] Found a new trigger to 
create'.format(change["id"]))
-                            self.createAndRunConsumer(document)
-                        else:
-                            logging.info('[{}] Found a change to an existing 
trigger'.format(change["id"]))
-                            existingConsumer = 
self.consumers.getConsumerForTrigger(change["id"])
-
-                            if existingConsumer.desiredState() == 
Consumer.State.Disabled and self.__isTriggerDocActive(document):
-                                # disabled trigger has become active
-                                logging.info('[{}] Existing disabled trigger 
should become active'.format(change["id"]))
+            try:
+                if self.database is not None:
+                    logging.info("Shutting down existing DB client")
+                    self.database.destroy()
+
+                logging.info("Starting changes feed")
+                self.database = Database(timeout=changesFeedTimeout)
+                self.changes = 
self.database.changesFeed(timeout=changesFeedTimeout, since=self.lastSequence)
+
+                self.lastCanaryTime = datetime.now()
+
+                for change in self.changes:
+                    # change could be None because the changes feed will 
timeout
+                    # if it hasn't detected any changes. This timeout allows 
us to
+                    # check whether or not the feed is capable of detecting 
canary
+                    # documents
+                    if change != None:
+                        # Record the sequence in case the changes feed needs 
to be
+                        # restarted. This way the new feed can pick up right 
where
+                        # the old one left off.
+                        self.lastSequence = change['seq']
+
+                        if "deleted" in change and change["deleted"] == True:
+                            logging.info('[changes] Found a delete')
+                            consumer = 
self.consumers.getConsumerForTrigger(change['id'])
+                            if consumer != None:
+                                if consumer.desiredState() == 
Consumer.State.Disabled:
+                                    # just remove it from memory
+                                    logging.info('[{}] Removing disabled 
trigger'.format(consumer.trigger))
+                                    
self.consumers.removeConsumerForTrigger(consumer.trigger)
+                                else:
+                                    logging.info('[{}] Shutting down running 
trigger'.format(consumer.trigger))
+                                    consumer.shutdown()
+                        # since we can't use a filter function for the feed 
(then
+                        # you don't get deletes) we need to manually verify 
this
+                        # is a valid trigger doc that has changed
+                        elif 'triggerURL' in change['doc']:
+                            logging.info('[changes] Found a change in a 
trigger document')
+                            document = change['doc']
+
+                            if not 
self.consumers.hasConsumerForTrigger(change["id"]):
+                                logging.info('[{}] Found a new trigger to 
create'.format(change["id"]))
                                 self.createAndRunConsumer(document)
-                            elif existingConsumer.desiredState() == 
Consumer.State.Running and not self.__isTriggerDocActive(document):
-                                # running trigger should become disabled
-                                logging.info('[{}] Existing running trigger 
should become disabled'.format(change["id"]))
-                                existingConsumer.disable()
                             else:
-                                logging.debug('[changes] Found non-interesting 
trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
-                    elif 'canary-timestamp' in change['doc']:
-                        # found a canary - update lastCanaryTime
-                        logging.info('[canary] I found a canary. The last one 
was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
-                        self.lastCanaryTime = datetime.now()
-                    else:
-                        logging.debug('[changes] Found a change for a 
non-trigger document')
-
-                if secondsSince(self.lastCanaryTime) > canaryTimeout:
-                    logging.warn('[canary] It has been more than {} seconds 
since the last canary - restarting the DB changes feed'.format(canaryTimeout))
-                    self.restartChangesFeed()
-                    # break out of the for loop so that it can be 
re-established
-                    # with the new changes feed.
-                    break
+                                logging.info('[{}] Found a change to an 
existing trigger'.format(change["id"]))
+                                existingConsumer = 
self.consumers.getConsumerForTrigger(change["id"])
+
+                                if existingConsumer.desiredState() == 
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                    # disabled trigger has become active
+                                    logging.info('[{}] Existing disabled 
trigger should become active'.format(change["id"]))
+                                    self.createAndRunConsumer(document)
+                                elif existingConsumer.desiredState() == 
Consumer.State.Running and not self.__isTriggerDocActive(document):
+                                    # running trigger should become disabled
+                                    logging.info('[{}] Existing running 
trigger should become disabled'.format(change["id"]))
+                                    existingConsumer.disable()
+                                else:
+                                    logging.debug('[changes] Found 
non-interesting trigger change: 
\n{}\n{}'.format(existingConsumer.desiredState(), document))
+                        elif 'canary-timestamp' in change['doc']:
+                            # found a canary - update lastCanaryTime
+                            logging.info('[canary] I found a canary. The last 
one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+                            self.lastCanaryTime = datetime.now()
+                        else:
+                            logging.debug('[changes] Found a change for a 
non-trigger document')
+            except (ConnectionError, ReadTimeout):
+                logging.error('[canary] DB connection timed out. Restarting 
changes feed...')
+                self.stopChangesFeed()
 
             logging.debug("[changes] I made it out of the changes loop!")
 
-    def restartChangesFeed(self):
+    def stopChangesFeed(self):
         if self.changes != None:
             self.changes.stop()
-
-        self.changes = None
-        self.changes = self.database.changesFeed(timeout=changesFeedTimeout, 
since=self.lastSequence)
-        # reset this time to prevent immediately restarting this new feed
-        self.lastCanaryTime = datetime.now()
+            self.changes = None
 
     def createAndRunConsumer(self, doc):
         triggerFQN = doc['_id']

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to