This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new bdb2c9a Disable triggers for invalid auth when using custom auth handler (#354) bdb2c9a is described below commit bdb2c9a98b693aadcb28a9ad89896655b544a653 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Thu Oct 17 10:18:03 2019 -0400 Disable triggers for invalid auth when using custom auth handler (#354) * Disable triggers for invalid auth when using custom auth handler * Fix bad indentation * Call __shouldDisable on auth handler exception * Always dump response on auth handler exception * Use response.ok instead of checking status code * Fix typo --- provider/authHandler.py | 25 +++++++++++----- provider/consumer.py | 80 +++++++++++++++++++++++++++---------------------- 2 files changed, 62 insertions(+), 43 deletions(-) diff --git a/provider/authHandler.py b/provider/authHandler.py index 0325161..58b1ef0 100644 --- a/provider/authHandler.py +++ b/provider/authHandler.py @@ -23,6 +23,9 @@ import time from requests.auth import AuthBase +class AuthHandlerException(Exception): + def __init__(self, response): + self.response = response class IAMAuth(AuthBase): @@ -35,18 +38,24 @@ class IAMAuth(AuthBase): r.headers['Authorization'] = 'Bearer {}'.format(self.__getToken()) return r - def __getToken(self): if 'expires_in' not in self.tokenInfo or self.__isRefreshTokenExpired(): - self.tokenInfo = self.__requestToken() - return self.tokenInfo['access_token'] + response = self.__requestToken() + if response.ok and 'access_token' in response.json(): + self.tokenInfo = response.json() + return self.tokenInfo['access_token'] + else: + raise AuthHandlerException(response) elif self.__isTokenExpired(): - self.tokenInfo = self.__refreshToken() - return self.tokenInfo['access_token'] + response = self.__refreshToken() + if response.ok and 'access_token' in response.json(): + self.tokenInfo = response.json() + return self.tokenInfo['access_token'] + else: + raise AuthHandlerException(response) else: return self.tokenInfo['access_token'] - def __requestToken(self): headers = { 'Content-type': 'application/x-www-form-urlencoded', @@ -86,7 +95,7 @@ class IAMAuth(AuthBase): def __isRefreshTokenExpired(self): if 'expiration' not in self.tokenInfo: - return true + return True sevenDays = 7 * 24 * 3600 currentTime = int(time.time()) @@ -96,4 +105,4 @@ class IAMAuth(AuthBase): def __sendRequest(self, payload, headers): response = requests.post(self.endpoint, data=payload, headers=headers) - return response.json() + return response diff --git a/provider/consumer.py b/provider/consumer.py index 74b243a..8742af3 100644 --- a/provider/consumer.py +++ b/provider/consumer.py @@ -32,6 +32,7 @@ from datetime import datetime from datetimeutils import secondsSince from multiprocessing import Process, Manager from urlparse import urlparse +from authHandler import AuthHandlerException from authHandler import IAMAuth from requests.auth import HTTPBasicAuth from datetime import datetime, timedelta @@ -409,44 +410,19 @@ class ConsumerProcess (Process): self.consumer.commit(offsets=self.__getOffsetList(messages), async=False) retry = False elif self.__shouldDisable(status_code): - logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code)) - response_dump = { - 'request': { - 'method': response.request.method, - 'url': response.request.url, - 'path_url': response.request.path_url, - 'headers': response.request.headers, - 'body': response.request.body - }, - 'response': { - 'status_code': response.status_code, - 'ok': response.ok, - 'reason': response.reason, - 'url': response.url, - 'headers': response.headers, - 'content': response.content - } - } - - logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump)) - - # abandon all hope? - self.setDesiredState(Consumer.State.Disabled) - # mark it disabled in the DB - - # when failing to establish a database connection, mark the consumer as dead to restart the consumer - try: - self.database = Database() - self.database.disableTrigger(self.trigger, status_code) - except Exception as e: - logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e)) - self.__recordState(Consumer.State.Dead) - finally: - self.database.destroy() - retry = False + logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code)) + self.__dumpRequestResponse(response) + self.__disableTrigger(status_code) except requests.exceptions.RequestException as e: logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e)) + except AuthHandlerException as e: + logging.error("[{}] Encountered an exception from auth handler, status code {}").format(self.trigger, e.response.status_code) + self.__dumpRequestResponse(e.response) + + if self.__shouldDisable(e.response.status_code): + retry = False + self.__disableTrigger(e.response.status_code) if retry: retry_count += 1 @@ -460,6 +436,40 @@ class ConsumerProcess (Process): self.consumer.commit(offsets=self.__getOffsetList(messages), async=False) retry = False + def __disableTrigger(self, status_code): + self.setDesiredState(Consumer.State.Disabled) + + # when failing to establish a database connection, mark the consumer as dead to restart the consumer + try: + self.database = Database() + self.database.disableTrigger(self.trigger, status_code) + except Exception as e: + logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e)) + self.__recordState(Consumer.State.Dead) + finally: + self.database.destroy() + + def __dumpRequestResponse(self, response): + response_dump = { + 'request': { + 'method': response.request.method, + 'url': response.request.url, + 'path_url': response.request.path_url, + 'headers': response.request.headers, + 'body': response.request.body + }, + 'response': { + 'status_code': response.status_code, + 'ok': response.ok, + 'reason': response.reason, + 'url': response.url, + 'headers': response.headers, + 'content': response.content + } + } + + logging.error('[{}] Dumping the content of the request and response:\n{}'.format(self.trigger, response_dump)) + # return the dict that will be sent as the trigger payload def __getMessagePayload(self, message): return {