Hi all, since our 1.1 migration we've had some mysterious behaviours with bibsched that, apparently, nobody else has. It may be related to having more than one Invenio instance on a single system, but I haven't been able to prove it.
One very curious behaviour is that the first task after midnight switches to SCHEDULED state but doesn't run. On the test server it happens to Traces, and on the production server it happens to DDD. No matter how many hours I spend (and I have spent many!) trying to find out why and how, the mystery continues. I get the friendly daily mail as such: Emergency from http://ddd.uab.cat: BibSched halted: Process bibsort (task_id: 140997) was launched but seems not to be able to reach RUNNING status. Anyhow, I needed a mechanism to automate my daily manual task: put bibsched into manual mode, find out which task is in SCHEDULED state, run it, and put bibsched back into automatic mode. I have been patching bibsched to allow, at least, those basic scripting capabilities. I don't know how many more tasks I need (acKnowledge errors, maybe?). I'm unsure about the names I have chosen. As I feel that it may be useful to somebody else, I submit it for your consideration. Comments welcome, Ferran
BibSched: enable scripting commands [DRAFT] BibSched is a curses-based command with only a few command-line options. This first patch adds a new command (appropriately called command) with two options: --mode=[automatic, manual] --key=k:task_id The first one allows switching between manual and automatic modes, and the second allows applying commands to tasks. Currently only one is implemented: R for run. --- modules/bibsched/lib/bibsched.py | 155 +++++++++++++++++++++++++++++++++---- 1 files changed, 138 insertions(+), 17 deletions(-) diff --git a/modules/bibsched/lib/bibsched.py b/modules/bibsched/lib/bibsched.py index 01314ae..dfba9fb 100644 --- a/modules/bibsched/lib/bibsched.py +++ b/modules/bibsched/lib/bibsched.py @@ -31,6 +31,7 @@ import getopt from itertools import chain from socket import gethostname from subprocess import Popen +from cStringIO import StringIO import signal from invenio.bibtask_config import \ @@ -281,9 +282,78 @@ def bibsched_send_signal(proc, task_id, sig): return False return False +def parse_report_queue_status(): + '''Get queue status parting the output of report_queue_status. + + Returns: a dictionary with the numeric task_id as key and a + dictionary for each value + ''' + # print "calling report_queue_status..." 
+ out = StringIO() + report_queue_status(verbose=True, status=('WAITING', 'RUNNING', 'SCHEDULED'), stream=out) + report = out.getvalue() + tasks = {} + for line in report.split('\n'): + fields = {} + if '"' in line: + words = line.split('"') + while words: + word = words.pop(0) + if word.endswith('='): + key = word[:-1].split()[-1] + value = words.pop(0) + fields[key] = value + key = int(fields['ID']) + tasks[key] = fields + return tasks + + +def command(opt="", arg=""): + '''Check command parameters and call Manager with the appropiate values''' + + print "opt = [%s] arg = [%s]" % (opt, arg) + if opt in ('-m', '--mode'): + arg = arg.upper() + if 'AUTOMATIC'.startswith(arg): + mode = 'A' + elif 'MANUAL'.startswith(arg): + mode = 'M' + else: + mode = None + print >>sys.stderr,'Unknown mode: %s' % (arg) + sys.exit(1) + if mode: + print 'Manager, mode = %s' % (mode) + print 'redirect...' + # old_stdout, old_stderr = redirect_stdout_and_stderr() + old_stdout = sys.stdout + Manager(old_stdout, mode) + elif opt in ('-k', '--key'): + if arg.count(':') != 1: + print >>sys.stderr, "Error: syntax: K:task_id" + sys.exit(1) + else: + (cmd, task_id) = arg.split(':') + if len(cmd) == 1: + cmd = cmd.upper() + else: + print >>sys.stderr, "Error: Key must be single character" + sys.exit(1) + if not task_id.isdigit(): + prit >>sys.stderr, "Error: task id not numeric" + sys.exit(1) + print 'Manager, command = %s. [%s] [%s]' % (arg, cmd, task_id) + print 'redirect...' 
+ # old_stdout, old_stderr = redirect_stdout_and_stderr() + old_stdout = sys.stdout + task_id = int(task_id) + if cmd == "R": + Manager(old_stdout, cmd, task_id) + + class Manager(object): - def __init__(self, old_stdout): + def __init__(self, old_stdout, key='', task_id=0): import curses import curses.panel from curses.wrapper import wrapper @@ -316,8 +386,40 @@ class Manager(object): self.header_lines = 3 except IOError: self.motd = "" - self.selected_line = self.header_lines - wrapper(self.start) + if key: + self.command(key, task_id) + else: + self.selected_line = self.header_lines + wrapper(self.start) + + def command(self, key, task_id): + '''Dispatch appropiate command-line command''' + + mode = server_pid() and "AUTOMATIC" or "MANUAL" + if key == 'A': + if mode == "MANUAL": + try: + self.change_auto_mode() + except AttributeError: + pass # not in curses, ignore missing stdscr attribute + elif key == 'M': + if mode == "AUTOMATIC": + self.auto_mode = 1 + try: + self.change_auto_mode() + except AttributeError: + pass # not in curses, ignore missing stdscr attribute + elif key == 'R': + if mode == "MANUAL": + try: + print "running [%s]" % (task_id) + self.run(task_id) + except AttributeError: + pass # not in curses, ignore missing stdscr attribute + else: + print >>sys.stderr,'Cannot run a task in automatic mode' + sys.exit(1) + sys.exit() def handle_keys(self, char): if char == -1: @@ -642,10 +744,16 @@ order to let this task run. The current priority is %s. 
New value:" \ gc_tasks() self.display_in_footer("DONE processes purged") - def run(self): - task_id = self.currentrow[0] - process = self.currentrow[1].split(':')[0] - status = self.currentrow[5] + def run(self, task_id=0): + if not task_id: + task_id = self.currentrow[0] + process = self.currentrow[1].split(':')[0] + status = self.currentrow[5] + else: + tasks = parse_report_queue_status() + status = tasks[task_id]['STATUS'] + process = tasks[task_id]['PROC'] + #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1: #self.display_in_footer("a process is already running!") if status == "WAITING": @@ -1378,7 +1486,7 @@ def usage(exitcode=1, msg=""): sys.stderr.write("Error: %s.\n" % msg) sys.stderr.write("""\ -Usage: %s [options] [start|stop|restart|monitor|status] +Usage: %s [options] [start|stop|restart|monitor|status|command] The following commands are available for bibsched: @@ -1388,6 +1496,7 @@ The following commands are available for bibsched: restart restart running bibsched monitor enter the interactive monitor status get report about current status of the queue + command enable command scripting purge purge the scheduler queue from old tasks General options: @@ -1401,6 +1510,9 @@ Status options: is all) -t, --tasks=LIST\t Comma separated list of BibTask to consider (default \t is all) +Command options: + -m, --mode=MODE\t Change mode (manual, automatic) + -k, --key=COMMAND:TASK\t Apply key to task (ex: R:123, K:234, etc) Purge options: -s, --status=LIST\t Which BibTask status should be considered (default is DONE) -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default @@ -1534,7 +1646,7 @@ def write_message(msg, stream=None, verbose=1): if stream is None: stream = sys.stdout if msg: - if stream == sys.stdout or stream == sys.stderr: + if stream: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) try: @@ -1546,7 +1658,7 @@ def write_message(msg, stream=None, verbose=1): 
sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) -def report_queue_status(verbose=True, status=None, since=None, tasks=None): +def report_queue_status(verbose=True, status=None, since=None, tasks=None, stream=None): """ Report about the current status of BibSched queue on standard output. """ @@ -1578,26 +1690,26 @@ def report_queue_status(verbose=True, status=None, since=None, tasks=None): }, (status,)) - write_message("%s processes: %d" % (status, len(res))) + write_message("%s processes: %d" % (status, len(res)), stream) for (proc_id, proc_proc, proc_user, proc_runtime, proc_sleeptime, proc_status, proc_progress, proc_priority) in res: write_message(' * ID="%s" PRIORITY="%s" PROC="%s" USER="%s" ' \ 'RUNTIME="%s" SLEEPTIME="%s" STATUS="%s" ' \ 'PROGRESS="%s"' % (proc_id, proc_priority, proc_proc, proc_user, proc_runtime, - proc_sleeptime, proc_status, proc_progress)) + proc_sleeptime, proc_status, proc_progress), stream) return - write_message("BibSched queue status report for %s:" % gethostname()) + write_message("BibSched queue status report for %s:" % gethostname(), stream) mode = server_pid() and "AUTOMATIC" or "MANUAL" - write_message("BibSched queue running mode: %s" % mode) + write_message("BibSched queue running mode: %s" % mode, stream) if status is None: report_about_processes('Running', since, tasks) report_about_processes('Waiting', since, tasks) else: for state in status: report_about_processes(state, since, tasks) - write_message("Done.") + write_message("Done.", stream) def restart(verbose=True, debug=False): @@ -1660,8 +1772,8 @@ def main(): debug = False try: - opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqS:s:t:", [ - "help", "version", "debug", "quiet", "since=", "status=", "task="]) + opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqcS:s:t:m:k:", [ + "help", "version", "debug", "quiet", "command", "since=", "status=", "task=", "mode=", "key="]) except getopt.GetoptError, err: Log("Error: %s" % err) 
usage(1, err) @@ -1689,6 +1801,12 @@ def main(): elif opt in ['-d', '--debug']: debug = True + elif opt in ['-m', '--mode']: + pass + + elif opt in ['-k', '--key']: + pass + else: usage(1) @@ -1702,6 +1820,9 @@ def main(): {'status' : report_queue_status, 'purge' : gc_tasks, }[cmd](verbose, status, since, tasks) + elif cmd == 'command': + {'command': command, + }[cmd](opt, arg) else: {'start': start, 'halt': halt,