Aaron Schulz has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/308310

Change subject: [WIP] objectcache: add WANObjectCacheRepear for replaying purges
......................................................................

[WIP] objectcache: add WANObjectCacheRepear for replaying purges

This is triggered as a deferred update on RC view.

Change-Id: I7f14b9ca2533032147e62b1a3cc004a23da86579
---
M autoload.php
A includes/deferred/WANCacheReapUpdate.php
M includes/libs/objectcache/WANObjectCache.php
A includes/libs/objectcache/WANObjectCacheReaper.php
M includes/specialpage/ChangesListSpecialPage.php
5 files changed, 329 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/core 
refs/changes/10/308310/1

diff --git a/autoload.php b/autoload.php
index 652535c..2132341 100644
--- a/autoload.php
+++ b/autoload.php
@@ -1497,7 +1497,9 @@
        'ViewAction' => __DIR__ . '/includes/actions/ViewAction.php',
        'VirtualRESTService' => __DIR__ . 
'/includes/libs/virtualrest/VirtualRESTService.php',
        'VirtualRESTServiceClient' => __DIR__ . 
'/includes/libs/virtualrest/VirtualRESTServiceClient.php',
+       'WANCacheReapUpdate' => __DIR__ . 
'/includes/deferred/WANCacheReapUpdate.php',
        'WANObjectCache' => __DIR__ . 
'/includes/libs/objectcache/WANObjectCache.php',
+       'WANObjectCacheRepear' => __DIR__ . 
'/includes/libs/objectcache/WANObjectCacheReaper.php',
        'WaitConditionLoop' => __DIR__ . '/includes/libs/WaitConditionLoop.php',
        'WantedCategoriesPage' => __DIR__ . 
'/includes/specials/SpecialWantedcategories.php',
        'WantedFilesPage' => __DIR__ . 
'/includes/specials/SpecialWantedfiles.php',
diff --git a/includes/deferred/WANCacheReapUpdate.php 
b/includes/deferred/WANCacheReapUpdate.php
new file mode 100644
index 0000000..3dc3476
--- /dev/null
+++ b/includes/deferred/WANCacheReapUpdate.php
@@ -0,0 +1,108 @@
+<?php
+
+/**
+ * Class for fixing bad key entries in WANObjectCache from an event source
+ *
+ * @since 1.28
+ */
+class WANCacheReapUpdate implements DeferrableUpdate {
+       /** @var IDatabase Connection used to read the recentchanges table */
+       private $db;
+
+       /**
+        * @param IDatabase $db Database handle to read recent changes from
+        */
+       public function __construct( IDatabase $db ) {
+               $this->db = $db;
+       }
+
+       /**
+        * Build a reaper for the "table:recentchanges" channel and run one pass of it
+        *
+        * The reaper gets the main WAN cache as the cache to clean, the local cluster
+        * cache as its position store, and this object's two public methods as callbacks.
+        */
+       public function doUpdate() {
+               $reaper = new WANObjectCacheRepear(
+                       ObjectCache::getMainWANInstance(),
+                       ObjectCache::getLocalClusterInstance(),
+                       [ $this, 'getChangedTitles' ],
+                       [ $this, 'getAffectedKeys' ],
+                       [
+                               'channel' => 'table:recentchanges',
+                               'logger' => \MediaWiki\Logger\LoggerFactory::getInstance( 'objectcache' )
+                       ]
+               );
+
+               $reaper->invoke();
+       }
+
+       /**
+        * Fetch recent change events located after position ($start, $id) and before $end
+        *
+        * A row qualifies if rc_timestamp is greater than $start, or equal to $start with
+        * an rc_id greater than $id, and rc_timestamp is less than $end.
+        *
+        * @see WANObjectCacheRepear
+        *
+        * @param int $start Starting position; converted with IDatabase::timestamp()
+        * @param int $id Starting rc_id, used to break ties among rows at exactly $start
+        * @param int $end Ending position (exclusive); converted with IDatabase::timestamp()
+        * @param int|null $limit Maximum number of rows to fetch; null means no limit
+        * @return array[] List of maps of (id: rc_id, pos: UNIX timestamp, item: TitleValue)
+        *   ordered by rc_timestamp and then by rc_id
+        */
+       public function getChangedTitles( $start, $id, $end, $limit = null ) {
+               $db = $this->db;
+
+               $encStart = $db->addQuotes( $db->timestamp( $start ) );
+               $encEnd = $db->addQuotes( $db->timestamp( $end ) );
+
+               $options = [ 'ORDER BY' => 'rc_timestamp ASC, rc_id ASC' ];
+               if ( $limit !== null ) {
+                       // WANObjectCacheRepear::invoke() passes its chunk size as a fourth argument;
+                       // honor it so that a single pass cannot scan the whole time window
+                       $options['LIMIT'] = (int)$limit;
+               }
+
+               $res = $db->select(
+                       'recentchanges',
+                       [ 'rc_namespace', 'rc_title', 'rc_timestamp', 'rc_id' ],
+                       [
+                               $db->makeList( [
+                                       "rc_timestamp > $encStart",
+                                       "rc_timestamp = $encStart AND rc_id > " . $db->addQuotes( $id )
+                               ], LIST_OR ),
+                               "rc_timestamp < $encEnd"
+                       ],
+                       __METHOD__,
+                       $options
+               );
+
+               $events = [];
+               foreach ( $res as $row ) {
+                       $events[] = [
+                               'id' => (int)$row->rc_id,
+                               // Cast so that positions are compared and stored as integers
+                               'pos' => (int)wfTimestamp( TS_UNIX, $row->rc_timestamp ),
+                               'item' => new TitleValue( (int)$row->rc_namespace, $row->rc_title )
+                       ];
+               }
+
+               return $events;
+       }
+
+       /**
+        * Gets a list of important cache keys associated with a title
+        *
+        * @see WANObjectCacheRepear
+        * @param WANObjectCache $cache Cache used to build the key names
+        * @param TitleValue $t Title taken from a recent changes event
+        * @return string[] Possibly empty list of cache keys
+        * @TODO: avoid key generation code duplication
+        */
+       public function getAffectedKeys( WANObjectCache $cache, TitleValue $t ) {
+               $keys = [];
+               if ( $t->inNamespace( NS_FILE ) ) {
+                       // File pages: the cache key of the corresponding local repo file
+                       /** @var LocalFile $file */
+                       $file = RepoGroup::singleton()->getLocalRepo()->newFile( $t->getDBkey() );
+                       $keys[] = $file->getCacheKey();
+               }
+
+               if ( $t->inNamespace( NS_FILE ) || $t->inNamespace( NS_TEMPLATE ) ) {
+                       // File and template pages: the per-page key built from namespace and title hash
+                       $keys[] = $cache->makeKey( 'page', $t->getNamespace(), sha1( $t->getDBkey() ) );
+               }
+
+               if ( $t->inNamespace( NS_USER ) || $t->inNamespace( NS_USER_TALK ) ) {
+                       // User and user talk pages: the key for the user with that name, if one exists
+                       $id = User::idFromName( $t->getDBkey() );
+                       if ( $id ) {
+                               $keys[] = $cache->makeGlobalKey( 'user', 'id', wfWikiID(), $id );
+                       }
+               }
+
+               if ( $keys ) {
+                       wfDebugLog( 'objectcache', __CLASS__ . ': checking key(s) ' . implode( ', ', $keys ) );
+               }
+
+               return $keys;
+       }
+}
diff --git a/includes/libs/objectcache/WANObjectCache.php 
b/includes/libs/objectcache/WANObjectCache.php
index 0d7da91..7a2541b 100644
--- a/includes/libs/objectcache/WANObjectCache.php
+++ b/includes/libs/objectcache/WANObjectCache.php
@@ -970,6 +970,37 @@
        }
 
        /**
+        * Locally destroy any value or "check" key $key if older than 
$cutoffTime
+        *
+        * @param string $key Cache key
+        * @param int $cutoffTime UNIX timestamp
+        * @return bool Success
+        * @since 1.28
+        */
+       public function reap( $key, $cutoffTime ) {
+               $ok = true;
+
+               // Value key: a wrapped array whose FLD_TIME field is compared to the cutoff
+               $wrapped = $this->cache->get( self::VALUE_KEY_PREFIX . $key );
+               if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $cutoffTime ) {
+                       $this->logger->warning( "Reaping bad value key '$key'." );
+                       // Shorten the TTL to 1 instead of deleting (presumably seconds; confirm
+                       // against BagOStuff::changeTTL)
+                       $ok = $this->cache->changeTTL( self::VALUE_KEY_PREFIX . $key, 1 ) && $ok;
+               }
+
+               // "Check" key: stored under TIME_KEY_PREFIX and decoded by parsePurgeValue()
+               $rawValue = $this->cache->get( self::TIME_KEY_PREFIX . $key );
+               $purgeValue = $this->parsePurgeValue( $rawValue );
+               if ( $purgeValue && $purgeValue[self::FLD_TIME] < $cutoffTime ) {
+                       // Log this case distinctly from the value key case above
+                       $this->logger->warning( "Reaping bad check key '$key'." );
+                       $ok = $this->cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 ) && $ok;
+               }
+
+               if ( !$ok ) {
+                       $this->logger->error( "Could not complete reap of key '$key'." );
+               }
+
+               return $ok;
+       }
+
+       /**
         * @see BagOStuff::makeKey()
         * @param string ... Key component
         * @return string
diff --git a/includes/libs/objectcache/WANObjectCacheReaper.php 
b/includes/libs/objectcache/WANObjectCacheReaper.php
new file mode 100644
index 0000000..99f5bd2
--- /dev/null
+++ b/includes/libs/objectcache/WANObjectCacheReaper.php
@@ -0,0 +1,185 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ * @author Aaron Schulz
+ */
+
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+
+/**
+ * Class for scanning through chronological, log-structured data or change logs
+ * and locally purging cache keys related to entities that appear in this data.
+ *
+ * This is useful for repairing cache when purges are missed by using a 
reliable
+ * stream, such as Kafka or a replicated MySQL table. Purge loss between 
datacenters
+ * is expected to be more common than within them.
+ *
+ * @since 1.28
+ */
+class WANObjectCacheRepear implements LoggerAwareInterface {
+       /** @var WANObjectCache Cache that bad keys are reaped from */
+       protected $cache;
+       /** @var BagOStuff Store holding the channel position and the "busy" lock */
+       protected $store;
+       /** @var callable Returns a chunk of events; see __construct() */
+       protected $logChunkCallback;
+       /** @var callable Maps an event item to a list of cache keys; see __construct() */
+       protected $keyListCallback;
+       /** @var LoggerInterface */
+       protected $logger;
+
+       /** @var string Name of the update event stream */
+       protected $channel;
+       /** @var int Seconds back in time to start from when no position is stored */
+       protected $initialStartWindow;
+
+       /**
+        * @param WANObjectCache $cache Cache to reap bad keys from
+        * @param BagOStuff $store Cache used to store positions and for locking
+        * @param callable $logCallback Callback taking arguments:
+        *          - The starting position as a UNIX timestamp
+        *          - The starting unique ID used for breaking timestamp collisions
+        *          - The ending position as a UNIX timestamp
+        *          - The maximum number of events to return
+        *        It returns a list of maps of (id: unique ID, pos: UNIX timestamp, item: object)
+        *        for each event, where "item" is the object handed to $keyCallback.
+        * @param callable $keyCallback Callback taking arguments:
+        *          - The WANObjectCache instance
+        *          - An object from the event log
+        *        It should return a list of WAN cache keys.
+        *        The callback must fully duck-type test the object, since it can be any model class.
+        * @param array $params Additional options:
+        *          - channel: the name of the update event stream.
+        *            Default: WANObjectCache::DEFAULT_PURGE_CHANNEL.
+        *          - initialStartWindow: seconds back in time to start if the position is lost.
+        *            Default: 1 hour.
+        *          - logger: a LoggerInterface instance [optional]
+        */
+       public function __construct(
+               WANObjectCache $cache,
+               BagOStuff $store,
+               callable $logCallback,
+               callable $keyCallback,
+               array $params
+       ) {
+               $this->cache = $cache;
+               $this->store = $store;
+
+               $this->logChunkCallback = $logCallback;
+               $this->keyListCallback = $keyCallback;
+
+               $this->channel = isset( $params['channel'] )
+                       ? $params['channel']
+                       : WANObjectCache::DEFAULT_PURGE_CHANNEL;
+               $this->initialStartWindow = isset( $params['initialStartWindow'] )
+                       ? $params['initialStartWindow']
+                       : 3600;
+               $this->logger = isset( $params['logger'] )
+                       ? $params['logger']
+                       : new NullLogger();
+       }
+
+       /**
+        * @param LoggerInterface $logger
+        */
+       public function setLogger( LoggerInterface $logger ) {
+               $this->logger = $logger;
+       }
+
+       /**
+        * Check and reap stale keys based on a chunk of events
+        *
+        * @param int $n Number of events
+        * @return int Number of keys for which WANObjectCache::reap() reported success;
+        *   0 if the "busy" lock could not be acquired
+        */
+       final public function invoke( $n = 100 ) {
+               $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+               // Do nothing if the lock is unavailable (second argument 0 is presumably
+               // a zero wait timeout; confirm against BagOStuff::getScopedLock)
+               $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
+               if ( !$scopeLock ) {
+                       return 0;
+               }
+
+               $status = $this->store->get( $posKey );
+               if ( !$status ) {
+                       // No stored position; start from the configured window in the past
+                       $status = [ 'pos' => time() - $this->initialStartWindow, 'id' => 1 ];
+               }
+
+               // The end position stays more than HOLDOFF_TTL seconds behind the current time
+               $events = call_user_func_array(
+                       $this->logChunkCallback,
+                       [ $status['pos'], $status['id'], time() - WANObjectCache::HOLDOFF_TTL - 1, $n ]
+               );
+
+               $lastEvent = null;
+               $keyEvents = [];
+               foreach ( $events as $event ) {
+                       $keys = call_user_func_array(
+                               $this->keyListCallback,
+                               [ $this->cache, $event['item'] ]
+                       );
+                       foreach ( $keys as $key ) {
+                               // Later events overwrite earlier ones, so each key is reaped once,
+                               // using the position of the last event in the chunk that affects it
+                               $keyEvents[$key] = [
+                                       'pos' => $event['pos'],
+                                       'id' => $event['id']
+                               ];
+                       }
+                       $lastEvent = $event;
+               }
+
+               $purgeCount = 0;
+               foreach ( $keyEvents as $key => $keyEvent ) {
+                       if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
+                               // NOTE(review): the stored position below still advances to the last
+                               // event, so keys skipped by this break are not retried; confirm intent
+                               break;
+                       }
+                       ++$purgeCount;
+               }
+
+               if ( $lastEvent ) {
+                       $ok = $this->store->merge(
+                               $posKey,
+                               function ( $bag, $key, $curValue ) use ( $lastEvent ) {
+                                       // Order positions by timestamp first and use the ID only to
+                                       // break ties; an ID comparison alone could accept an older
+                                       // timestamp and roll the position back
+                                       $isOlder = $curValue && (
+                                               $lastEvent['pos'] < $curValue['pos'] ||
+                                               (
+                                                       $lastEvent['pos'] == $curValue['pos'] &&
+                                                       $lastEvent['id'] < $curValue['id']
+                                               )
+                                       );
+                                       if ( $isOlder ) {
+                                               // Keep prior position instead of rolling it back
+                                               return $curValue;
+                                       }
+
+                                       // Use new position
+                                       return [
+                                               'pos' => $lastEvent['pos'],
+                                               'id' => $lastEvent['id']
+                                       ];
+                               },
+                               IExpiringStore::TTL_INDEFINITE
+                       );
+
+                       $pos = $lastEvent['pos'];
+                       $id = $lastEvent['id'];
+                       if ( $ok ) {
+                               $this->logger->info( "Updated cache reap position ($pos,$id)." );
+                       } else {
+                               $this->logger->error( "Could not update cache reap position ($pos,$id)." );
+                       }
+               }
+
+               // Release the "busy" lock explicitly before returning
+               ScopedCallback::consume( $scopeLock );
+
+               return $purgeCount;
+       }
+}
diff --git a/includes/specialpage/ChangesListSpecialPage.php 
b/includes/specialpage/ChangesListSpecialPage.php
index 60f1dd8..d85931a 100644
--- a/includes/specialpage/ChangesListSpecialPage.php
+++ b/includes/specialpage/ChangesListSpecialPage.php
@@ -77,6 +77,9 @@
                $this->webOutput( $rows, $opts );
 
                $rows->free();
+
+               // Clean up any bad page entries for titles showing up in RC
+               DeferredUpdates::addUpdate( new WANCacheReapUpdate( wfGetDB( 
DB_SLAVE ) ) );
        }
 
        /**

-- 
To view, visit https://gerrit.wikimedia.org/r/308310
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7f14b9ca2533032147e62b1a3cc004a23da86579
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/core
Gerrit-Branch: master
Gerrit-Owner: Aaron Schulz <asch...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to