Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/325589 )
Change subject: Add EventBus RCFeed classes ...................................................................... Add EventBus RCFeed classes This allows the RecentChange configuration to use the EventBus extension to POST recentchange events to the EventBus service. This commit also allows different EventBus instances to be configured with different URIs. https://gerrit.wikimedia.org/r/#/c/325585 needs to be deployed to the EventBus service before this will work. Bug: T152030 Change-Id: I7edc4d57fa0845b5448a28e5b4123d70e63a4a3f --- M EventBus.php A EventBusRCFeedEngine.php A EventBusRCFeedFormatter.php M README.md M extension.json 5 files changed, 204 insertions(+), 42 deletions(-) Approvals: Ottomata: Verified; Looks good to me, approved diff --git a/EventBus.php b/EventBus.php index 67f8319..498b5bd 100644 --- a/EventBus.php +++ b/EventBus.php @@ -26,18 +26,35 @@ class EventBus { - /** HTTP request timeout in seconds */ - const REQ_TIMEOUT = 5; + /** @const Default HTTP request timeout in seconds */ + const DEFAULT_REQUEST_TIMEOUT = 5; - /** @var EventBus */ - private static $instance; + /** @var array of EventBus instances */ + private static $instances = []; + + /** @var logger instance for all EventBus instances */ + private static $logger; /** @var MultiHttpClient */ protected $http; - public function __construct() { + /** @var EventServiceUrl for this EventBus instance */ + protected $url; + + /** @var HTTP request timeout for this EventBus instance */ + protected $timeout; + + /** + * @param string url EventBus service endpoint URL. E.g. http://localhost:8085/v1/events + * @param integer timeout HTTP request timeout in seconds, defaults to 5. 
+ * + * @constructor + */ + public function __construct( $url, $timeout = null ) { $this->http = new MultiHttpClient( [] ); - $this->logger = LoggerFactory::getInstance( 'EventBus' ); + + $this->url = $url; + $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT; } /** @@ -48,13 +65,9 @@ public function send( $events ) { if ( empty( $events ) ) { $context = [ 'backtrace' => debug_backtrace() ]; - $this->logger->error( 'Must call send with at least 1 event. Aborting send.', $context ); + self::logger()->error( 'Must call send with at least 1 event. Aborting send.', $context ); return; } - - $config = self::getConfig(); - $eventServiceUrl = $config->get( 'EventServiceUrl' ); - $eventServiceTimeout = $config->get( 'EventServiceTimeout' ); // If we already have a JSON string of events, just use it as the body. if ( is_string( $events ) ) { @@ -70,7 +83,7 @@ } $req = [ - 'url' => $eventServiceUrl, + 'url' => $this->url, 'method' => 'POST', 'body' => $body, 'headers' => [ 'content-type' => 'application/json' ] @@ -79,7 +92,7 @@ $res = $this->http->run( $req, [ - 'reqTimeout' => $eventServiceTimeout ?: self::REQ_TIMEOUT + 'reqTimeout' => $this->timeout, ] ); @@ -87,9 +100,13 @@ // 207: some but not all events are accepted // 400: no events are accepted if ( $res['code'] != 201 ) { - $this->onError( $req, $res ); + $message = empty( $res['error'] ) ? $res['code'] . ': ' . $res['reason'] : $res['error']; + $context = [ 'EventBus' => [ 'request' => $req, 'response' => $res ] ]; + self::logger()->error( "Unable to deliver all events: ${message}", $context ); } } + + // == static helper functions below == /** * Serializes $events array to a JSON string. 
If FormatJson::encode() @@ -98,7 +115,7 @@ * @param array $events * @return string JSON */ - public function serializeEvents( $events ) { + public static function serializeEvents( $events ) { $serializedEvents = FormatJson::encode( $events ); if ( empty ( $serializedEvents ) ) { @@ -107,7 +124,7 @@ 'events' => $events, 'json_last_error' => json_last_error() ]; - $this->logger->error( + self::logger()->error( 'FormatJson::encode($events) failed: ' . $context['json_last_error'] . '. Aborting send.', $context ); @@ -116,25 +133,6 @@ return $serializedEvents; } - - private function onError( $req, $res ) { - $message = empty( $res['error'] ) ? $res['code'] . ': ' . $res['reason'] : $res['error']; - $context = [ 'EventBus' => [ 'request' => $req, 'response' => $res ] ]; - $this->logger->error( "Unable to deliver event: ${message}", $context ); - } - - /** - * @return EventBus - */ - public static function getInstance() { - if ( self::$instance === null ) { - self::$instance = new self(); - } - - return self::$instance; - } - - // == static helper functions below == /** * Creates a full article path @@ -240,10 +238,51 @@ } /** - * Retrieve main config - * @return Config + * Returns a singleton logger instance for all EventBus instances. + * Use like: self::logger()->info( $mesage ) + * We use this so we don't have to check if the logger has been created + * before attempting to log a message. */ - private static function getConfig() { - return MediaWikiServices::getInstance()->getMainConfig(); + private static function logger() { + if ( !self::$logger ) { + self::$logger = LoggerFactory::getInstance( 'EventBus' ); + } + return self::$logger; + } + + /** + * @param array $config EventBus config object. This must at least contain EventServiceUrl. + * EventServiceTimeout is also a valid config key. If null (default) + * this will lookup config using + * MediawikiServices::getInstance()->getMainConfig() and look for + * for EventServiceUrl and EventServiceTimeout. 
+ * Note that instances are URL keyed singletons, so the first + * instance created with a given URL will be the only one. + * + * @return EventBus + */ + public static function getInstance( $config = null ) { + if ( !$config ) { + $config = MediaWikiServices::getInstance()->getMainConfig(); + $url = $config->get( 'EventServiceUrl' ); + $timeout = $config->get( 'EventServiceTimeout' ); + } else { + $url = $config['EventServiceUrl']; + $timeout = array_key_exists( 'EventServiceTimeout', $config ) ? + $config['EventServiceTimeout'] : null; + } + + if ( !$url ) { + self::logger()->error( + 'Failed configuration of EventBus instance. \'EventServiceUrl\' must be set in $config.' + ); + return; + } + + if ( !array_key_exists( $url, self::$instances ) ) { + self::$instances[$url] = new self( $url, $timeout ); + } + + return self::$instances[$url]; } } diff --git a/EventBusRCFeedEngine.php b/EventBusRCFeedEngine.php new file mode 100644 index 0000000..5dec860 --- /dev/null +++ b/EventBusRCFeedEngine.php @@ -0,0 +1,40 @@ +<?php + +/** + * Emit a recent change notification via EventBus. The feed uri should be + * start with eventbus://. The event's topic will be 'mediawiki.recentchange' as + * set in EventBusRCFeedFormatter::TOPIC. + * + * @example + * $wgRCFeeds['eventbus'] = array( + * 'formatter' => 'EventBusRCFeedFormatter', + * 'uri' => 'eventbus://eventbus.svc.eqiad.wmnet:8085/v1/events' + * ); + * $wgRCEngines = array( + * 'eventbus' => 'EventBusRCFeedEngine' + * ); + * + */ +class EventBusRCFeedEngine implements RCFeedEngine { + + /** + * @param array $feed will be used for EventBus $config. Singleton instances + * are identified by $feed['uri']; + * @param string|array $line to send + * + * @see RCFeedEngine::send + */ + public function send( array $feed, $line ) { + DeferredUpdates::addCallableUpdate( + function() use ( $feed, $line ) { + // construct EventBus config from RCFeed $feed uri. + $config = $feed; + // RCFeedEngines are selected via URI protocol schemes. 
This engine + // is chosen using eventbus://, but EventBus URIs are just HTTP REST + // endpoints. Replace eventbus:// with http:// + $config['EventServiceUrl'] = str_replace( 'eventbus://', 'http://', $feed['uri'] ); + return EventBus::getInstance( $config )->send( $line ); + } + ); + } +} diff --git a/EventBusRCFeedFormatter.php b/EventBusRCFeedFormatter.php new file mode 100644 index 0000000..9556c97 --- /dev/null +++ b/EventBusRCFeedFormatter.php @@ -0,0 +1,54 @@ +<?php + +/** + * Augments the recentchanges object for use with the EventBus service, and then + * formats it into a JSON string. + * + * @extends MachineReadableRCFeedFormatter + */ +class EventBusRCFeedFormatter extends MachineReadableRCFeedFormatter { + + /** + * @const string topic that will be set as meta.topic for a recentchange + * event that is POSTed to the EventBus service. + */ + const TOPIC = 'mediawiki.recentchange'; + + /** + * Calls MachineReadableRCFeedFormatter's getLine(), augments + * the returned object so that it is suitable for POSTing to + * the EventBus service, and then returns those events + * serialized (AKA formatted) as a JSON string by calling + * EventBus serializeEvents(). + * + * @see MachineReadableRCFeedFormatter::getLine + */ + public function getLine( array $feed, RecentChange $rc, $actionComment ) { + $event = EventBus::createEvent( + EventBus::getArticleURL( $rc->getTitle() ), + self::TOPIC, + parent::getLine( $feed, $rc, $actionComment ) + ); + + // If timestamp exists on the recentchange event (it should), + // then use it as the meta.dt event datetime. + if ( array_key_exists( 'timestamp', $event ) ) { + $event['meta']['dt'] = date( 'c', $event['timestamp'] ); + } + $events = [ $event ]; + return EventBus::serializeEvents( $events ); + } + + /** + * Here, formatArray is implemented to just return the same + * event it is given. 
Since parent::getLine() calls this, + * and we need to augment the $event after it is returned from + * parent::getLine, we don't actually want to serialize (AKA format) + * the event at this time. This class' getLine function will + * serialize/format the event after it has augmented the + * event returned here. + */ + protected function formatArray( array $event ) { + return $event; + } +} diff --git a/README.md b/README.md index b60312e..9770e2d 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,40 @@ ## Configuration -To configure the URL of the EventBus service: +To configure the URL of the EventBus service: $wgEventServiceUrl = 'http://localhost:8085/v1/topics'; -To configure the event service request timeout: +To configure the EventBus service request timeout: $wgEventServiceTimeout = 5; // 5 second timeout + +## EventBus RCFeed + +This extension also provides an RCFeedEngine and RCFeedFormatter implementation +that will allow RCFeed configuration to post to the EventBus service in the +`mediawiki.recentchange` topic. To use, +add the following to your `LocalSettings.php`: + +```php +$wgRCFeeds['eventbus'] = array( + 'formatter' => 'EventBusRCFeedFormatter', + 'uri' => 'eventbus://localhost:8085/v1/events', +); +$wgRCEngines = array( + 'eventbus' => 'EventBusRCFeedEngine', +); +``` + +Substitute `uri` with the `$wgEventServiceUrl`, but with `eventbus://` instead of `http://`. + +Note that the protocol scheme part of the `uri` configured in `$wgRCFeeds` starts with +`eventbus://`. `$wgRCEngines` config are mapped from protocol schemes. However, +`EventServiceUrl` which is used to configure EventBus configuration expects this to be +a usual `http://` REST endpoint. `EventBusRCFeedEngine` is aware of this discrepancy, and +replaces the `eventbus://` in the `uri` with `http://` when configuring its EventBus instance. 
+ ## References * [Reliable publish / subscribe event bus](https://phabricator.wikimedia.org/T84923) diff --git a/extension.json b/extension.json index d9b211d..4ece993 100644 --- a/extension.json +++ b/extension.json @@ -1,3 +1,4 @@ + { "name": "EventBus", "version": "0.2.12", @@ -17,7 +18,9 @@ "EventBus": "EventBus.php", "EventBusHooks": "EventBus.hooks.php", "EventRelayerBus": "EventRelayerBus.php", - "PurgeEventRelayerBus": "PurgeEventRelayerBus.php" + "PurgeEventRelayerBus": "PurgeEventRelayerBus.php", + "EventBusRCFeedEngine": "EventBusRCFeedEngine.php", + "EventBusRCFeedFormatter": "EventBusRCFeedFormatter.php" }, "MessagesDirs": { "EventBus": [ -- To view, visit https://gerrit.wikimedia.org/r/325589 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7edc4d57fa0845b5448a28e5b4123d70e63a4a3f Gerrit-PatchSet: 13 Gerrit-Project: mediawiki/extensions/EventBus Gerrit-Branch: master Gerrit-Owner: Ottomata <[email protected]> Gerrit-Reviewer: Alex Monk <[email protected]> Gerrit-Reviewer: Krinkle <[email protected]> Gerrit-Reviewer: Ottomata <[email protected]> Gerrit-Reviewer: Ppchelko <[email protected]> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
