http://www.mediawiki.org/wiki/Special:Code/MediaWiki/83634

Revision: 83634
Author:   tstarling
Date:     2011-03-10 04:10:27 +0000 (Thu, 10 Mar 2011)
Log Message:
-----------
Rewrite the caching code in nextJobDB.php so that jobs-loop.sh has a chance of 
being able to exit from the "high-priority" loop. 

The current syndrome is that when the cache expires, a small number of 
high-priority jobs are added. These jobs are quickly cleared, but the job 
runners do not continue on to other things, because the cache keeps returning 
the same small list of wikis for 5 minutes. After 5 minutes, there are a few 
more jobs in the high-priority list, so the cycle continues.

Modified Paths:
--------------
    trunk/phase3/maintenance/nextJobDB.php

Modified: trunk/phase3/maintenance/nextJobDB.php
===================================================================
--- trunk/phase3/maintenance/nextJobDB.php      2011-03-10 03:46:58 UTC (rev 
83633)
+++ trunk/phase3/maintenance/nextJobDB.php      2011-03-10 04:10:27 UTC (rev 
83634)
@@ -33,20 +33,71 @@
        public function execute() {
                global $wgMemc;
                $type = $this->getOption( 'type', false );
-               $mckey = $type === false
-                                       ? "jobqueue:dbs"
-                                       : "jobqueue:dbs:$type";
-               $pendingDBs = $wgMemc->get( $mckey );
 
-               # If we didn't get it from the cache
+               $memcKey = 'jobqueue:dbs:v2';
+               $pendingDBs = $wgMemc->get( $memcKey );
+
+               // If the cache entry wasn't present, or in 1% of cases 
otherwise, 
+               // regenerate the cache.
+               if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
+                       $pendingDBs = $this->getPendingDbs();
+                       $wgMemc->set( $memcKey, $pendingDBs, 300 );
+               }
+
                if ( !$pendingDBs ) {
-                       $pendingDBs = $this->getPendingDbs( $type );
-                       $wgMemc->set( $mckey, $pendingDBs, 300 );
+                       return;
                }
-               # If we've got a pending job in a db, display it.
-               if ( $pendingDBs ) {
-                       $this->output( $pendingDBs[mt_rand( 0, count( 
$pendingDBs ) - 1 )] );
+
+               do {
+                       $again = false;
+
+                       if ( $type === false ) {
+                               $candidates = call_user_func_array( 
'array_merge', $pendingDBs );
+                       } elseif ( isset( $pendingDBs[$type] ) ) {
+                               $candidates = $pendingDBs[$type];
+                       } else {
+                               $candidates = array();
+                       }
+                       if ( !$candidates ) {
+                               return;
+                       }
+
+                       $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 
) ];
+                       if ( !$this->checkJob( $type, $db ) ) {
+                               // This job is not available in the current 
database. Remove it from 
+                               // the cache.
+                               if ( $type === false ) {
+                                       foreach ( $pendingDBs as $type2 => $dbs 
) {
+                                               $pendingDBs[$type2] = 
array_diff( $pendingDBs[$type2], array( $db ) );
+                                       }
+                               } else {
+                                       $pendingDBs[$type] = array_diff( 
$pendingDBs[$type], array( $db ) );
+                               }
+
+                               $wgMemc->set( $memcKey, $pendingDBs, 300 );
+                               $again = true;
+                       }
+               } while ( $again );
+
+               $this->output( $db . "\n" );
+       }
+
+       /**
+        * Check if the specified database has a job of the specified type in 
it.
+        * The type may be false to indicate "all". 
+        */
+       function checkJob( $type, $db ) {
+               $lb = wfGetLB( $db );
+               $db = $lb->getConnection( DB_MASTER );
+               if ( $type === false ) {
+                       $conds = array();
+               } else {
+                       $conds = array( 'job_cmd' => $type );
                }
+
+               $exists = (bool) $db->selectField( 'job', '1', $conds, 
__METHOD__ );
+               $lb->reuseConnection( $db );
+               return $exists;
        }
 
        /**
@@ -54,7 +105,7 @@
         * @param $type String Job type
         * @return array
         */
-       private function getPendingDbs( $type ) {
+       private function getPendingDbs() {
                global $wgLocalDatabases;
                $pendingDBs = array();
                # Cross-reference DBs by master DB server
@@ -66,10 +117,10 @@
 
                foreach ( $dbsByMaster as $dbs ) {
                        $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
-                       $stype = $dbConn->addQuotes( $type );
 
                        # Padding row for MySQL bug
-                       $sql = "(SELECT 
'-------------------------------------------' as db)";
+                       $pad = str_repeat( '-', 40 );
+                       $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
                        foreach ( $dbs as $wikiId ) {
                                if ( $sql != '' ) {
                                        $sql .= ' UNION ';
@@ -79,10 +130,7 @@
                                $dbConn->tablePrefix( $tablePrefix );
                                $jobTable = $dbConn->tableName( 'job' );
 
-                               if ( $type === false )
-                                       $sql .= "(SELECT '$wikiId' as db FROM 
$dbName.$jobTable LIMIT 1)";
-                               else
-                                       $sql .= "(SELECT '$wikiId' as db FROM 
$dbName.$jobTable WHERE job_cmd=$stype LIMIT 1)";
+                               $sql .= "(SELECT DISTINCT '$wikiId' as db, 
job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
                        }
                        $res = $dbConn->query( $sql, __METHOD__ );
                        $first = true;
@@ -92,7 +140,7 @@
                                        $first = false;
                                        continue;
                                }
-                               $pendingDBs[] = $row->db;
+                               $pendingDBs[$row->job_cmd][] = $row->db;
                        }
                }
                return $pendingDBs;


_______________________________________________
MediaWiki-CVS mailing list
MediaWiki-CVS@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to