Aaron Schulz has uploaded a new change for review. https://gerrit.wikimedia.org/r/118955
Change subject: [WIP] Added a Redis pool counter class
......................................................................

[WIP] Added a Redis pool counter class

* This should be easier to set up for typical installs

Change-Id: Icb4a7481b944fa0818c4635e3edbe12d08af9924
---
M includes/AutoLoader.php
A includes/PoolCounterRedis.php
2 files changed, 398 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core refs/changes/55/118955/1

diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php
index a9080b2..f7a0568 100644
--- a/includes/AutoLoader.php
+++ b/includes/AutoLoader.php
@@ -157,6 +157,7 @@
 	'PhpHttpRequest' => 'includes/HttpFunctions.php',
 	'PoolCounter' => 'includes/PoolCounter.php',
 	'PoolCounter_Stub' => 'includes/PoolCounter.php',
+	'PoolCounterRedis' => 'includes/PoolCounterRedis.php',
 	'PoolCounterWork' => 'includes/PoolCounter.php',
 	'PoolCounterWorkViaCallback' => 'includes/PoolCounter.php',
 	'PoolWorkArticleView' => 'includes/WikiPage.php',
diff --git a/includes/PoolCounterRedis.php b/includes/PoolCounterRedis.php
new file mode 100644
index 0000000..784d626
--- /dev/null
+++ b/includes/PoolCounterRedis.php
@@ -0,0 +1,397 @@
+<?php
+/**
+ * Provides semaphore semantics for restricting the number
+ * of workers that may be concurrently performing the same task.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Version of PoolCounter using Redis
+ *
+ * Free pool slots are kept in a Redis list per key, a sorted set tracks when
+ * each slot last changed hands (so slots that are never released, e.g. after
+ * network partitions, can be recovered), and another sorted set registers
+ * waiting clients. Slot acquisition and release are done in Lua scripts,
+ * which Redis runs atomically.
+ *
+ * @since 1.23
+ */
+class PoolCounterRedis extends PoolCounter {
+	/** @var HashRing */
+	protected $ring;
+	/** @var RedisConnectionPool */
+	protected $pool;
+	/** @var array (server name => host) map */
+	protected $servers;
+
+	/** @var RedisConnRef */
+	protected $conn;
+
+	/** @var string Pool slot value ('w' if woken up by a "work done" notification) */
+	protected $slot;
+	/** @var integer AWAKE_* constant */
+	protected $onRelease;
+	/** @var integer TTL for locks to expire */
+	protected $lockTTL;
+	/** @var string Unique string to identify this process */
+	protected $session;
+
+	const AWAKE_ONE = 1; // on release, only give the slot back to the pool
+	const AWAKE_ALL = 2; // on release, also send "work done" wake-ups to waiters
+
+	/**
+	 * @param array $conf Uses 'servers' (map of server name => address) and
+	 *   'redisConfig' (options for RedisConnectionPool::singleton())
+	 * @param string $type
+	 * @param string $key
+	 */
+	function __construct( $conf, $type, $key ) {
+		parent::__construct( $conf, $type, $key );
+
+		$this->servers = $conf['servers'];
+		$this->ring = new HashRing( array_fill_keys( array_keys( $conf['servers'] ), 100 ) );
+
+		$conf['redisConfig']['serializer'] = 'none'; // for use with Lua
+		$this->pool = RedisConnectionPool::singleton( $conf['redisConfig'] );
+
+		// Locks expire after twice the PHP time limit, but no sooner than 5 minutes
+		$met = ini_get( 'max_execution_time' ); // this is 0 in CLI mode
+		$this->lockTTL = max( 5 * 60, 2 * (int)$met );
+
+		$this->session = wfRandomString( 32 );
+	}
+
+	/**
+	 * Get a connection to the first reachable server for this key
+	 *
+	 * Up to three candidate servers are taken from the hash ring; the first
+	 * connection obtained is cached for the lifetime of this object.
+	 *
+	 * @return Status Good status with a RedisConnRef value, or a fatal status
+	 */
+	protected function getConnection() {
+		if ( !isset( $this->conn ) ) {
+			$conn = false;
+			$servers = $this->ring->getLocations( $this->key, 3 );
+			ArrayUtils::consistentHashSort( $servers, $this->key );
+			foreach ( $servers as $server ) {
+				$conn = $this->pool->getConnection( $this->servers[$server] );
+				if ( $conn ) {
+					break;
+				}
+			}
+			if ( !$conn ) {
+				// List every server tried (the loop variable is unset if there were none)
+				return Status::newFatal( 'pool-errorunknown', implode( ', ', $servers ) );
+			}
+			$this->conn = $conn;
+		}
+		return Status::newGood( $this->conn );
+	}
+
+	/**
+	 * Acquire a pool slot, waiting for one to free up if needed
+	 *
+	 * The caller will do the work itself, so only a real slot ends the wait;
+	 * "work done" wake-ups from other processes are ignored.
+	 *
+	 * @return Status
+	 */
+	function acquireForMe() {
+		$section = new ProfileSection( __METHOD__ );
+
+		return $this->doAcquire( false );
+	}
+
+	/**
+	 * Acquire a pool slot, or wait for another process to finish the same work
+	 *
+	 * The status value is PoolCounter::DONE if a "work done" wake-up was received
+	 * instead of a slot; the caller may then reuse the other process's result.
+	 *
+	 * @return Status
+	 */
+	function acquireForAnyone() {
+		$section = new ProfileSection( __METHOD__ );
+
+		return $this->doAcquire( true );
+	}
+
+	/**
+	 * Shared implementation of acquireForMe() and acquireForAnyone()
+	 *
+	 * @param bool $doWakeup Whether a "work done" wake-up may end the wait
+	 * @return Status
+	 */
+	protected function doAcquire( $doWakeup ) {
+		if ( $this->slot !== null ) {
+			return Status::newGood( PoolCounter::LOCK_HELD ); // already acquired
+		}
+
+		$status = $this->getConnection();
+		if ( !$status->isOK() ) {
+			return $status;
+		}
+
+		return $this->waitForSlotOrNotif( $status->value, $doWakeup );
+	}
+
+	/**
+	 * Give back the slot held by this object (if any)
+	 *
+	 * @return Status
+	 */
+	function release() {
+		$section = new ProfileSection( __METHOD__ );
+
+		if ( $this->slot === null ) {
+			return Status::newGood( PoolCounter::NOT_LOCKED ); // not locked
+		}
+
+		$status = $this->getConnection();
+		if ( !$status->isOK() ) {
+			return $status;
+		}
+		$conn = $status->value;
+
+		static $script =
+<<<LUA
+		local kSlots,kSlotsLastFree,kWakeup,kWaiting = unpack(KEYS)
+		local rMaxWorkers, rExpiry, rSlot, rAwakeAll, rTime = unpack(ARGV)
+		-- If rSlot is 'w', then the client was told to wake up but
+		-- did not get an actual slot. Only add slots back to the list
+		-- if an actual slot was acquired by the client. Treat the list
+		-- as expired if the slot time sorted set is missing.
+		if rSlot ~= 'w' and redis.call('exists',kSlotsLastFree) == 1 then
+			if redis.call('lLen',kSlots) >= (1*rMaxWorkers - 1) then
+				-- All other slots are free; clear the pool to save space.
+				-- It is re-initialized by the next client that acquires.
+				redis.call('del',kSlots,kSlotsLastFree)
+			elseif redis.call('lLen',kSlots) < 1*rMaxWorkers then
+				-- Add slot back to pool and update its "last free" time
+				redis.call('rPush',kSlots,rSlot)
+				redis.call('zAdd',kSlotsLastFree,rTime,rSlot)
+				-- Always keep renewing the expiry on use
+				redis.call('expire',kSlots,rExpiry)
+				redis.call('expire',kSlotsLastFree,rExpiry)
+			end
+		end
+		-- Update an ephemeral list to wake up other clients that can
+		-- reuse any cached work from this process. This is only done
+		-- when the slot was acquired with wake-ups enabled (AWAKE_ALL).
+		if 1*rAwakeAll == 1 then
+			local count = redis.call('zCard',kWaiting)
+			for i = 1,count do
+				redis.call('rPush',kWakeup,'w')
+			end
+			redis.call('pexpire',kWakeup,1)
+		end
+		return 1
+LUA;
+		try {
+			$conn->luaEval( $script,
+				array(
+					$this->getSlotListKey(),
+					$this->getSlotFTimeSetKey(),
+					$this->getWakeupListKey(),
+					$this->getWaitSetKey(),
+					$this->workers,
+					$this->lockTTL, // how long a job should take to finish
+					$this->slot,
+					( $this->onRelease === self::AWAKE_ALL ) ? 1 : 0,
+					time()
+				),
+				4 # number of first argument(s) that are keys
+			);
+		} catch ( RedisException $e ) {
+			return Status::newFatal( 'pool-errorunknown', $e->getMessage() );
+		}
+
+		$this->slot = null;
+		$this->onRelease = null;
+
+		return Status::newGood( PoolCounter::RELEASED );
+	}
+
+	/**
+	 * Take a pool slot, waiting for one (or for a wake-up) if none is free
+	 *
+	 * @param RedisConnRef $conn
+	 * @param bool $doWakeup Whether a "work done" wake-up from another process
+	 *   may end the wait instead of a free slot
+	 * @return Status Good status with a PoolCounter::* value, or a fatal status
+	 */
+	protected function waitForSlotOrNotif( RedisConnRef $conn, $doWakeup ) {
+		try {
+			$slot = $this->initAndPopPoolSlotList( $conn );
+			if ( ctype_digit( $slot ) ) {
+				// Pool slot acquired by this process
+			} elseif ( $slot === 'QUEUE_FULL' ) {
+				return Status::newGood( PoolCounter::QUEUE_FULL );
+			} elseif ( $slot === 'QUEUE_WAIT' ) {
+				// This process is now registered as waiting
+				if ( $doWakeup ) {
+					// The order of the keys to blPop() matters; the wake-up takes priority.
+					// When a process relents and we get a 'w' slot (from the wake-up list),
+					// we know that the work that process did can be reused by the caller.
+					$keys = array( $this->getWakeupListKey(), $this->getSlotListKey() );
+				} else {
+					$keys = array( $this->getSlotListKey() );
+				}
+				$res = $conn->blPop( $keys, $this->timeout ); // (list key,value) or ()
+				// Unregister this process as waiting, even if the wait timed out.
+				// The wait registry is a sorted set, so zRem() (not sRem()) is needed.
+				$conn->zRem( $this->getWaitSetKey(), $this->session );
+				if ( $res === array() ) {
+					return Status::newGood( PoolCounter::TIMEOUT );
+				}
+				$slot = $res[1];
+			} else {
+				return Status::newFatal( 'pool-errorunknown', "Server gave '$slot'." );
+			}
+		} catch ( RedisException $e ) {
+			return Status::newFatal( 'pool-errorunknown', $e->getMessage() );
+		}
+
+		$this->slot = $slot;
+		$this->onRelease = $doWakeup ? self::AWAKE_ALL : self::AWAKE_ONE;
+
+		if ( $slot === 'w' ) {
+			// Got a "process done" notification instead of a pool slot (only for $doWakeup)
+			return Status::newGood( PoolCounter::DONE );
+		} else {
+			return Status::newGood( PoolCounter::LOCKED );
+		}
+	}
+
+	/**
+	 * Initialize the pool if needed, then pop a free slot or queue this process
+	 *
+	 * @param RedisConnRef $conn
+	 * @return int|string Slot number, 'QUEUE_FULL', or 'QUEUE_WAIT'
+	 */
+	protected function initAndPopPoolSlotList( RedisConnRef $conn ) {
+		static $script =
+<<<LUA
+		local kSlots,kSlotsLastFree,kSlotWaits = unpack(KEYS)
+		local rMaxWorkers,rMaxQueue,rTimeout,rExpiry,rSess,rTime = unpack(ARGV)
+		-- Initialize if *both* the slot list and the slot time sorted set are empty.
+		-- The former happens if all slots are busy and also when nothing is initialized.
+		-- If the list is empty but the sorted set is not, then all slots are busy.
+		if redis.call('exists',kSlots) == 0 and redis.call('exists',kSlotsLastFree) == 0 then
+			for i = 1,1*rMaxWorkers do
+				redis.call('rPush',kSlots,i)
+				redis.call('zAdd',kSlotsLastFree,rTime,i)
+			end
+		-- Otherwise do maintenance to clean up after network partitions
+		else
+			-- Find stale slot locks and free them. Slots that are already in the
+			-- free list are skipped so that no slot is ever in the list twice.
+			local freeSlots = {}
+			for k,slot in ipairs(redis.call('lRange',kSlots,0,-1)) do
+				freeSlots[slot] = true
+			end
+			local staleLocks = redis.call('zRangeByScore',kSlotsLastFree,0,1*rTime - 1*rExpiry)
+			for k,slot in ipairs(staleLocks) do
+				if not freeSlots[slot] then
+					redis.call('rPush',kSlots,slot)
+				end
+				redis.call('zAdd',kSlotsLastFree,rTime,slot)
+			end
+			-- Find stale wait slot entries and remove them
+			redis.call('zRemRangeByScore',kSlotWaits,0,1*rTime - 2*rTimeout)
+		end
+		local slot
+		-- Try to acquire a slot if possible now
+		if redis.call('lLen',kSlots) > 0 then
+			slot = redis.call('lPop',kSlots)
+			-- Record when the slot was taken so that it is not seen as stale
+			-- just because it sat unused in the free list for a long time
+			redis.call('zAdd',kSlotsLastFree,rTime,slot)
+		elseif redis.call('zCard',kSlotWaits) >= 1*rMaxQueue then
+			slot = 'QUEUE_FULL'
+		else
+			slot = 'QUEUE_WAIT'
+			redis.call('zAdd',kSlotWaits,rTime,rSess)
+		end
+		-- Always keep renewing the expiry on use
+		redis.call('expire',kSlots,rExpiry)
+		redis.call('expire',kSlotsLastFree,rExpiry)
+		return slot
+LUA;
+		return $conn->luaEval( $script,
+			array(
+				$this->getSlotListKey(),
+				$this->getSlotFTimeSetKey(),
+				$this->getWaitSetKey(),
+				$this->workers,
+				$this->maxqueue,
+				$this->timeout,
+				$this->lockTTL, // how long a job should take to finish
+				$this->session,
+				time()
+			),
+			3 # number of first argument(s) that are keys
+		);
+	}
+
+	/**
+	 * @return string Key of the Redis list of free pool slots
+	 */
+	protected function getSlotListKey() {
+		// @note: worker count does not vary within each $wgPoolCounterConf type
+		return 'poolcounter:l-slots-' . sha1( $this->key ) . "-{$this->workers}";
+	}
+
+	/**
+	 * @return string Key of the Redis sorted set of (slot => last hand-off time)
+	 */
+	protected function getSlotFTimeSetKey() {
+		// @note: worker count does not vary within each $wgPoolCounterConf type
+		return 'poolcounter:z-slots-' . sha1( $this->key ) . "-{$this->workers}";
+	}
+
+	/**
+	 * @return string Key of the Redis sorted set of (waiting session => wait start time)
+	 */
+	protected function getWaitSetKey() {
+		// @note: worker count does not vary within each $wgPoolCounterConf type
+		return 'poolcounter:z-wait-' . sha1( $this->key ) . "-{$this->workers}";
+	}
+
+	/**
+	 * @return string Key of the Redis list used to send "work done" wake-ups
+	 */
+	protected function getWakeupListKey() {
+		// @note: worker count does not vary within each $wgPoolCounterConf type
+		return 'poolcounter:l-wakeup-' . sha1( $this->key ) . "-{$this->workers}";
+	}
+
+	/**
+	 * Try to make sure that locks get released
+	 */
+	function __destruct() {
+		try {
+			if ( $this->slot !== null ) {
+				$this->release();
+			}
+		} catch ( Exception $e ) {} // best effort; a destructor must not throw
+	}
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/118955
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Icb4a7481b944fa0818c4635e3edbe12d08af9924
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits