Rfaulk has uploaded a new change for review. https://gerrit.wikimedia.org/r/73721
Change subject: rm. Request notification handler to maintain job queue. ...................................................................... rm. Request notification handler to maintain job queue. Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe --- M user_metrics/api/engine/request_manager.py M user_metrics/api/engine/response_handler.py 2 files changed, 1 insertion(+), 173 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics refs/changes/21/73721/1 diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py index 3a5f814..dd33191 100644 --- a/user_metrics/api/engine/request_manager.py +++ b/user_metrics/api/engine/request_manager.py @@ -204,12 +204,6 @@ .format(rm.cohort_expr, rm.metric)) wait_queue.append(rm) - # Communicate with request notification callback about new job - key_sig = build_key_signature(rm, hash_result=True) - url = get_url_from_keys(build_key_signature(rm), REQUEST_PATH) - req_cb_add_req(key_sig, url, REQ_NCB_LOCK) - - logging.debug('{0} - FINISHING.'.format(log_name)) @@ -476,166 +470,3 @@ results['data'][m[0]] = m[1:] return results - - -# REQUEST NOTIFICATIONS -# ##################### - -from collections import OrderedDict - -req_notification_queue_in = Queue() -req_notification_queue_out = Queue() - -request_msg_type = namedtuple('RequestMessage', 'type hash url is_alive') - - -def requests_notification_callback(msg_queue_in, msg_queue_out): - """ - Asynchronous callback. Tracks status of requests and new requests. - This callback utilizes ``msg_queue_in`` & ``msg_queue_out`` to - manage request status. 
- """ - log_name = '{0} :: {1}'.format(__name__, - requests_notification_callback.__name__) - logging.debug('{0} - STARTING...'.format(log_name)) - - # TODO - potentially extend with an in-memory cache - job_list = OrderedDict() - while 1: - - try: - msg = msg_queue_in.get(True) - except IOError as e: - logging.error(__name__ + ' :: Could not block ' - 'on in queue: "{0}"'.format(e.message)) - sleep(1) - continue - - try: - type = msg[0] - except (KeyError, ValueError): - logging.error(log_name + ' - No valid type ' \ - '{0}'.format(str(msg))) - continue - - # Init request - if type == 0: - try: - job_list[msg[1]] = [True, msg[2]] - logging.debug(log_name + ' - Initialize Request: ' \ - '{0}.'.format(str(msg))) - except Exception: - logging.error(log_name + ' - Initialize Request' \ - ' failed: {0}'.format(str(msg))) - - # Flag request complete - leave on queue - elif type == 1: - try: - job_list[msg[1]][0] = False - logging.debug(log_name + ' - Set request finished: ' \ - '{0}.\n'.format(str(msg))) - except Exception: - logging.error(log_name + ' - Set request finished failed: ' \ - '{0}\n'.format(str(msg))) - - # Is the key in the cache and running? - elif type == 2: - try: - if msg[1] in job_list: - msg_queue_out.put([job_list[msg[1]][0]], True) - else: - msg_queue_out.put([False], True) - logging.debug(log_name + ' - Get request alive: ' \ - '{0}.'.format(str(msg))) - except (KeyError, ValueError): - logging.error(log_name + ' - Get request alive failed: ' \ - '{0}'.format(str(msg))) - - # Get keys - elif type == 3: - msg_queue_out.put(job_list.keys(), True) - - # Get url - elif type == 4: - try: - if msg[1] in job_list: - msg_queue_out.put([job_list[msg[1]][1]], True) - else: - logging.error(log_name + ' - Get URL failed: {0}'. 
- format(str(msg))) - except (KeyError, ValueError): - logging.error(log_name + ' - Get URL failed: {0}'.format(str(msg))) - else: - logging.error(log_name + ' - Bad message: {0}'.format(str(msg))) - - logging.debug('{0} - SHUTTING DOWN...'.format(log_name)) - - -# Wrapper Methods for working with Request Notifications -# Use locks to enforce atomicity - -BLOCK_TIMEOUT = 1 - - -def req_cb_get_url(key, lock): - lock.acquire() - req_notification_queue_in.put([4, key], block=True) - try: - val = req_notification_queue_out.get(True, timeout=BLOCK_TIMEOUT)[0] - except Empty: - logging.error(__name__ + ' :: req_cb_get_url -' - ' Block time expired.') - val = '' - finally: - lock.release() - return val - - -def req_cb_get_cache_keys(lock): - lock.acquire() - req_notification_queue_in.put([3], block=True) - try: - val = req_notification_queue_out.get(block=True, - timeout=BLOCK_TIMEOUT) - except Empty: - logging.error(__name__ + ' :: req_cb_get_cache_keys -' - ' Block time expired.') - val = [] - finally: - lock.release() - return val - - -def req_cb_get_is_running(key, lock): - lock.acquire() - req_notification_queue_in.put([2, key], True) - try: - val = req_notification_queue_out.get(block=True, - timeout=BLOCK_TIMEOUT)[0] - except Empty: - logging.error(__name__ + ' :: req_cb_get_is_running -' - ' Block time expired.') - val = False - finally: - lock.release() - return val - - -def req_cb_add_req(key, url, lock): - lock.acquire() - try: - req_notification_queue_in.put([0, key, url]) - except Empty: - pass - finally: - lock.release() - - -def req_cb_flag_job_complete(key, lock): - lock.acquire() - try: - req_notification_queue_in.put([1, key], True) - except Empty: - pass - finally: - lock.release() diff --git a/user_metrics/api/engine/response_handler.py b/user_metrics/api/engine/response_handler.py index 5406d7e..d7a4738 100644 --- a/user_metrics/api/engine/response_handler.py +++ b/user_metrics/api/engine/response_handler.py @@ -8,12 +8,10 @@ __date__ = "2013-03-14" 
__license__ = "GPL (version 2 or later)"
-from collections import OrderedDict
from user_metrics.config import logging
from user_metrics.api import REQ_NCB_LOCK
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.api.engine.data import set_data, build_key_signature
-from user_metrics.api.engine.request_manager import req_cb_flag_job_complete
from Queue import Empty
from flask import escape
@@ -73,8 +71,7 @@
key_sig = build_key_signature(request_meta, hash_result=True)
- # Set request in list to "not alive"
- req_cb_flag_job_complete(key_sig, REQ_NCB_LOCK)
+ # Add result to cache once completed
logging.debug(log_name + ' - Setting data for {0}'.format(
str(request_meta)))
--
To view, visit https://gerrit.wikimedia.org/r/73721
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: repair_runtime
Gerrit-Owner: Rfaulk <rfaulk...@wikimedia.org>
_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits