This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new e323899 Better handle when the connection to the DB drops (#205)
e323899 is described below
commit e32389994c4d0afd766db951098afef05820a257
Author: Justin Berstler <[email protected]>
AuthorDate: Wed Aug 2 14:29:25 2017 -0400
Better handle when the connection to the DB drops (#205)
Make use of a timeout value when constructing the changes feed. This
ensures that if the connection to the DB is lost, an exception is thrown in a
timely manner so that the changes feed can be restarted. Without this timeout
set on the database client, the changes feed will simply hang when the
connection is lost, and may not recover.
Update to the latest python-cloudant package.
---
Dockerfile | 2 +-
provider/database.py | 15 +++---
provider/service.py | 143 +++++++++++++++++++++++++--------------------------
3 files changed, 79 insertions(+), 81 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 953170c..afdda12 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -24,7 +24,7 @@ RUN export LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
&& ldconfig
RUN pip install gevent==1.1.2 flask==0.11.1 confluent-kafka==0.9.4 \
- requests==2.10.0 cloudant==2.4.0 psutil==5.0.0
+ requests==2.10.0 cloudant==2.5.0 psutil==5.0.0
# while I expect these will be overridden during deployment, we might as well
# set reasonable defaults
diff --git a/provider/database.py b/provider/database.py
index ac75cf4..a9cef28 100644
--- a/provider/database.py
+++ b/provider/database.py
@@ -41,17 +41,20 @@ class Database:
instance = os.getenv('INSTANCE', 'messageHubTrigger-0')
canaryId = "canary-{}".format(instance)
- def __init__(self):
- client = CouchDB(self.username, self.password, url=self.url)
- client.connect()
+ def __init__(self, timeout=None):
+ self.client = CouchDB(self.username, self.password, url=self.url,
timeout=timeout)
+ self.client.connect()
- if self.dbname in client.all_dbs():
+ if self.dbname in self.client.all_dbs():
logging.info('Database exists - connecting to it.')
- self.database = client[self.dbname]
+ self.database = self.client[self.dbname]
else:
logging.warn('Database does not exist - creating it.')
- self.database = client.create_database(self.dbname)
+ self.database = self.client.create_database(self.dbname)
+ def destroy(self):
+ self.client.disconnect()
+ self.client = None
def disableTrigger(self, triggerFQN, status_code):
try:
diff --git a/provider/service.py b/provider/service.py
index 7183b7f..6a6e6ae 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -25,19 +25,14 @@ from consumer import Consumer
from database import Database
from datetime import datetime
from datetimeutils import secondsSince
+from requests.exceptions import ConnectionError, ReadTimeout
from threading import Thread
# How often to produce canary documents
canaryInterval = 60 # seconds
-# How long to wait between detecting carnary documents before restarting the
-# DB changes feed. Should be significantly larger than canaryInterval to allow
for
-# the roundtrip to DB as well as to let the Service handle other work in the
-# meantime.
-canaryTimeout = 90 # seconds
-
# How long the changes feed should poll before timing out
-changesFeedTimeout = 10 # seconds
+changesFeedTimeout = 30 # seconds
class Service (Thread):
@@ -45,10 +40,7 @@ class Service (Thread):
Thread.__init__(self)
self.daemon = True
- self.lastCanaryTime = datetime.now()
-
- self.database = Database()
- self.changes = self.database.changesFeed(timeout=changesFeedTimeout)
+ self.database = None
self.lastSequence = None
self.canaryGenerator = CanaryDocumentGenerator()
@@ -58,76 +50,79 @@ class Service (Thread):
self.canaryGenerator.start()
while True:
- for change in self.changes:
- # change could be None because the changes feed will timeout
- # if it hasn't detected any changes. This timeout allows us to
- # check whether or not the feed is capable of detecting canary
- # documents
- if change != None:
- # Record the sequence in case the changes feed needs to be
- # restarted. This way the new feed can pick up right where
- # the old one left off.
- self.lastSequence = change['seq']
-
- if "deleted" in change and change["deleted"] == True:
- logging.info('[changes] Found a delete')
- consumer =
self.consumers.getConsumerForTrigger(change['id'])
- if consumer != None:
- if consumer.desiredState() ==
Consumer.State.Disabled:
- # just remove it from memory
- logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
-
self.consumers.removeConsumerForTrigger(consumer.trigger)
- else:
- logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
- consumer.shutdown()
- # since we can't use a filter function for the feed (then
- # you don't get deletes) we need to manually verify this
- # is a valid trigger doc that has changed
- elif 'triggerURL' in change['doc']:
- logging.info('[changes] Found a change in a trigger
document')
- document = change['doc']
-
- if not
self.consumers.hasConsumerForTrigger(change["id"]):
- logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- logging.info('[{}] Found a change to an existing
trigger'.format(change["id"]))
- existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
-
- if existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
- # disabled trigger has become active
- logging.info('[{}] Existing disabled trigger
should become active'.format(change["id"]))
+ try:
+ if self.database is not None:
+ logging.info("Shutting down existing DB client")
+ self.database.destroy()
+
+ logging.info("Starting changes feed")
+ self.database = Database(timeout=changesFeedTimeout)
+ self.changes =
self.database.changesFeed(timeout=changesFeedTimeout, since=self.lastSequence)
+
+ self.lastCanaryTime = datetime.now()
+
+ for change in self.changes:
+ # change could be None because the changes feed will
timeout
+ # if it hasn't detected any changes. This timeout allows
us to
+ # check whether or not the feed is capable of detecting
canary
+ # documents
+ if change != None:
+ # Record the sequence in case the changes feed needs
to be
+ # restarted. This way the new feed can pick up right
where
+ # the old one left off.
+ self.lastSequence = change['seq']
+
+ if "deleted" in change and change["deleted"] == True:
+ logging.info('[changes] Found a delete')
+ consumer =
self.consumers.getConsumerForTrigger(change['id'])
+ if consumer != None:
+ if consumer.desiredState() ==
Consumer.State.Disabled:
+ # just remove it from memory
+ logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
+
self.consumers.removeConsumerForTrigger(consumer.trigger)
+ else:
+ logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
+ consumer.shutdown()
+ # since we can't use a filter function for the feed
(then
+ # you don't get deletes) we need to manually verify
this
+ # is a valid trigger doc that has changed
+ elif 'triggerURL' in change['doc']:
+ logging.info('[changes] Found a change in a
trigger document')
+ document = change['doc']
+
+ if not
self.consumers.hasConsumerForTrigger(change["id"]):
+ logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
self.createAndRunConsumer(document)
- elif existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
- # running trigger should become disabled
- logging.info('[{}] Existing running trigger
should become disabled'.format(change["id"]))
- existingConsumer.disable()
else:
- logging.debug('[changes] Found non-interesting
trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
- elif 'canary-timestamp' in change['doc']:
- # found a canary - update lastCanaryTime
- logging.info('[canary] I found a canary. The last one
was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
- self.lastCanaryTime = datetime.now()
- else:
- logging.debug('[changes] Found a change for a
non-trigger document')
-
- if secondsSince(self.lastCanaryTime) > canaryTimeout:
- logging.warn('[canary] It has been more than {} seconds
since the last canary - restarting the DB changes feed'.format(canaryTimeout))
- self.restartChangesFeed()
- # break out of the for loop so that it can be
re-established
- # with the new changes feed.
- break
+ logging.info('[{}] Found a change to an
existing trigger'.format(change["id"]))
+ existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
+
+ if existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+ # disabled trigger has become active
+ logging.info('[{}] Existing disabled
trigger should become active'.format(change["id"]))
+ self.createAndRunConsumer(document)
+ elif existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
+ # running trigger should become disabled
+ logging.info('[{}] Existing running
trigger should become disabled'.format(change["id"]))
+ existingConsumer.disable()
+ else:
+ logging.debug('[changes] Found
non-interesting trigger change:
\n{}\n{}'.format(existingConsumer.desiredState(), document))
+ elif 'canary-timestamp' in change['doc']:
+ # found a canary - update lastCanaryTime
+ logging.info('[canary] I found a canary. The last
one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+ self.lastCanaryTime = datetime.now()
+ else:
+ logging.debug('[changes] Found a change for a
non-trigger document')
+ except (ConnectionError, ReadTimeout):
+ logging.error('[canary] DB connection timed out. Restarting
changes feed...')
+ self.stopChangesFeed()
logging.debug("[changes] I made it out of the changes loop!")
- def restartChangesFeed(self):
+ def stopChangesFeed(self):
if self.changes != None:
self.changes.stop()
-
- self.changes = None
- self.changes = self.database.changesFeed(timeout=changesFeedTimeout,
since=self.lastSequence)
- # reset this time to prevent immediately restarting this new feed
- self.lastCanaryTime = datetime.now()
+ self.changes = None
def createAndRunConsumer(self, doc):
triggerFQN = doc['_id']
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.