ArielGlenn has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/355077 )

Change subject: split up flow dumps into stubs and content passes
......................................................................

split up flow dumps into stubs and content passes

[WIP] During the stubs pass, revision metadata will be dumped with no
content; during the content pass, the previous successful dump's flow
content file will be used for the revision content it contains, and only
new revisions will be retrieved from the database.

This change cannot be merged until after the relevant patches are made
to the Flow extension.

Bug: T164262
Change-Id: Ie473da574dd728cb41d30de9f213874eccfd1c1c
---
M xmldumps-backup/dumps/flowjob.py
M xmldumps-backup/dumps/runner.py
2 files changed, 116 insertions(+), 14 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps 
refs/changes/77/355077/1

diff --git a/xmldumps-backup/dumps/flowjob.py b/xmldumps-backup/dumps/flowjob.py
index 9b7a70a..1c60a61 100644
--- a/xmldumps-backup/dumps/flowjob.py
+++ b/xmldumps-backup/dumps/flowjob.py
@@ -6,13 +6,20 @@
 from dumps.exceptions import BackupError
 from dumps.utils import MultiVersion
 from dumps.jobs import Dump
+from dumps.jobs import get_reg_files
 
 
 class FlowDump(Dump):
     """Dump the flow pages."""
 
-    def __init__(self, name, desc, history=False):
+    def __init__(self, name, desc, item_for_stubs=None, history=False, 
stubs=False, prefetch=False):
         self.history = history
+        self.stubs = stubs
+        self.prefetch = prefetch
+        if item_for_stubs is None:
+            self._prerequisite_items = []
+        else:
+            self._prerequisite_items = [item_for_stubs]
         Dump.__init__(self, name, desc)
 
     def detail(self):
@@ -22,13 +29,81 @@
         return "xml"
 
     def get_file_ext(self):
-        return "bz2"
+        if self.stubs:
+            return "gz"
+        else:
+            return "bz2"
 
     def get_dumpname(self):
+        dumpname = 'flow'
+        if self.stubs:
+            dumpname += 'stubs'
         if self.history:
-            return 'flowhistory'
+            dumpname += 'history'
+        return dumpname
+
+    def get_stub_dfname(self, runner):
+        '''
+        get the stub file pertaining to our dumpname (one of flow, flowhistory)
+
+        arguments:
+           Runner
+        returns:
+           DumpFilename
+        '''
+        input_dfnames = 
self._prerequisite_items[0].list_outfiles_for_input(runner.dump_dir)
+        if len(input_dfnames) > 1:
+            print "huh, input_dfnames is", [dfn.filename for dfn in 
input_dfnames]
+            # this is an error
+            return None
         else:
