ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/61394


Change subject: ability to batch pages-logging dump (works around wikidata 
issue)
......................................................................

ability to batch pages-logging dump (works around wikidata issue)

Change-Id: Iccdbec2387fd6b624b77d76e303f45ff7edb8d40
---
M xmldumps-backup/WikiDump.py
M xmldumps-backup/worker.py
2 files changed, 93 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps 
refs/changes/94/61394/1

diff --git a/xmldumps-backup/WikiDump.py b/xmldumps-backup/WikiDump.py
index 2e71b42..ad22b7d 100644
--- a/xmldumps-backup/WikiDump.py
+++ b/xmldumps-backup/WikiDump.py
@@ -235,6 +235,9 @@
                        "checkpointTime" : "0",
                        #"otherformats": {
                        "multistream" : "0",
+                       # "pageslogging" : {
+                       # number of rows to request in a single query, default 
is no batch, do them all
+                       "loggingBatchsize" : "0",
                        }
                self.conf = ConfigParser.SafeConfigParser(defaults)
                self.conf.read(self.files)
@@ -300,6 +303,10 @@
                        self.conf.add_section('cleanup')
                self.keep = self.conf.getint("cleanup", "keep")
 
+               if not self.conf.has_section('pageslogging'):
+                       self.conf.add_section('pageslogging')
+               self.loggingBatchsize = 
self.conf.getint("pageslogging","batchsize")
+
        def parseConfFilePerProject(self, projectName = False):
                # we need to read from the project section without falling back
                # to the defaults, which has_option() normally does, ugh.  so 
set
diff --git a/xmldumps-backup/worker.py b/xmldumps-backup/worker.py
index efa3e68..e998f9c 100644
--- a/xmldumps-backup/worker.py
+++ b/xmldumps-backup/worker.py
@@ -2949,26 +2949,100 @@
        def getFileExt(self):
                return "gz"
 
+       def getMaxLogID(self, runner):
+               dbServerInfo = DbServerInfo(runner.wiki, runner.dbName)
+               query = "select MAX(log_id) from logging;"
+               results = None
+               retries = 0
+               maxretries = 5
+               results = dbServerInfo.runSqlAndGetOutput(query)
+               while (results == None and retries < maxretries):
+                       retries = retries + 1
+                       time.sleep(5)
+                       results = dbServerInfo.runSqlAndGetOutput(query)
+               if (not results):
+                       return None
+               lines = results.splitlines()
+               if (lines and lines[1]):
+                       return int(lines[1])
+               else:
+                       return None
+
+       def getTempFilename(self, name, number):
+               return name + "-" + str(number)
+
        def run(self, runner):
                self.cleanupOldFiles(runner.dumpDir)
                files = self.listOutputFilesForBuildCommand(runner.dumpDir)
                if (len(files) > 1):
                        raise BackupError("logging table job wants to produce 
more than one output file")
-               logging = runner.dumpDir.filenamePublicPath(files[0])
+               outputFileObj = files[0]
                if (not exists( runner.wiki.config.php ) ):
                        raise BackupError("php command %s not found" % 
runner.wiki.config.php)
                scriptCommand = 
MultiVersion.MWScriptAsArray(runner.wiki.config, "dumpBackup.php")
-               command = [ "%s" % runner.wiki.config.php, "-q" ]
-               command.extend(scriptCommand)
-               command.extend(["--wiki=%s" % runner.dbName,
-                           "--logs", "--report=10000",
-                           "%s" % runner.forceNormalOption(),
-                           "--output=gzip:%s" % logging ])
-               pipeline = [ command ]
-               series = [ pipeline ]
-               error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback, callbackStderrArg=runner)
-               if (error):
-                       raise BackupError("error dimping log files")
+
+               # do logging table in batches to avoid taking days to dump 
(wikidata for example)
+               maxLogId = self.getMaxLogID(runner)
+               if not maxLogId:
+                       raise BackupError("error retrieving max id from logging 
table")
+
+               batchsize = runner.wiki.config.loggingBatchsize
+               if batchsize:
+                       startId = 0
+                       tempFiles = []
+                       tempFileObjs = []
+                       while startId < maxLogId:
+                               endId = startId + batchsize
+                               fileObjThisBatch = DumpFilename(runner.wiki, 
outputFileObj.date, self.getTempFilename(outputFileObj.dumpName,startId), 
outputFileObj.fileType, outputFileObj.fileExt)
+                               tempFileObjs.append(fileObjThisBatch)
+                               logging = 
runner.dumpDir.filenamePublicPath(fileObjThisBatch)
+                               tempFiles.append(logging)
+                               command = [ "%s" % runner.wiki.config.php, "-q" 
]
+                               command.extend(scriptCommand)
+                               command.extend(["--wiki=%s" % runner.dbName,
+                                               "--logs", "--report=10000",
+                                               "%s" % 
runner.forceNormalOption(),
+                                               "--start=%s" % startId,
+                                               "--end=%s" % endId,
+                                               "--output=gzip:%s" % logging ])
+                               pipeline = [ command ]
+                               series = [ pipeline ]
+                               error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback,
+                                                 callbackStderrArg=runner)
+                               if (error):
+                                       raise BackupError("error dumping log 
files")
+                               startId = endId
+                       # recombine these now
+                       if (not exists( runner.wiki.config.gzip ) ):
+                               raise BackupError("gzip command %s not found" % 
runner.wiki.config.gzip)
+                       compressionCommand = runner.wiki.config.gzip
+                       compressionCommand = "%s > " % runner.wiki.config.gzip
+                       uncompressionCommand = [ "%s" % 
runner.wiki.config.gzip, "-dc" ]
+                       recombineCommandString = 
self.buildRecombineCommandString(runner, tempFileObjs, outputFileObj, 
compressionCommand, uncompressionCommand )
+                       recombineCommand = [ recombineCommandString ]
+                       recombinePipeline = [ recombineCommand ]
+                       series = [ recombinePipeline ]
+                       result = runner.runCommand([ series ], 
callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
+                       if result:
+                               error = result
+                       if (error):
+                               raise BackupError("error recombining 
pages-logging files")
+                       # clean up those intermediate files now
+                       for f in tempFiles:
+                               os.remove(f)
+               else:
+                       logging = 
runner.dumpDir.filenamePublicPath(outputFileObj)
+                       command = [ "%s" % runner.wiki.config.php, "-q" ]
+                       command.extend(scriptCommand)
+                       command.extend(["--wiki=%s" % runner.dbName,
+                                       "--logs", "--report=10000",
+                                       "%s" % runner.forceNormalOption(),
+                                       "--output=gzip:%s" % logging ])
+                       pipeline = [ command ]
+                       series = [ pipeline ]
+                       error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback, callbackStderrArg=runner)
+                       if (error):
+                               raise BackupError("error dmping log files")
 
 class XmlDump(Dump):
        """Primary XML dumps, one section at a time."""

-- 
To view, visit https://gerrit.wikimedia.org/r/61394
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iccdbec2387fd6b624b77d76e303f45ff7edb8d40
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: ariel
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to