Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/71966


Change subject: jobqueue: improved performance of 
JobQueueGroup::getQueuesWithJobs()
......................................................................

jobqueue: improved performance of JobQueueGroup::getQueuesWithJobs()

* Also added a JobQueueGroup::getQueueSizes() function.
  This function is now used by the API to make it useful
  for when queues are not in the DB.

bug: 45072
bug: 50635
bug: 9518
Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804
---
M includes/SiteStats.php
M includes/job/JobQueue.php
M includes/job/JobQueueDB.php
M includes/job/JobQueueFederated.php
M includes/job/JobQueueGroup.php
M includes/job/JobQueueRedis.php
6 files changed, 216 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core 
refs/changes/66/71966/1

diff --git a/includes/SiteStats.php b/includes/SiteStats.php
index 199c64f..78bb6cc 100644
--- a/includes/SiteStats.php
+++ b/includes/SiteStats.php
@@ -189,7 +189,7 @@
        static function jobs() {
                if ( !isset( self::$jobs ) ) {
                        $dbr = wfGetDB( DB_SLAVE );
-                       self::$jobs = $dbr->estimateRowCount( 'job' );
+                       self::$jobs = array_sum( 
JobQueueGroup::singleton()->getQueueSizes() );
                        /* Zero rows still do single row read for row that 
doesn't exist, but people are annoyed by that */
                        if ( self::$jobs == 1 ) {
                                self::$jobs = 0;
diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php
index 3295c24..2fd26fc 100644
--- a/includes/job/JobQueue.php
+++ b/includes/job/JobQueue.php
@@ -617,6 +617,63 @@
        }
 
        /**
+        * Do not use this function outside of JobQueue/JobQueueGroup
+        *
+        * @return string
+        * @since 1.22
+        */
+       public function getCoalesceLocationInternal() {
+               return null;
+       }
+
+       /**
+        * Check whether each of the given queues are empty.
+        * This is used for batching checks for queues stored at the same place.
+        *
+        * @param array $types List of queues types
+        * @return array|null (list of non-empty queue types) or null if 
unsupported
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function getSiblingQueuesNonEmpty( array $types ) {
+               $section = new ProfileSection( __METHOD__ );
+               return $this->doGetSiblingQueuesNonEmpty( $types );
+       }
+
+       /**
+        * @see JobQueue::getSiblingQueuesNonEmpty()
+        * @param array $types List of queues types
+        * @return array|null (list of queue types) or null if unsupported
+        */
+       protected function doGetSiblingQueuesNonEmpty( array $types ) {
+               return null; // not supported
+       }
+
+       /**
+        * Check the size of each of the given queues.
+        * For queues not served by the same store as this one, 0 is returned.
+        * This is used for batching checks for queues stored at the same place.
+        *
+        * @param array $types List of queues types
+        * @return array|null (job type => queue size) or null if 
unsupported
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function getSiblingQueueSizes( array $types ) {
+               $section = new ProfileSection( __METHOD__ );
+               return $this->doGetSiblingQueueSizes( $types );
+       }
+
+       /**
+        * @see JobQueue::getSiblingQueueSizes()
+        * @param array $types List of queues types
+        * @return array|null (job type => queue size) or null if unsupported
+        */
+       protected function doGetSiblingQueueSizes( array $types ) {
+               return null; // not supported
+       }
+
+       /**
         * Call wfIncrStats() for the queue overall and for the queue type
         *
         * @param string $key Event type
diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php
index 56da4f3..77011ad 100644
--- a/includes/job/JobQueueDB.php
+++ b/includes/job/JobQueueDB.php
@@ -571,6 +571,34 @@
                );
        }
 
+       public function getCoalesceLocationInternal() {
+               return $this->cluster ? "DBCluster:{$this->cluster}" : 
"LBFactory:{$this->wiki}";
+       }
+
+       protected function doGetSiblingQueuesNonEmpty( array $types ) {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+                       array( 'job_cmd' => $types ), __METHOD__ );
+
+               $types = array();
+               foreach ( $res as $row ) {
+                       $types[] = $row->job_cmd;
+               }
+               return $types;
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS 
count' ),
+                       array( 'job_cmd' => $types ), __METHOD__ );
+
+               $sizes = array();
+               foreach ( $res as $row ) {
+                       $sizes[$row->job_cmd] = (int)$row->count;
+               }
+               return $sizes;
+       }
+
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
diff --git a/includes/job/JobQueueFederated.php 
b/includes/job/JobQueueFederated.php
index db5b686..b69fa00 100644
--- a/includes/job/JobQueueFederated.php
+++ b/includes/job/JobQueueFederated.php
@@ -352,6 +352,38 @@
                return $iterator;
        }
 
+       public function getCoalesceLocationInternal() {
+               return "JobQueueFederated:wiki:" . $this->wiki;
+       }
+
+       protected function doGetSiblingQueuesNonEmpty( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $nonEmpty = $queue->doGetSiblingQueuesNonEmpty( $types 
);
+                       if ( is_array( $nonEmpty ) ) {
+                               $result = array_merge( $result, $nonEmpty );
+                       } else {
+                               return null; // not supported on all 
partitions; bail
+                       }
+               }
+               return array_values( array_unique( $result ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $sizes = $queue->doGetSiblingQueueSizes( $types );
+                       if ( is_array( $sizes ) ) {
+                               foreach ( $sizes as $type => $size ) {
+                                       $result[$type] = isset( $result[$type] 
) ? $result[$type] + $size : $size;
+                               }
+                       } else {
+                               return null; // not supported on all 
partitions; bail
+                       }
+               }
+               return $result;
+       }
+
        public function setTestingPrefix( $key ) {
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->setTestingPrefix( $key );
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 85f99b7..f31007f 100644
--- a/includes/job/JobQueueGroup.php
+++ b/includes/job/JobQueueGroup.php
@@ -36,6 +36,9 @@
 
        protected $wiki; // string; wiki ID
 
+       /** @var array Map of (bucket => (queue => JobQueue, types => list of 
types) */
+       protected $coalescedQueues;
+
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
 
@@ -254,15 +257,72 @@
         */
        public function getQueuesWithJobs() {
                $types = array();
-               foreach ( $this->getQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $nonEmpty = $info['queue']->getSiblingQueuesNonEmpty( 
$this->getQueueTypes() );
+                       if ( is_array( $nonEmpty ) ) { // batching features 
supported
+                               $types = array_merge( $types, $nonEmpty );
+                       } else { // we have to go through the queues in the 
bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       if ( !$this->get( $type )->isEmpty() ) {
+                                               $types[] = $type;
+                                       }
+                               }
                        }
                }
                return $types;
        }
 
        /**
+        * Get the size of the queues for a list of job types
+        *
+        * @return Array Map of (job type => size)
+        */
+       public function getQueueSizes() {
+               $sizeMap = array();
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $sizes = $info['queue']->getSiblingQueueSizes( 
$this->getQueueTypes() );
+                       if ( is_array( $sizes ) ) { // batching features 
supported
+                               $sizeMap = $sizeMap + $sizes;
+                       } else { // we have to go through the queues in the 
bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       $sizeMap[$type] = $this->get( $type 
)->getSize();
+                               }
+                       }
+               }
+               return $sizeMap;
+       }
+
+       /**
+        * @return array
+        */
+       protected function getCoalescedQueues() {
+               global $wgJobTypeConf;
+
+               if ( $this->coalescedQueues === null ) {
+                       $this->coalescedQueues = array();
+                       foreach ( $wgJobTypeConf as $type => $conf ) {
+                               $queue = JobQueue::factory(
+                                       array( 'wiki' => $this->wiki, 'type' => 
'null' ) + $conf );
+                               $loc = $queue->getCoalesceLocationInternal();
+                               if ( !isset( $this->coalescedQueues[$loc] ) ) {
+                                       $this->coalescedQueues[$loc]['queue'] = 
$queue;
+                                       $this->coalescedQueues[$loc]['types'] = 
array();
+                               }
+                               if ( $type === 'default' ) {
+                                       $this->coalescedQueues[$loc]['types'] = 
array_merge(
+                                               
$this->coalescedQueues[$loc]['types'],
+                                               array_diff( 
$this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+                                       );
+                               } else {
+                                       $this->coalescedQueues[$loc]['types'][] 
= $type;
+                               }
+                       }
+               }
+
+               return $this->coalescedQueues;
+       }
+
+       /**
         * Check if jobs should not be popped of a queue right now.
         * This is only used for performance, such as to avoid spamming
         * the queue with many sub-jobs before they actually get run.
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
index 1f5b761..ff6b468 100644
--- a/includes/job/JobQueueRedis.php
+++ b/includes/job/JobQueueRedis.php
@@ -501,7 +501,7 @@
                        foreach ( $props as $prop ) {
                                $keys[] = $this->getQueueKey( $prop );
                        }
-                       $res = ( $conn->delete( $keys ) !== false );
+                       return ( $conn->delete( $keys ) !== false );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -543,6 +543,35 @@
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
+       }
+
+       public function getCoalesceLocationInternal() {
+               return "RedisServer:" . $this->server;
+       }
+
+       protected function doGetSiblingQueuesNonEmpty( array $types ) {
+               return array_keys( array_filter( $this->doGetSiblingQueueSizes( 
$types ) ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $sizes = array(); // (type => size)
+               $types = array_values( $types ); // reindex
+               try {
+                       $conn = $this->getConnection();
+                       $conn->multi( Redis::PIPELINE );
+                       foreach ( $types as $type ) {
+                               $conn->lSize( $this->getQueueKey( 
'l-unclaimed', $type ) );
+                       }
+                       $res = $conn->exec();
+                       if ( is_array( $res ) ) {
+                               foreach ( $res as $i => $size ) {
+                                       $sizes[$types[$i]] = $size;
+                               }
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+               return $sizes;
        }
 
        /**
@@ -798,14 +827,16 @@
 
        /**
         * @param $prop string
+        * @param $type string|null
         * @return string
         */
-       private function getQueueKey( $prop ) {
+       private function getQueueKey( $prop, $type = null ) {
+               $type = is_string( $type ) ? $type : $this->type;
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', 
$this->type, $this->key, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', 
$type, $this->key, $prop );
                } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', 
$this->type, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', 
$type, $prop );
                }
        }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/71966
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to