-            return 'flow'
+            print "interesting, input_dfnames is", input_dfnames
+        return input_dfnames[0]
+
+
+    def find_prefetch_file(self, runner):
+        """
+        this finds the content file from the first previous successful dump
+        to be used as input ("prefetch") for this run.
+
+        args:
+            Runner
+        returns:
+            DumpFilename
+        """
+        dumpdates = runner.wiki.dump_dirs()
+        dumpdates = sorted(dumpdates, reverse=True)
+        for date in dumpdates:
+            if date == runner.wiki.date:
+                runner.debug("skipping current dump for prefetch of job %s, 
date %s" %
+                             (self.get_dumpname(), runner.wiki.date))
+                continue
+
+            # see if this job from that date was successful
+            if not runner.dumpjobdata.runinfo.status_of_old_dump_is_done(
+                    runner, date, self.get_dumpname(), self._desc):
+                runner.debug("skipping incomplete or failed dump for prefetch 
date %s" % date)
+                continue
+
+            dfnames = get_reg_files(
+                runner.dump_dir, [self.get_dumpname()],
+                self.file_type, self.file_ext, date, parts=False)
+            possible_prefetch_dfnames = dfnames
+            dfnames = []
+            for prefetch_dfname in possible_prefetch_dfnames:
+                possible_path = 
runner.dump_dir.filename_public_path(prefetch_dfname, date)
+                size = os.path.getsize(possible_path)
+                if size < 7000:
+                    runner.debug("small %d-byte prefetch dump at %s, skipping" 
% (
+                        size, possible_path))
+                    continue
+                else:
+                    dfnames.append(prefetch_dfname)
+            if len(dfnames):
+                return dfnames
+
+        runner.debug("Could not locate a prefetchable dump.")
+        return None
 
     def run(self, runner):
         self.cleanup_old_files(runner.dump_dir, runner)
@@ -40,20 +115,38 @@
             raise BackupError("php command %s not found" % 
runner.wiki.config.php)
 
         flow_output_fpath = runner.dump_dir.filename_public_path(output_dfname)
-        script_command = MultiVersion.mw_script_as_array(
-            runner.wiki.config, "extensions/Flow/maintenance/dumpBackup.php")
-
+        if self.stubs:
+            script_command = MultiVersion.mw_script_as_array(
+                runner.wiki.config, 
"extensions/Flow/maintenance/dumpBackup.php")
+            compressor = "gzip"
+        else:
+            script_command = MultiVersion.mw_script_as_array(
+                runner.wiki.config, 
"extensions/Flow/maintenance/dumpTextPass.php")
+            compressor = "bzip2"
         command = [runner.wiki.config.php]
         command.extend(script_command)
         command.extend(["--wiki=%s" % runner.db_name,
-                        "--current", "--report=1000",
-                        "--output=bzip2:%s" % flow_output_fpath])
+                        "--report=1000",
+                        "--output=%s:%s" % (compressor, flow_output_fpath)])
+        if self.stubs:
+            command.append("--stub")
+        else:
+            stub_input_dfname = self.get_stub_dfname(runner)
+            stub_option = "--stub=gzip:%s" % 
runner.dump_dir.filename_public_path(stub_input_dfname)
+            command.append(stub_option)
+            if self.prefetch:
+                if self.verbose:
+                    print "prefetch requested"
+                prefetchfile = self.find_prefetch_file(runner)
+                if prefetchfile:
+                    command.extend(["--prefetch", prefetchfile])
         if self.history:
             command.append("--full")
-
+        else:
+            command.append("--current")
         pipeline = [command]
         series = [pipeline]
-        error, broken = runner.run_command([series], 
callback_stderr=self.progress_callback,
-                                           callback_stderr_arg=runner)
+        error, broken_unused = runner.run_command([series], 
callback_stderr=self.progress_callback,
+                                                  callback_stderr_arg=runner)
         if error:
             raise BackupError("error dumping flow page files")
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index 4a46e02..242c93d 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -267,9 +267,17 @@
             XmlLogging("Log events to all pages and users."))
 
         self.append_job_if_needed(
-            FlowDump("xmlflowdump", "content of flow pages in xml format"))
+            FlowDump("xmlflowstubsdump", "metadata of flow pages in xml 
format", stubs=True))
         self.append_job_if_needed(
-            FlowDump("xmlflowhistorydump", "history content of flow pages in 
xml format", True))
+            FlowDump("xmlflowstubshistorydump", "metadata of flow pages 
history in xml format",
+                     history=True, stubs=True))
+        self.append_job_if_needed(
+            FlowDump("xmlflowdump", "content of flow pages in xml format",
+                     
item_for_stubs=self.find_item_by_name('xmlflowstubsdump'), prefetch=True))
+        self.append_job_if_needed(
+            FlowDump("xmlflowhistorydump", "history content of flow pages in 
xml format",
+                     
item_for_stubs=self.find_item_by_name('xmlflowstubshistorydump'),
+                     history=True, prefetch=True))
 
         self.dump_items.append(
             BigXmlDump(
@@ -381,6 +389,7 @@
                     return True
         if job == "noop" or job == "latestlinks" or job == "createdirs":
             return True
+        
         sys.stderr.write("No job of the name specified exists. Choose one of 
the following:\n")
         sys.stderr.write("noop (runs no job but rewrites checksums files and"
                          "resets latest links)\n")

-- 
To view, visit https://gerrit.wikimedia.org/r/355077
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie473da574dd728cb41d30de9f213874eccfd1c1c
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to