Aaron Schulz has uploaded a new change for review. https://gerrit.wikimedia.org/r/71966
Change subject: jobqueue: improved performance of JobQueueGroup::getQueuesWithJobs() ...................................................................... jobqueue: improved performance of JobQueueGroup::getQueuesWithJobs() * Also added a JobQueueGroup::getQueueSizes() function. This function is now used by the API to make it useful for when queues are not in the DB. bug: 45072 bug: 50635 bug: 9518 Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804 --- M includes/SiteStats.php M includes/job/JobQueue.php M includes/job/JobQueueDB.php M includes/job/JobQueueFederated.php M includes/job/JobQueueGroup.php M includes/job/JobQueueRedis.php 6 files changed, 216 insertions(+), 9 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core refs/changes/66/71966/1 diff --git a/includes/SiteStats.php b/includes/SiteStats.php index 199c64f..78bb6cc 100644 --- a/includes/SiteStats.php +++ b/includes/SiteStats.php @@ -189,7 +189,6 @@ static function jobs() { if ( !isset( self::$jobs ) ) { - $dbr = wfGetDB( DB_SLAVE ); - self::$jobs = $dbr->estimateRowCount( 'job' ); + self::$jobs = array_sum( JobQueueGroup::singleton()->getQueueSizes() ); /* Zero rows still do single row read for row that doesn't exist, but people are annoyed by that */ if ( self::$jobs == 1 ) { self::$jobs = 0; diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 3295c24..2fd26fc 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -617,6 +617,63 @@ } /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string|null + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues is empty. + * This is used for batching checks for queues stored at the same place.
+ * + * @param array $types List of queue types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesNonEmpty( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueuesNonEmpty( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesNonEmpty() + * @param array $types List of queue types + * @return array|null (list of non-empty queue types) or null if unsupported + */ + protected function doGetSiblingQueuesNonEmpty( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * Queues without jobs may be omitted from the result or reported as 0. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queue types + * @return array|null (job type => size) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + $section = new ProfileSection( __METHOD__ ); + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueueSizes() + * @param array $types List of queue types + * @return array|null (job type => size) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 56da4f3..77011ad 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -571,6 +571,34 @@ ); } + public function getCoalesceLocationInternal() { + return $this->cluster ?
"DBCluster:{$this->cluster}" : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesNonEmpty( array $types ) { + list( $dbr, $scope ) = $this->getSlaveDB(); + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + array( 'job_cmd' => $types ), __METHOD__ ); + + $types = array(); + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + list( $dbr, $scope ) = $this->getSlaveDB(); + $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ), + array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) ); + + $sizes = array(); + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + return $sizes; + } + /** * Recycle or destroy any jobs that have been claimed for too long * diff --git a/includes/job/JobQueueFederated.php b/includes/job/JobQueueFederated.php index db5b686..b69fa00 100644 --- a/includes/job/JobQueueFederated.php +++ b/includes/job/JobQueueFederated.php @@ -352,6 +352,38 @@ return $iterator; } + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:" . $this->wiki; + } + + protected function doGetSiblingQueuesNonEmpty( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + $nonEmpty = $queue->doGetSiblingQueuesNonEmpty( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_merge( $result, $nonEmpty ); + } else { + return null; // not supported on all partitions; bail + } + } + return array_values( array_unique( $result ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = array(); + foreach ( $this->partitionQueues as $queue ) { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ?
$result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } + return $result; + } + public function setTestingPrefix( $key ) { foreach ( $this->partitionQueues as $queue ) { $queue->setTestingPrefix( $key ); diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 85f99b7..f31007f 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -36,6 +36,9 @@ protected $wiki; // string; wiki ID + /** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */ + protected $coalescedQueues; + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job @@ -254,15 +257,72 @@ */ public function getQueuesWithJobs() { $types = array(); - foreach ( $this->getQueueTypes() as $type ) { - if ( !$this->get( $type )->isEmpty() ) { - $types[] = $type; + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['types'] ? $info['queue']->getSiblingQueuesNonEmpty( $info['types'] ) : array(); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } } } return $types; } /** + * Get the size of the queues for all job types + * + * @return array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = array(); + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['types'] ? $info['queue']->getSiblingQueueSizes( $info['types'] ) : array(); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + return $sizeMap; + } + + /** + * @return array Map of (location => (queue => JobQueue, types => list of types)) + */ + protected function
getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = array(); + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = array(); + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** * Check if jobs should not be popped of a queue right now. * This is only used for performance, such as to avoid spamming * the queue with many sub-jobs before they actually get run. diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 1f5b761..ff6b468 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -501,7 +501,7 @@ foreach ( $props as $prop ) { $keys[] = $this->getQueueKey( $prop ); } - $res = ( $conn->delete( $keys ) !== false ); + return ( $conn->delete( $keys ) !== false ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } @@ -543,6 +543,35 @@ } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } + } + + public function getCoalesceLocationInternal() { + return "RedisServer:" .
$this->server; + } + + protected function doGetSiblingQueuesNonEmpty( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = array(); // (type => size) + $types = array_values( $types ); // reindex + try { + $conn = $this->getConnection(); + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $this->server, $conn, $e ); + } + return $sizes; } /** @@ -798,14 +827,16 @@ /** * @param $prop string + * @param $type string|null * @return string */ - private function getQueueKey( $prop ) { + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); if ( strlen( $this->key ) ) { // namespaced queue (for testing) - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); } else { - return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); } } -- To view, visit https://gerrit.wikimedia.org/r/71966 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/core Gerrit-Branch: master Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits