Hi all,

since our 1.1 migration we've had some mysterious behaviours with
bibsched that, apparently, nobody else has.  It may be related to having
more than one Invenio in a single system, but I haven't been able to
prove it.

One very curious behaviour is that the first task after midnight
switches to SCHEDULED state but doesn't run.  On the test server it
happens to Traces, and on the production it happens to DDD.  No matter
how many hours I spend (and I have spent many!) finding why and how, the
mystery continues.

I get the friendly daily mail as such:

 Emergency from http://ddd.uab.cat: BibSched halted: Process bibsort
 (task_id: 140997) was launched but seems not to be able to reach
 RUNNING status.

Anyhow, I needed a mechanism to automate my daily manual task: put
bibsched into manual mode, find out which task is in SCHEDULED state,
run it, and put bibsched back into automatic mode.

I have been patching bibsched to allow, at least, those basic scripting
capabilities.  I don't know how many more tasks I need (acKnowledge
errors, maybe?).  I'm unsure about the names I have chosen.

As I feel that it may be useful to somebody else, I submit it for
your consideration.

Comments welcome,

Ferran
BibSched: enable scripting commands [DRAFT]

BibSched is a curses program with only a few command-line options.  This
first patch adds a new command (appropriately called command) with two
options:

 --mode=[automatic, manual]
 --key=k:task_id

The first one allows switching between manual and automatic modes, and the
second allows applying commands to tasks.  Currently only one is implemented:
R for run.
---
 modules/bibsched/lib/bibsched.py |  155 +++++++++++++++++++++++++++++++++----
 1 files changed, 138 insertions(+), 17 deletions(-)

diff --git a/modules/bibsched/lib/bibsched.py b/modules/bibsched/lib/bibsched.py
index 01314ae..dfba9fb 100644
--- a/modules/bibsched/lib/bibsched.py
+++ b/modules/bibsched/lib/bibsched.py
@@ -31,6 +31,7 @@ import getopt
 from itertools import chain
 from socket import gethostname
 from subprocess import Popen
+from cStringIO import StringIO
 import signal
 
 from invenio.bibtask_config import \
@@ -281,9 +282,78 @@ def bibsched_send_signal(proc, task_id, sig):
             return False
     return False
 
+def parse_report_queue_status():
+    '''Get queue status parting the output of report_queue_status.
+
+    Returns: a dictionary with the numeric task_id as key and a
+    dictionary for each value
+    '''
+    # print "calling report_queue_status..."
+    out = StringIO()
+    report_queue_status(verbose=True, status=('WAITING', 'RUNNING', 'SCHEDULED'), stream=out)
+    report = out.getvalue()
+    tasks = {}
+    for line in report.split('\n'):
+        fields = {}
+        if '"' in line:
+            words = line.split('"')
+            while words:
+                word = words.pop(0)
+                if word.endswith('='):
+                    key = word[:-1].split()[-1]
+                    value = words.pop(0)
+                    fields[key] = value
+            key = int(fields['ID'])
+            tasks[key] = fields
+    return tasks
+
+
+def command(opt="", arg=""):
+    '''Validate a scripting option and hand it to Manager.  -m/--mode takes a
+    prefix of AUTOMATIC or MANUAL; -k/--key takes KEY:task_id (only R runs).'''
+    print "opt = [%s] arg = [%s]" % (opt, arg)
+    if opt in ('-m', '--mode'):
+        arg = arg.upper()
+        if 'AUTOMATIC'.startswith(arg):
+            mode = 'A'
+        elif 'MANUAL'.startswith(arg):
+            mode = 'M'
+        else:
+            mode = None
+            print >>sys.stderr,'Unknown mode: %s' % (arg)
+            sys.exit(1)
+        if mode:
+            print 'Manager, mode = %s' % (mode)
+            print 'redirect...'
+            # stdout is passed as-is: with a key, Manager skips the curses UI
+            old_stdout = sys.stdout
+            Manager(old_stdout, mode)
+    elif opt in ('-k', '--key'):
+        if arg.count(':') != 1:
+            print >>sys.stderr, "Error: syntax: K:task_id"
+            sys.exit(1)
+        else:
+            (cmd, task_id) = arg.split(':')
+            if len(cmd) == 1:
+                cmd = cmd.upper()
+            else:
+                print >>sys.stderr, "Error: Key must be single character"
+                sys.exit(1)
+            if not task_id.isdigit():
+                print >>sys.stderr, "Error: task id not numeric"
+                sys.exit(1)
+        print 'Manager, command = %s. [%s] [%s]' % (arg, cmd, task_id)
+        print 'redirect...'
+        # stdout is passed as-is: with a key, Manager skips the curses UI
+        old_stdout = sys.stdout
+        task_id = int(task_id)
+        if cmd == "R":
+            Manager(old_stdout, cmd, task_id)
+
+
 
 class Manager(object):
-    def __init__(self, old_stdout):
+    def __init__(self, old_stdout, key='', task_id=0):
         import curses
         import curses.panel
         from curses.wrapper import wrapper
@@ -316,8 +386,40 @@ class Manager(object):
                 self.header_lines = 3
         except IOError:
             self.motd = ""
-        self.selected_line = self.header_lines
-        wrapper(self.start)
+        if key:
+            self.command(key, task_id)
+        else:
+            self.selected_line = self.header_lines
+            wrapper(self.start)
+
+    def command(self, key, task_id):
+        '''Dispatch appropiate command-line command'''
+
+        mode = server_pid() and "AUTOMATIC" or "MANUAL"
+        if key == 'A':
+            if mode == "MANUAL":
+                try:
+                    self.change_auto_mode()
+                except AttributeError:
+                    pass # not in curses, ignore missing stdscr attribute
+        elif key == 'M':
+            if mode == "AUTOMATIC":
+                self.auto_mode = 1
+                try:
+                    self.change_auto_mode()
+                except AttributeError:
+                    pass # not in curses, ignore missing stdscr attribute
+        elif key == 'R':
+            if mode == "MANUAL":
+                try:
+                    print "running [%s]" % (task_id)
+                    self.run(task_id)
+                except AttributeError:
+                    pass # not in curses, ignore missing stdscr attribute
+            else:
+                print >>sys.stderr,'Cannot run a task in automatic mode'
+                sys.exit(1)
+        sys.exit()
 
     def handle_keys(self, char):
         if char == -1:
@@ -642,10 +744,16 @@ order to let this task run. The current priority is %s. New value:" \
             gc_tasks()
             self.display_in_footer("DONE processes purged")
 
-    def run(self):
-        task_id = self.currentrow[0]
-        process = self.currentrow[1].split(':')[0]
-        status = self.currentrow[5]
+    def run(self, task_id=0):
+        if not task_id:
+            task_id = self.currentrow[0]
+            process = self.currentrow[1].split(':')[0]
+            status = self.currentrow[5]
+        else:
+            tasks = parse_report_queue_status()
+            status = tasks[task_id]['STATUS']
+            process = tasks[task_id]['PROC']
+
         #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1:
             #self.display_in_footer("a process is already running!")
         if status == "WAITING":
@@ -1378,7 +1486,7 @@ def usage(exitcode=1, msg=""):
         sys.stderr.write("Error: %s.\n" % msg)
 
     sys.stderr.write("""\
-Usage: %s [options] [start|stop|restart|monitor|status]
+Usage: %s [options] [start|stop|restart|monitor|status|command]
 
 The following commands are available for bibsched:
 
@@ -1388,6 +1496,7 @@ The following commands are available for bibsched:
    restart    restart running bibsched
    monitor    enter the interactive monitor
    status     get report about current status of the queue
+   command    enable command scripting
    purge      purge the scheduler queue from old tasks
 
 General options:
@@ -1401,6 +1510,9 @@ Status options:
   is all)
   -t, --tasks=LIST\t Comma separated list of BibTask to consider (default
                   \t is all)
+Command options:
+  -m, --mode=MODE\t Change mode (manual, automatic)
+  -k, --key=COMMAND:TASK\t Apply key to task (ex: R:123, K:234, etc)
 Purge options:
   -s, --status=LIST\t Which BibTask status should be considered (default is DONE)
   -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default
@@ -1534,7 +1646,7 @@ def write_message(msg, stream=None, verbose=1):
     if stream is None:
         stream = sys.stdout
     if msg:
-        if stream == sys.stdout or stream == sys.stderr:
+        if stream:
             stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ",
                 time.localtime()))
             try:
@@ -1546,7 +1658,7 @@ def write_message(msg, stream=None, verbose=1):
             sys.stderr.write("Unknown stream %s.  [must be sys.stdout or sys.stderr]\n" % stream)
 
 
-def report_queue_status(verbose=True, status=None, since=None, tasks=None):
+def report_queue_status(verbose=True, status=None, since=None, tasks=None, stream=None):
     """
     Report about the current status of BibSched queue on standard output.
     """
@@ -1578,26 +1690,26 @@ def report_queue_status(verbose=True, status=None, since=None, tasks=None):
                         },
                     (status,))
 
-        write_message("%s processes: %d" % (status, len(res)))
+        write_message("%s processes: %d" % (status, len(res)), stream)
         for (proc_id, proc_proc, proc_user, proc_runtime, proc_sleeptime,
              proc_status, proc_progress, proc_priority) in res:
             write_message(' * ID="%s" PRIORITY="%s" PROC="%s" USER="%s" ' \
                           'RUNTIME="%s" SLEEPTIME="%s" STATUS="%s" ' \
                           'PROGRESS="%s"' % (proc_id,
                           proc_priority, proc_proc, proc_user, proc_runtime,
-                          proc_sleeptime, proc_status, proc_progress))
+                          proc_sleeptime, proc_status, proc_progress), stream)
         return
 
-    write_message("BibSched queue status report for %s:" % gethostname())
+    write_message("BibSched queue status report for %s:" % gethostname(), stream)
     mode = server_pid() and "AUTOMATIC" or "MANUAL"
-    write_message("BibSched queue running mode: %s" % mode)
+    write_message("BibSched queue running mode: %s" % mode, stream)
     if status is None:
         report_about_processes('Running', since, tasks)
         report_about_processes('Waiting', since, tasks)
     else:
         for state in status:
             report_about_processes(state, since, tasks)
-    write_message("Done.")
+    write_message("Done.", stream)
 
 
 def restart(verbose=True, debug=False):
@@ -1660,8 +1772,8 @@ def main():
     debug = False
 
     try:
-        opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqS:s:t:", [
-            "help", "version", "debug", "quiet", "since=", "status=", "task="])
+        opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqcS:s:t:m:k:", [
+            "help", "version", "debug", "quiet", "command", "since=", "status=", "task=", "mode=", "key="])
     except getopt.GetoptError, err:
         Log("Error: %s" % err)
         usage(1, err)
@@ -1689,6 +1801,12 @@ def main():
         elif opt in ['-d', '--debug']:
             debug = True
 
+        elif opt in ['-m', '--mode']:
+            pass
+
+        elif opt in ['-k', '--key']:
+            pass
+
         else:
             usage(1)
 
@@ -1702,6 +1820,9 @@ def main():
             {'status' : report_queue_status,
               'purge' : gc_tasks,
             }[cmd](verbose, status, since, tasks)
+        elif cmd == 'command':
+            {'command': command,
+            }[cmd](opt, arg)
         else:
             {'start': start,
             'halt': halt,

Reply via email to