ArielGlenn has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/345985 )

Change subject: retry failed page content pieces immediately after page content step completes
......................................................................


retry failed page content pieces immediately after page content step completes

This means we don't regenerate partial stubs; we just put the failed
commands back in the queue and retry until they succeed or we hit the
max retry limit.

This update adds three new config settings: maxRetries, retryWait and revsMargin.

Bug: T160507
Change-Id: I7db012f3ca8670e3274d33c7ff860962a7eae8f2
---
M xmldumps-backup/defaults.conf
M xmldumps-backup/dumps/CommandManagement.py
M xmldumps-backup/dumps/WikiDump.py
M xmldumps-backup/dumps/apijobs.py
M xmldumps-backup/dumps/flowjob.py
M xmldumps-backup/dumps/pagerange.py
M xmldumps-backup/dumps/recombinejobs.py
M xmldumps-backup/dumps/recompressjobs.py
M xmldumps-backup/dumps/runner.py
M xmldumps-backup/dumps/tablesjobs.py
M xmldumps-backup/dumps/xmlcontentjobs.py
M xmldumps-backup/dumps/xmljobs.py
12 files changed, 72 insertions(+), 50 deletions(-)

Approvals:
  ArielGlenn: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/xmldumps-backup/defaults.conf b/xmldumps-backup/defaults.conf
index 4025b62..d17cbbb 100644
--- a/xmldumps-backup/defaults.conf
+++ b/xmldumps-backup/defaults.conf
@@ -62,6 +62,9 @@
 pagesPerChunkAbstract=0
 jobsperbatch=""
 revsPerJob=1000000
+maxRetries=0
+retryWait=30
+revsMargin=100
 
 [otherformats]
 multistream=0
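
The three new settings land in the [chunks] section and follow the
existing per-project override mechanism. A minimal sketch of the assumed
semantics (names match the attributes added to WikiDump.py below; the
int() coercion is an assumption about how the raw option values arrive):

    # sketch: what the new [chunks] options control (int() coercion assumed)
    max_retries = int(wiki.config.max_retries)   # retry rounds for failed page content pieces; 0 disables retries
    retry_wait = int(wiki.config.retry_wait)     # seconds to sleep between retry rounds
    revs_margin = int(wiki.config.revs_margin)   # allowed slop in revision count when sizing a page range
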
diff --git a/xmldumps-backup/dumps/CommandManagement.py b/xmldumps-backup/dumps/CommandManagement.py
index 9b054bb..8877eca 100644
--- a/xmldumps-backup/dumps/CommandManagement.py
+++ b/xmldumps-backup/dumps/CommandManagement.py
@@ -173,11 +173,14 @@
                 return False
         return True
 
-    def exited_with_errors(self):
+    def exited_with_errors(self, stringfmt=True):
         if not self.exited_successfully():
             # we will return the whole pipeline I guess, they might as well
             # see it in the error report instead of the specific issue in the pipe.
-            return self.pipeline_string()
+            if stringfmt:
+                return self.pipeline_string()
+            else:
+                return self._commands
         return None
 
     # Checks the exit values of the individual commands in the
@@ -352,12 +355,12 @@
                 return False
         return True
 
-    def exited_with_errors(self):
+    def exited_with_errors(self, stringfmt=True):
         """Return list of commands that exited with errors."""
         commands = []
         for pipeline in self._command_pipelines:
             if not pipeline.exited_successfully():
-                command = pipeline.exited_with_errors()
+                command = pipeline.exited_with_errors(stringfmt)
                 if command is not None:
                     commands.append(command)
         return commands
@@ -575,11 +578,11 @@
                 return False
         return True
 
-    def commands_with_errors(self):
+    def commands_with_errors(self, stringfmt=True):
         commands = []
         for series in self._command_serieses:
             if not series.exited_successfully():
-                commands.extend(series.exited_with_errors())
+                commands.extend(series.exited_with_errors(stringfmt))
         return commands
 
     def watch_output_queue(self):
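
The new stringfmt flag lets one accessor serve both error reporting and
requeueing: the default string form is for log messages, while
stringfmt=False hands back the underlying command lists so the caller
can run them again. A hedged sketch of the call pattern ('log' is a
hypothetical stand-in):

    # sketch: two views of the same failures from a CommandsInParallel run
    commands.run_commands()
    if not commands.exited_successfully():
        for cmd in commands.commands_with_errors():               # pipeline strings, for the error report
            log(cmd)                                              # 'log' is hypothetical
        broken = commands.commands_with_errors(stringfmt=False)   # raw command lists, ready to requeue
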
diff --git a/xmldumps-backup/dumps/WikiDump.py b/xmldumps-backup/dumps/WikiDump.py
index 9c4801e..09d3b6a 100644
--- a/xmldumps-backup/dumps/WikiDump.py
+++ b/xmldumps-backup/dumps/WikiDump.py
@@ -210,6 +210,12 @@
             "chunks", "checkpointTime", 1)
         self.revs_per_job = self.get_opt_for_proj_or_default(
             "chunks", "revsPerJob", 1)
+        self.max_retries = self.get_opt_for_proj_or_default(
+            "chunks", "maxRetries", 1)
+        self.retry_wait = self.get_opt_for_proj_or_default(
+            "chunks", "retryWait", 1)
+        self.revs_margin = self.get_opt_for_proj_or_default(
+            "chunks", "revsMargin", 1)
 
         if not self.conf.has_section('otherformats'):
             self.conf.add_section('otherformats')
diff --git a/xmldumps-backup/dumps/apijobs.py b/xmldumps-backup/dumps/apijobs.py
index 089a4da..341be04 100644
--- a/xmldumps-backup/dumps/apijobs.py
+++ b/xmldumps-backup/dumps/apijobs.py
@@ -27,17 +27,17 @@
             raise BackupError("siteinfo dump %s trying to produce more than 
one file" %
                               self.dumpname)
         output_dfname = dfnames[0]
-        error = self.get_siteinfo(
+        error, broken = self.get_siteinfo(
             runner.dump_dir.filename_public_path(output_dfname), runner)
         while error and retries < maxretries:
             retries = retries + 1
             time.sleep(5)
-            error = self.get_siteinfo(
+            error, broken = self.get_siteinfo(
                 runner.dump_dir.filename_public_path(output_dfname), runner)
         if error:
             raise BackupError("error dumping siteinfo props %s" % 
','.join(self._properties))
 
-    # returns 0 on success, 1 on error
+    # returns 0, None on success, 1, commands on error
     def get_siteinfo(self, outfile, runner):
         """Dump siteinfo properties via the MediaWiki api in json format and 
save."""
         commands = self.build_api_command(runner)
diff --git a/xmldumps-backup/dumps/flowjob.py b/xmldumps-backup/dumps/flowjob.py
index 53d808f..9b7a70a 100644
--- a/xmldumps-backup/dumps/flowjob.py
+++ b/xmldumps-backup/dumps/flowjob.py
@@ -53,7 +53,7 @@
 
         pipeline = [command]
         series = [pipeline]
-        error = runner.run_command([series], callback_stderr=self.progress_callback,
-                                   callback_stderr_arg=runner)
+        error, broken = runner.run_command([series], callback_stderr=self.progress_callback,
+                                           callback_stderr_arg=runner)
         if error:
             raise BackupError("error dumping flow page files")
diff --git a/xmldumps-backup/dumps/pagerange.py b/xmldumps-backup/dumps/pagerange.py
index 8c0c7a5..6d2861f 100644
--- a/xmldumps-backup/dumps/pagerange.py
+++ b/xmldumps-backup/dumps/pagerange.py
@@ -298,8 +298,7 @@
             prevguess = badguess
 
             margin = abs(revcount - numrevs)
-            # FIXME configurable?
-            if margin <= 100:
+            if margin <= self.qrunner.wiki.config.revs_margin:
                 return (page_start, badguess)
             if self.verbose:
                 print "revcount is greater than allowed margin from numrevs"
diff --git a/xmldumps-backup/dumps/recombinejobs.py b/xmldumps-backup/dumps/recombinejobs.py
index 19ec12a..f1eb0a3 100644
--- a/xmldumps-backup/dumps/recombinejobs.py
+++ b/xmldumps-backup/dumps/recombinejobs.py
@@ -75,8 +75,8 @@
             recombine_command = [recombine_command_string]
             recombine_pipeline = [recombine_command]
             series = [recombine_pipeline]
-            result = runner.run_command([series], callback_timed=self.progress_callback,
-                                        callback_timed_arg=runner, shell=True)
+            result, broken = runner.run_command([series], callback_timed=self.progress_callback,
+                                                callback_timed_arg=runner, shell=True)
             if result:
                 error = result
         if error:
@@ -124,7 +124,7 @@
         recombine_command = [recombine_command_string]
         recombine_pipeline = [recombine_command]
         series = [recombine_pipeline]
-        error = runner.run_command(
+        error, broken = runner.run_command(
             [series], callback_timed=self.progress_callback,
             callback_timed_arg=runner, shell=True)
 
@@ -176,7 +176,7 @@
             recombine_command = [recombine_command_string]
             recombine_pipeline = [recombine_command]
             series = [recombine_pipeline]
-            result = runner.run_command(
+            result, broken = runner.run_command(
                 [series], callback_timed=self.progress_callback,
                 callback_timed_arg=runner, shell=True)
             if result:
@@ -225,8 +225,8 @@
             recombine_command = [recombine_command_string]
             recombine_pipeline = [recombine_command]
             series = [recombine_pipeline]
-            result = runner.run_command([series], callback_timed=self.progress_callback,
-                                        callback_timed_arg=runner, shell=True)
+            result, broken = runner.run_command([series], callback_timed=self.progress_callback,
+                                                callback_timed_arg=runner, shell=True)
             if result:
                 error = result
         if error:
diff --git a/xmldumps-backup/dumps/recompressjobs.py b/xmldumps-backup/dumps/recompressjobs.py
index 7569fe6..544b580 100644
--- a/xmldumps-backup/dumps/recompressjobs.py
+++ b/xmldumps-backup/dumps/recompressjobs.py
@@ -135,8 +135,8 @@
             series = self.build_command(runner, output_dfnames)
             commands.append(series)
 
-        error = runner.run_command(commands, callback_timed=self.progress_callback,
-                                   callback_timed_arg=runner, shell=True)
+        error, broken = runner.run_command(commands, callback_timed=self.progress_callback,
+                                           callback_timed_arg=runner, shell=True)
         if error:
             raise BackupError("error recompressing bz2 file(s)")
 
@@ -313,8 +313,8 @@
             series = self.build_command(runner, output_dfnames)
             commands.append(series)
 
-        error = runner.run_command(commands, callback_timed=self.progress_callback,
-                                   callback_timed_arg=runner, shell=True)
+        error, broken = runner.run_command(commands, callback_timed=self.progress_callback,
+                                           callback_timed_arg=runner, shell=True)
         if error:
             raise BackupError("error recompressing bz2 file(s)")
 
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index 4b1357b..4a46e02 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -593,7 +593,7 @@
         self.statushtml.update_status_file()
         self.runstatus_updater.write_statusapi_file()
 
-    # returns 0 on success, 1 on error
+    # returns 0, None on success, 1, commands on error
     def save_command(self, commands, outfile):
         """For one pipeline of commands, redirect output to a given file."""
         commands[-1].extend([">", outfile])
@@ -646,14 +646,14 @@
                                           shell=shell, callback_interval=callback_interval)
             commands.run_commands()
             if commands.exited_successfully():
-                return 0
+                return 0, None
             else:
                 problem_commands = commands.commands_with_errors()
                 error_string = "Error from command(s): "
                 for cmd in problem_commands:
                     error_string = error_string + "%s " % cmd
                 self.log_and_print(error_string)
-                return 1
+                return 1, commands.commands_with_errors(stringfmt=False)
 
     def debug(self, stuff):
         self.log_and_print("%s: %s %s" % (TimeUtils.pretty_time(), 
self.db_name, stuff))
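
Every caller of run_command now unpacks a two-tuple; most jobs ignore the
second value, but the page content step feeds it back into its queue. A
minimal sketch of the assumed contract ('pending' is a hypothetical retry
queue, for illustration):

    # run_command returns (0, None) on success,
    # or (1, <failed commands in structured form>) on error
    error, broken = runner.run_command(commands)
    if error:
        pending.extend(broken)   # requeue the structured failures for another round
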
diff --git a/xmldumps-backup/dumps/tablesjobs.py b/xmldumps-backup/dumps/tablesjobs.py
index 231ab11..ebc5832 100644
--- a/xmldumps-backup/dumps/tablesjobs.py
+++ b/xmldumps-backup/dumps/tablesjobs.py
@@ -35,12 +35,12 @@
         if len(dfnames) > 1:
             raise BackupError("table dump %s trying to produce more than one 
file" % self.dumpname)
         output_dfname = dfnames[0]
-        error = self.save_table(
+        error, broken = self.save_table(
             self._table, runner.dump_dir.filename_public_path(output_dfname), runner)
         while error and retries < maxretries:
             retries = retries + 1
             time.sleep(5)
-            error = self.save_table(
+            error, broken = self.save_table(
                 self._table, runner.dump_dir.filename_public_path(output_dfname), runner)
         if error:
             raise BackupError("error dumping table %s" % self._table)
@@ -82,12 +82,12 @@
         if len(dfnames) > 1:
             raise BackupError("table dump %s trying to produce more than one 
file" % self.dumpname)
         output_dfname = dfnames[0]
-        error = self.save_table(
+        error, broken = self.save_table(
             self._table, runner.dump_dir.filename_private_path(output_dfname), runner)
         while error and retries < maxretries:
             retries = retries + 1
             time.sleep(5)
-            error = self.save_table(
+            error, broken = self.save_table(
                 self._table, runner.dump_dir.filename_private_path(output_dfname), runner)
         if error:
             raise BackupError("error dumping table %s" % self._table)
@@ -119,11 +119,11 @@
             raise BackupError("page title dump trying to produce more than one 
output file")
         dfname = dfnames[0]
         outpath = runner.dump_dir.filename_public_path(dfname)
-        error = self.save_sql(query, outpath, runner)
+        error, broken = self.save_sql(query, outpath, runner)
         while error and retries < maxretries:
             retries = retries + 1
             time.sleep(5)
-            error = self.save_sql(query, outpath, runner)
+            error, broken = self.save_sql(query, outpath, runner)
         if error:
             raise BackupError("error dumping titles list")
 
@@ -149,7 +149,7 @@
             raise BackupError("all titles dump trying to produce more than one 
output file")
         dfname = dfnames[0]
         outpath = runner.dump_dir.filename_public_path(dfname)
-        error = self.save_sql(query, outpath, runner)
+        error, broken = self.save_sql(query, outpath, runner)
         while error and retries < maxretries:
             retries = retries + 1
             time.sleep(5)
diff --git a/xmldumps-backup/dumps/xmlcontentjobs.py b/xmldumps-backup/dumps/xmlcontentjobs.py
index 9414c00..53cb3e3 100644
--- a/xmldumps-backup/dumps/xmlcontentjobs.py
+++ b/xmldumps-backup/dumps/xmlcontentjobs.py
@@ -4,6 +4,7 @@
 
 import os
 from os.path import exists
+import time
 
 from dumps.exceptions import BackupError
 from dumps.fileutils import DumpContents, DumpFilename
@@ -93,7 +94,7 @@
 
         pipeline = [command]
         series = [pipeline]
-        error = runner.run_command([series], shell=True)
+        error, broken = runner.run_command([series], shell=True)
         if error:
             raise BackupError("failed to write pagerange stub file %s" % 
output_dfname.filename)
 
@@ -674,20 +675,30 @@
         else:
             batchsize = 1
         errors = False
-        while commands:
+        failed_commands = []
+        max_retries = self.wiki.config.max_retries
+        retries = 0
+        while commands and (retries < max_retries or retries == 0):
             command_batch = commands[:batchsize]
-            error = runner.run_command(command_batch, callback_stderr=self.progress_callback,
-                                       callback_stderr_arg=runner)
-            # log individual batch failures, FIXME we should find out which command specifically
-            # failed
+            error, broken = runner.run_command(command_batch, callback_stderr=self.progress_callback,
+                                               callback_stderr_arg=runner)
             if error:
-                runner.log_and_print("error from commands: %s" % " ".join(
-                    entry for series in command_batch for pipeline in series
-                    for command in pipeline for entry in command))
+                for series in broken:
+                    for pipeline in series:
+                        runner.log_and_print("error from commands: %s" % " 
".join(
+                            [entry for entry in pipeline]))
+                failed_commands.append(broken)
                 errors = True
             commands = commands[batchsize:]
-        # FIXME we should accumulate all failures, wait a bit and retry them as
-        # batches in their own right
+            if not commands:
+                if failed_commands:
+                    retries += 1
+                    # retry failed commands
+                    commands = failed_commands
+                    failed_commands = []
+                    # no instant retries, give the servers a break
+                    time.sleep(self.wiki.config.retry_wait)
+                    errors = False
         if errors:
             raise BackupError("error producing xml file(s) %s" % 
self.get_dumpname())
 
diff --git a/xmldumps-backup/dumps/xmljobs.py b/xmldumps-backup/dumps/xmljobs.py
index ac66848..588fcb4 100644
--- a/xmldumps-backup/dumps/xmljobs.py
+++ b/xmldumps-backup/dumps/xmljobs.py
@@ -155,8 +155,8 @@
             for dfname in batch:
                 series = self.build_command(runner, dfname)
                 commands.append(series)
-            error = runner.run_command(commands, callback_stderr=self.progress_callback,
-                                       callback_stderr_arg=runner)
+            error, broken = runner.run_command(commands, callback_stderr=self.progress_callback,
+                                               callback_stderr_arg=runner)
             if error:
                 raise BackupError("error producing stub files")
 
@@ -200,8 +200,8 @@
 
         pipeline = [command]
         series = [pipeline]
-        error = runner.run_command([series], callback_stderr=self.progress_callback,
-                                   callback_stderr_arg=runner)
+        error, broken = runner.run_command([series], callback_stderr=self.progress_callback,
+                                           callback_stderr_arg=runner)
         if error:
             raise BackupError("error dumping log files")
 
@@ -280,8 +280,8 @@
             if dfname.dumpname == dumpname0:
                 series = self.build_command(runner, dfname)
                 commands.append(series)
-        error = runner.run_command(commands, callback_stderr=self.progress_callback,
-                                   callback_stderr_arg=runner)
+        error, broken = runner.run_command(commands, callback_stderr=self.progress_callback,
+                                           callback_stderr_arg=runner)
         if error:
             raise BackupError("error producing abstract dump")
 

-- 
To view, visit https://gerrit.wikimedia.org/r/345985
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7db012f3ca8670e3274d33c7ff860962a7eae8f2
Gerrit-PatchSet: 2
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>
