ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/249754

Change subject: dumpadmin script: add "rerun" which reruns a broken job
......................................................................

dumpadmin script: add "rerun" which reruns a broken job

Note that this reruns the job from the start, rather than
just replacing the broken files, in the case of jobs that
produce multiple small files (checkpoint or other).

Change-Id: I63d7eb071cd06a4695edde3a14f86194b8f9317a
---
M xmldumps-backup/dumpadmin.py
1 file changed, 100 insertions(+), 31 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps 
refs/changes/54/249754/1

diff --git a/xmldumps-backup/dumpadmin.py b/xmldumps-backup/dumpadmin.py
index a39ef0e..0528db7 100644
--- a/xmldumps-backup/dumpadmin.py
+++ b/xmldumps-backup/dumpadmin.py
@@ -10,11 +10,12 @@
 import glob
 import socket
 import signal
-from dumps.utils import RunInfoFile, Chunk
-from dumps.runnerutils import Checksummer, Status, NoticeFile, SymLinks
+import traceback
+from dumps.utils import RunInfoFile
+from dumps.runnerutils import NoticeFile
 from dumps.jobs import DumpDir
-from worker import DumpItemList
-from dumps.WikiDump import Wiki, Config
+from worker import Runner
+from dumps.WikiDump import Wiki, Config, TimeUtils
 
 
 def command_has_wiki(pid, wikiname):
@@ -183,6 +184,8 @@
                 self.do_unlock()
             elif item == "remove":
                 self.do_remove()
+            elif item == "rerun":
+                self.do_rerun()
             elif item == "maintenance":
                 self.do_maintenance()
             elif item == "exit":
@@ -253,7 +256,8 @@
                             for field in fields:
                                 if field.startswith("DUMPS="):
                                     # if no wiki specified for instance, get 
procs for all
-                                    if self.wikiname is None or 
command_has_wiki(process_id, self.wikiname):
+                                    if self.wikiname is None or 
command_has_wiki(
+                                            process_id, self.wikiname):
                                         pids.append(process_id)
                                     break
                     process_environ.close()
@@ -334,7 +338,10 @@
             print "failed dumps info:", failed_dumps
         return failed_dumps
 
-    def do_remove(self):
+    def do_rerun(self):
+        self.do_remove(rerun=True)
+
+    def do_remove(self, rerun=False):
         '''
         find all failed dump jobs for unlocked wikis
         clean them up after getting lock on each one
@@ -354,28 +361,30 @@
                 except:
                     sys.stderr.write("Couldn't lock %s, can't do cleanup\n" % 
wikiname)
                     continue
-                self.cleanup_dump(wiki, failed_dumps[wikiname][date])
+                self.cleanup_dump(wiki, failed_dumps[wikiname][date], 
rerun=rerun)
                 wiki.unlock()
 
-    def cleanup_dump(self, wiki, failed_jobs):
+    def cleanup_dump(self, wiki, failed_jobs, rerun=False):
         '''
         for the specified wiki, and the given list
         of failed jobs, find all the output files, toss
         them, then rebuild: md5sums file, symlinks
         into latest dir, dump run info file
         '''
-        chunk_info = Chunk(wiki, wiki.db_name)
-        dump_dir = DumpDir(wiki, wiki.db_name)
-        run_info_file = RunInfoFile(wiki, True)
-        dump_item_list = DumpItemList(wiki, False, False, False, None, None,
-                                      True, chunk_info, None, run_info_file, 
dump_dir)
+        # need to update status files, dumpruninfo, checksums file
+        # and latest links.
+        runner = Runner(wiki, prefetch=True, spawn=True, job=None,
+                        skip_jobs=[], restart=False, notice="", dryrun=False,
+                        logging_enabled=False, chunk_to_do=False, 
checkpoint_file=None,
+                        page_id_range=None, skipdone=[], verbose=self.verbose)
+
         if not failed_jobs:
             if self.verbose:
                 print "no failed jobs for wiki", wiki
 
         for job in failed_jobs:
-            files = get_job_output_files(wiki, job, dump_item_list.dump_items)
-            paths = [dump_dir.filename_public_path(fileinfo) for fileinfo in 
files]
+            files = get_job_output_files(wiki, job, 
runner.dump_item_list.dump_items)
+            paths = [runner.dump_dir.filename_public_path(fileinfo) for 
fileinfo in files]
             if self.verbose:
                 print "for job", job, "these are the output files:", paths
             for filename in paths:
@@ -392,23 +401,83 @@
             print "status file, index.html file and symlinks to latest dir"
             return
 
-        # need to update status files, dumpruninfo, checksums file
-        # and latest links.
-        checksums = Checksummer(wiki, dump_dir, True, False)
-        html_notice_file = NoticeFile(wiki, "", True)
-        status = Status(wiki, dump_dir, dump_item_list.dump_items, checksums,
-                        True, False, html_notice_file, None, self.verbose)
-        if self.verbose:
-            print "updating status files for wiki", wiki.db_name
-        status.update_status_files()
-        run_info_file = RunInfoFile(wiki, True)
         if self.verbose:
             print "updating dump run info file for wiki", wiki.db_name
-        
run_info_file.save_dump_runinfo_file(dump_item_list.report_dump_runinfo())
-        symlinks = SymLinks(wiki, dump_dir, False, False, True)
+        
runner.runinfo_file.save_dump_runinfo_file(runner.dump_item_list.report_dump_runinfo())
+
         if self.verbose:
             print "updating symlinks for wiki", wiki.db_name
-        symlinks.cleanup_symlinks()
+        runner.sym_links.cleanup_symlinks()
+
+        if self.verbose:
+            print "updating status files for wiki", wiki.db_name
+        runner.status.update_status_files()
+
+        if rerun:
+            for job in failed_jobs:
+                runner.dump_item_list.mark_dumps_to_run(job)
+            self.rerun_jobs(runner)
+
+    def log_and_print(self, message):
+        sys.stderr.write("%s\n" % message)
+
+    def debug(self, stuff):
+        self.log_and_print("%s: %s" % (TimeUtils.pretty_time(), stuff))
+
+    def rerun_jobs(self, runner):
+        runner.checksums.prepare_checksums()
+        for item in runner.dump_item_list.dump_items:
+            if item.to_run():
+                item.start()
+                runner.status.update_status_files()
+                runner.runinfo_file.save_dump_runinfo_file(
+                    runner.dump_item_list.report_dump_runinfo())
+                try:
+                    item.dump(runner)
+                except Exception, ex:
+                    exc_type, exc_value, exc_traceback = sys.exc_info()
+                    if self.verbose:
+                        sys.stderr.write(repr(traceback.format_exception(
+                            exc_type, exc_value, exc_traceback)))
+                    else:
+                        if exc_type.__name__ == 'BackupPrereqError':
+                            self.debug(str(ex))
+                        else:
+                            self.debug("*** exception! " + str(ex))
+                            if exc_type.__name__ != 'BackupPrereqError':
+                                item.set_status("failed")
+
+                # Here for example status is "failed". But maybe also
+                # "in-progress", if an item chooses to override dump(...) and
+                # forgets to set the status. This is a failure as well.
+                if item.status() not in ["done", "waiting", "skipped"]:
+                    runner.status.report_failure()
+                    runner.status.fail_count += 1
+
+            if item.status() == "done":
+                runner.checksums.cp_chksum_tmpfiles_to_permfile()
+                runner.run_update_item_fileinfo(item)
+            elif item.status() == "waiting" or item.status() == "skipped":
+                # don't update the checksum files for this item.
+                continue
+            else:
+                # failure
+                # preexisting failures can be ignored, a failure from this
+                # job run has already been alerted
+                continue
+
+        if runner.dump_item_list.all_possible_jobs_done():
+            # All jobs are either in status "done", "waiting", "failed", 
"skipped"
+            runner.status.update_status_files("done")
+        else:
+            # This may happen if we start a dump now and abort before all 
items are
+            # done. Then some are left for example in state "waiting". When
+            # afterwards running a specific job, all (but one) of the jobs
+            # previously in "waiting" are still in status "waiting"
+            runner.status.update_status_files("partialdone")
+            
runner.runinfo_file.save_dump_runinfo_file(runner.dump_item_list.report_dump_runinfo())
+
+        runner.checksums.move_chksumfiles_into_place()
 
     def do_maintenance(self):
         '''
@@ -639,7 +708,7 @@
     '''
     return action correspodning to command line option
     '''
-    action_options = ['kill', 'unlock', 'remove', 'maintenance', 'exit']
+    action_options = ['kill', 'unlock', 'remove', 'rerun', 'maintenance', 
'exit']
     if option.startswith("--"):
         option = option[2:]
         if option in action_options:
@@ -665,9 +734,9 @@
     wiki = None
 
     try:
-        (options, remainder) = getopt.gnu_getopt(sys.argv[1:], 
"c:n:U:w:kurmedvh",
+        (options, remainder) = getopt.gnu_getopt(sys.argv[1:], 
"c:n:U:w:kurRmedvh",
                                                  ["configfile=", "notice=", 
"no=", "undo=",
-                                                  "wiki=", "kill", "unlock", 
"remove",
+                                                  "wiki=", "kill", "unlock", 
"remove", "rerun",
                                                   "maintenance", "exit", 
"dryrun",
                                                   "verbose", "help"])
     except getopt.GetoptError as err:

-- 
To view, visit https://gerrit.wikimedia.org/r/249754
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I63d7eb071cd06a4695edde3a14f86194b8f9317a
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: ariel
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to