Rfaulk has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/73721


Change subject: rm. Request notification handler to maintain job queue.
......................................................................

rm. Request notification handler to maintain job queue.

Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe
---
M user_metrics/api/engine/request_manager.py
M user_metrics/api/engine/response_handler.py
2 files changed, 1 insertion(+), 173 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics refs/changes/21/73721/1

diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index 3a5f814..dd33191 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -204,12 +204,6 @@
                 .format(rm.cohort_expr, rm.metric))
             wait_queue.append(rm)
 
-            # Communicate with request notification callback about new job
-            key_sig = build_key_signature(rm, hash_result=True)
-            url = get_url_from_keys(build_key_signature(rm), REQUEST_PATH)
-            req_cb_add_req(key_sig, url, REQ_NCB_LOCK)
-
-
     logging.debug('{0} - FINISHING.'.format(log_name))
 
 
@@ -476,166 +470,3 @@
             results['data'][m[0]] = m[1:]
 
     return results
-
-
-# REQUEST NOTIFICATIONS
-# #####################
-
-from collections import OrderedDict
-
-req_notification_queue_in = Queue()
-req_notification_queue_out = Queue()
-
-request_msg_type = namedtuple('RequestMessage', 'type hash url is_alive')
-
-
-def requests_notification_callback(msg_queue_in, msg_queue_out):
-    """
-        Asynchronous callback.  Tracks status of requests and new requests.
-        This callback utilizes ``msg_queue_in`` & ``msg_queue_out`` to
-        manage request status.
-    """
-    log_name = '{0} :: {1}'.format(__name__,
-                                   requests_notification_callback.__name__)
-    logging.debug('{0}  - STARTING...'.format(log_name))
-
-    # TODO - potentially extend with an in-memory cache
-    job_list = OrderedDict()
-    while 1:
-
-        try:
-            msg = msg_queue_in.get(True)
-        except IOError as e:
-            logging.error(__name__ + ' :: Could not block '
-                                     'on in queue: "{0}"'.format(e.message))
-            sleep(1)
-            continue
-
-        try:
-            type = msg[0]
-        except (KeyError, ValueError):
-            logging.error(log_name + ' - No valid type ' \
-                                     '{0}'.format(str(msg)))
-            continue
-
-        # Init request
-        if type == 0:
-            try:
-                job_list[msg[1]] = [True, msg[2]]
-                logging.debug(log_name + ' - Initialize Request: ' \
-                                         '{0}.'.format(str(msg)))
-            except Exception:
-                logging.error(log_name + ' - Initialize Request' \
-                                         ' failed: {0}'.format(str(msg)))
-
-        # Flag request complete - leave on queue
-        elif type == 1:
-            try:
-                job_list[msg[1]][0] = False
-                logging.debug(log_name + ' - Set request finished: ' \
-                                         '{0}.\n'.format(str(msg)))
-            except Exception:
-                logging.error(log_name + ' - Set request finished failed: ' \
-                                         '{0}\n'.format(str(msg)))
-
-        # Is the key in the cache and running?
-        elif type == 2:
-            try:
-                if msg[1] in job_list:
-                    msg_queue_out.put([job_list[msg[1]][0]], True)
-                else:
-                    msg_queue_out.put([False], True)
-                logging.debug(log_name + ' - Get request alive: ' \
-                                         '{0}.'.format(str(msg)))
-            except (KeyError, ValueError):
-                logging.error(log_name + ' - Get request alive failed: ' \
-                                         '{0}'.format(str(msg)))
-
-        # Get keys
-        elif type == 3:
-            msg_queue_out.put(job_list.keys(), True)
-
-        # Get url
-        elif type == 4:
-            try:
-                if msg[1] in job_list:
-                    msg_queue_out.put([job_list[msg[1]][1]], True)
-                else:
-                    logging.error(log_name + ' - Get URL failed: {0}'.
-                    format(str(msg)))
-            except (KeyError, ValueError):
-                logging.error(log_name + ' - Get URL failed: {0}'.format(str(msg)))
-        else:
-            logging.error(log_name + ' - Bad message: {0}'.format(str(msg)))
-
-    logging.debug('{0}  - SHUTTING DOWN...'.format(log_name))
-
-
-# Wrapper Methods for working with Request Notifications
-# Use locks to enforce atomicity
-
-BLOCK_TIMEOUT = 1
-
-
-def req_cb_get_url(key, lock):
-    lock.acquire()
-    req_notification_queue_in.put([4, key], block=True)
-    try:
-        val = req_notification_queue_out.get(True, timeout=BLOCK_TIMEOUT)[0]
-    except Empty:
-        logging.error(__name__ + ' :: req_cb_get_url -'
-                                 ' Block time expired.')
-        val = ''
-    finally:
-        lock.release()
-    return val
-
-
-def req_cb_get_cache_keys(lock):
-    lock.acquire()
-    req_notification_queue_in.put([3], block=True)
-    try:
-        val =  req_notification_queue_out.get(block=True,
-                                              timeout=BLOCK_TIMEOUT)
-    except Empty:
-        logging.error(__name__ + ' :: req_cb_get_cache_keys -'
-                                 ' Block time expired.')
-        val = []
-    finally:
-        lock.release()
-    return val
-
-
-def req_cb_get_is_running(key, lock):
-    lock.acquire()
-    req_notification_queue_in.put([2, key], True)
-    try:
-        val =  req_notification_queue_out.get(block=True,
-                                              timeout=BLOCK_TIMEOUT)[0]
-    except Empty:
-        logging.error(__name__ + ' :: req_cb_get_is_running -'
-                                 ' Block time expired.')
-        val = False
-    finally:
-        lock.release()
-    return val
-
-
-def req_cb_add_req(key, url, lock):
-    lock.acquire()
-    try:
-        req_notification_queue_in.put([0, key, url])
-    except Empty:
-        pass
-    finally:
-        lock.release()
-
-
-def req_cb_flag_job_complete(key, lock):
-    lock.acquire()
-    try:
-        req_notification_queue_in.put([1, key], True)
-    except Empty:
-        pass
-    finally:
-        lock.release()
diff --git a/user_metrics/api/engine/response_handler.py b/user_metrics/api/engine/response_handler.py
index 5406d7e..d7a4738 100644
--- a/user_metrics/api/engine/response_handler.py
+++ b/user_metrics/api/engine/response_handler.py
@@ -8,12 +8,10 @@
 __date__ = "2013-03-14"
 __license__ = "GPL (version 2 or later)"
 
-from collections import OrderedDict
 from user_metrics.config import logging
 from user_metrics.api import REQ_NCB_LOCK
 from user_metrics.api.engine.request_meta import rebuild_unpacked_request
 from user_metrics.api.engine.data import set_data, build_key_signature
-from user_metrics.api.engine.request_manager import req_cb_flag_job_complete
 from Queue import Empty
 from flask import escape
 
@@ -73,8 +71,7 @@
 
         key_sig = build_key_signature(request_meta, hash_result=True)
 
-        # Set request in list to "not alive"
-        req_cb_flag_job_complete(key_sig, REQ_NCB_LOCK)
+        # Add result to cache once completed
 
         logging.debug(log_name + ' - Setting data for {0}'.format(
             str(request_meta)))

-- 
To view, visit https://gerrit.wikimedia.org/r/73721
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: repair_runtime
Gerrit-Owner: Rfaulk <rfaulk...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to