Ottomata has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/325589 )

Change subject: Add EventBus RCFeed classes
......................................................................


Add EventBus RCFeed classes

This allows the RecentChange configuration to use the EventBus extension
to POST recentchange events to the EventBus service.

This commit also allows different EventBus instances
to be configured with different URIs.

https://gerrit.wikimedia.org/r/#/c/325585 needs to be deployed to
the EventBus service before this will work.

Bug: T152030
Change-Id: I7edc4d57fa0845b5448a28e5b4123d70e63a4a3f
---
M EventBus.php
A EventBusRCFeedEngine.php
A EventBusRCFeedFormatter.php
M README.md
M extension.json
5 files changed, 204 insertions(+), 42 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/EventBus.php b/EventBus.php
index 67f8319..498b5bd 100644
--- a/EventBus.php
+++ b/EventBus.php
@@ -26,18 +26,35 @@
 
 class EventBus {
 
-       /** HTTP request timeout in seconds */
-       const REQ_TIMEOUT = 5;
+       /** @const Default HTTP request timeout in seconds */
+       const DEFAULT_REQUEST_TIMEOUT = 5;
 
-       /** @var EventBus */
-       private static $instance;
+       /** @var array of EventBus instances */
+       private static $instances = [];
+
+       /** @var logger instance for all EventBus instances */
+       private static $logger;
 
        /** @var MultiHttpClient */
        protected $http;
 
-       public function __construct() {
+       /** @var EventServiceUrl for this EventBus instance */
+       protected $url;
+
+       /** @var HTTP request timeout for this EventBus instance */
+       protected $timeout;
+
+       /**
+        * @param string     url  EventBus service endpoint URL. E.g. 
http://localhost:8085/v1/events
+        * @param integer    timeout HTTP request timeout in seconds, defaults 
to 5.
+        *
+        * @constructor
+        */
+       public function __construct( $url, $timeout = null ) {
                $this->http = new MultiHttpClient( [] );
-               $this->logger = LoggerFactory::getInstance( 'EventBus' );
+
+               $this->url = $url;
+               $this->timeout = $timeout ?: self::DEFAULT_REQUEST_TIMEOUT;
        }
 
        /**
@@ -48,13 +65,9 @@
        public function send( $events ) {
                if ( empty( $events ) ) {
                        $context = [ 'backtrace' => debug_backtrace() ];
-                       $this->logger->error( 'Must call send with at least 1 
event. Aborting send.', $context );
+                       self::logger()->error( 'Must call send with at least 1 
event. Aborting send.', $context );
                        return;
                }
-
-               $config = self::getConfig();
-               $eventServiceUrl = $config->get( 'EventServiceUrl' );
-               $eventServiceTimeout = $config->get( 'EventServiceTimeout' );
 
                // If we already have a JSON string of events, just use it as 
the body.
                if ( is_string( $events ) ) {
@@ -70,7 +83,7 @@
                }
 
                $req = [
-                       'url'           => $eventServiceUrl,
+                       'url'           => $this->url,
                        'method'        => 'POST',
                        'body'          => $body,
                        'headers'       => [ 'content-type' => 
'application/json' ]
@@ -79,7 +92,7 @@
                $res = $this->http->run(
                        $req,
                        [
-                               'reqTimeout' => $eventServiceTimeout ?: 
self::REQ_TIMEOUT
+                               'reqTimeout' => $this->timeout,
                        ]
                );
 
@@ -87,9 +100,13 @@
                // 207: some but not all events are accepted
                // 400: no events are accepted
                if ( $res['code'] != 201 ) {
-                       $this->onError( $req, $res );
+                       $message = empty( $res['error'] ) ? $res['code'] . ': ' 
. $res['reason'] : $res['error'];
+                       $context = [ 'EventBus' => [ 'request' => $req, 
'response' => $res ] ];
+                       self::logger()->error( "Unable to deliver all events: 
${message}", $context );
                }
        }
+
+       // == static helper functions below ==
 
        /**
         * Serializes $events array to a JSON string.  If FormatJson::encode()
@@ -98,7 +115,7 @@
         * @param array $events
         * @return string JSON
         */
-       public function serializeEvents( $events ) {
+       public static function serializeEvents( $events ) {
                $serializedEvents = FormatJson::encode( $events );
 
                if ( empty ( $serializedEvents ) ) {
@@ -107,7 +124,7 @@
                                'events' => $events,
                                'json_last_error' => json_last_error()
                        ];
-                       $this->logger->error(
+                       self::logger()->error(
                                'FormatJson::encode($events) failed: ' . 
$context['json_last_error'] .
                                '. Aborting send.', $context
                        );
@@ -116,25 +133,6 @@
 
                return $serializedEvents;
        }
-
-       private function onError( $req, $res ) {
-               $message = empty( $res['error'] ) ? $res['code'] . ': ' . 
$res['reason'] : $res['error'];
-               $context = [ 'EventBus' => [ 'request' => $req, 'response' => 
$res ] ];
-               $this->logger->error( "Unable to deliver event: ${message}", 
$context );
-       }
-
-       /**
-        * @return EventBus
-        */
-       public static function getInstance() {
-               if ( self::$instance === null ) {
-                       self::$instance = new self();
-               }
-
-               return self::$instance;
-       }
-
-       // == static helper functions below ==
 
        /**
         * Creates a full article path
@@ -240,10 +238,51 @@
        }
 
        /**
-        * Retrieve main config
-        * @return Config
+        * Returns a singleton logger instance for all EventBus instances.
+        * Use like: self::logger()->info( $message )
+        * We use this so we don't have to check if the logger has been created
+        * before attempting to log a message.
         */
-       private static function getConfig() {
-               return MediaWikiServices::getInstance()->getMainConfig();
+       private static function logger() {
+               if ( !self::$logger ) {
+                       self::$logger = LoggerFactory::getInstance( 'EventBus' 
);
+               }
+               return self::$logger;
+       }
+
+       /**
+        * @param array $config EventBus config object.  This must at least 
contain EventServiceUrl.
+        *                      EventServiceTimeout is also a valid config key. 
 If null (default)
+        *                      this will lookup config using
+        *                      
MediaWikiServices::getInstance()->getMainConfig() and look
+        *                      for EventServiceUrl and EventServiceTimeout.
+        *                      Note that instances are URL keyed singletons, 
so the first
+        *                      instance created with a given URL will be the 
only one.
+        *
+        * @return EventBus
+        */
+       public static function getInstance( $config = null ) {
+               if ( !$config ) {
+                       $config = 
MediaWikiServices::getInstance()->getMainConfig();
+                       $url = $config->get( 'EventServiceUrl' );
+                       $timeout = $config->get( 'EventServiceTimeout' );
+               } else {
+                       $url = $config['EventServiceUrl'];
+                       $timeout = array_key_exists( 'EventServiceTimeout', 
$config ) ?
+                               $config['EventServiceTimeout'] : null;
+               }
+
+               if ( !$url ) {
+                       self::logger()->error(
+                               'Failed configuration of EventBus instance. 
\'EventServiceUrl\' must be set in $config.'
+                       );
+                       return;
+               }
+
+               if ( !array_key_exists( $url, self::$instances ) ) {
+                       self::$instances[$url] = new self( $url, $timeout );
+               }
+
+               return self::$instances[$url];
        }
 }
diff --git a/EventBusRCFeedEngine.php b/EventBusRCFeedEngine.php
new file mode 100644
index 0000000..5dec860
--- /dev/null
+++ b/EventBusRCFeedEngine.php
@@ -0,0 +1,40 @@
+<?php
+
+/**
+ * Emit a recent change notification via EventBus.  The feed uri should
+ * start with eventbus://.  The event's topic will be 'mediawiki.recentchange' 
as
+ * set in EventBusRCFeedFormatter::TOPIC.
+ *
+ * @example
+ * $wgRCFeeds['eventbus'] = array(
+ *      'formatter' => 'EventBusRCFeedFormatter',
+ *      'uri'       => 'eventbus://eventbus.svc.eqiad.wmnet:8085/v1/events'
+ * );
+ * $wgRCEngines = array(
+ *      'eventbus' => 'EventBusRCFeedEngine'
+ * );
+ *
+ */
+class EventBusRCFeedEngine implements RCFeedEngine {
+
+       /**
+        * @param array        $feed will be used for EventBus $config.  
Singleton instances
+        *                     are identified by $feed['uri'];
+        * @param string|array $line to send
+        *
+        * @see RCFeedEngine::send
+        */
+       public function send( array $feed, $line ) {
+               DeferredUpdates::addCallableUpdate(
+                       function() use ( $feed, $line ) {
+                               // construct EventBus config from RCFeed $feed 
uri.
+                               $config = $feed;
+                               // RCFeedEngines are selected via URI protocol 
schemes.  This engine
+                               // is chosen using eventbus://, but EventBus 
URIs are just HTTP REST
+                               // endpoints.  Replace eventbus:// with http://
+                               $config['EventServiceUrl'] = str_replace( 
'eventbus://', 'http://', $feed['uri'] );
+                               return EventBus::getInstance( $config )->send( 
$line );
+                       }
+               );
+       }
+}
diff --git a/EventBusRCFeedFormatter.php b/EventBusRCFeedFormatter.php
new file mode 100644
index 0000000..9556c97
--- /dev/null
+++ b/EventBusRCFeedFormatter.php
@@ -0,0 +1,54 @@
+<?php
+
+/**
+ * Augments the recentchanges object for use with the EventBus service, and 
then
+ * formats it into a JSON string.
+ *
+ * @extends MachineReadableRCFeedFormatter
+ */
+class EventBusRCFeedFormatter extends MachineReadableRCFeedFormatter {
+
+       /**
+        * @const string topic that will be set as meta.topic for a recentchange
+        *               event that is POSTed to the EventBus service.
+        */
+       const TOPIC = 'mediawiki.recentchange';
+
+       /**
+        * Calls MachineReadableRCFeedFormatter's getLine(), augments
+        * the returned object so that it is suitable for POSTing to
+        * the EventBus service, and then returns those events
+        * serialized (AKA formatted) as a JSON string by calling
+        * EventBus serializeEvents().
+        *
+        * @see MachineReadableRCFeedFormatter::getLine
+        */
+       public function getLine( array $feed, RecentChange $rc, $actionComment 
) {
+               $event = EventBus::createEvent(
+                       EventBus::getArticleURL( $rc->getTitle() ),
+                       self::TOPIC,
+                       parent::getLine( $feed, $rc, $actionComment )
+               );
+
+               // If timestamp exists on the recentchange event (it should),
+               // then use it as the meta.dt event datetime.
+               if ( array_key_exists( 'timestamp', $event ) ) {
+                       $event['meta']['dt'] = date( 'c', $event['timestamp'] );
+               }
+               $events = [ $event ];
+               return EventBus::serializeEvents( $events );
+       }
+
+       /**
+        * Here, formatArray is implemented to just return the same
+        * event it is given.  Since parent::getLine() calls this,
+        * and we need to augment the $event after it is returned from
+        * parent::getLine, we don't actually want to serialize (AKA format)
+        * the event at this time.  This class' getLine function will
+        * serialize/format the event after it has augmented the
+        * event returned here.
+        */
+       protected function formatArray( array $event ) {
+               return $event;
+       }
+}
diff --git a/README.md b/README.md
index b60312e..9770e2d 100644
--- a/README.md
+++ b/README.md
@@ -8,14 +8,40 @@
 
 ## Configuration
 
-To configure the URL of the event service:
+To configure the URL of the EventBus service:
 
     $wgEventServiceUrl = 'http://localhost:8085/v1/topics';
 
-To configure the event service request timeout:
+To configure the EventBus service request timeout:
 
     $wgEventServiceTimeout = 5;    // 5 second timeout
 
+
+## EventBus RCFeed
+
+This extension also provides an RCFeedEngine and RCFeedFormatter implementation
+that will allow RCFeed configuration to post to the EventBus service in the
+`mediawiki.recentchange` topic.  To use,
+add the following to your `LocalSettings.php`:
+
+```php
+$wgRCFeeds['eventbus'] = array(
+    'formatter' => 'EventBusRCFeedFormatter',
+    'uri'       => 'eventbus://localhost:8085/v1/events',
+);
+$wgRCEngines = array(
+    'eventbus' => 'EventBusRCFeedEngine',
+);
+```
+
+Substitute `uri` with the `$wgEventServiceUrl`, but with `eventbus://` 
instead of `http://`.
+
+Note that the protocol scheme part of the `uri` configured in `$wgRCFeeds` 
starts with
+`eventbus://`.  `$wgRCEngines` entries are keyed by protocol scheme.  
However,
+`EventServiceUrl`, which is used to configure an EventBus instance, expects 
this to be
+a usual `http://` REST endpoint.  `EventBusRCFeedEngine` is aware of this 
discrepancy, and
+replaces the `eventbus://` in the `uri` with `http://` when configuring its 
EventBus instance.
+
 ## References
 
   * [Reliable publish / subscribe event 
bus](https://phabricator.wikimedia.org/T84923)
diff --git a/extension.json b/extension.json
index d9b211d..4ece993 100644
--- a/extension.json
+++ b/extension.json
@@ -1,3 +1,4 @@
+
 {
        "name": "EventBus",
        "version": "0.2.12",
@@ -17,7 +18,9 @@
                "EventBus": "EventBus.php",
                "EventBusHooks": "EventBus.hooks.php",
                "EventRelayerBus": "EventRelayerBus.php",
-               "PurgeEventRelayerBus": "PurgeEventRelayerBus.php"
+               "PurgeEventRelayerBus": "PurgeEventRelayerBus.php",
+               "EventBusRCFeedEngine": "EventBusRCFeedEngine.php",
+               "EventBusRCFeedFormatter": "EventBusRCFeedFormatter.php"
        },
        "MessagesDirs": {
                "EventBus": [

-- 
To view, visit https://gerrit.wikimedia.org/r/325589
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7edc4d57fa0845b5448a28e5b4123d70e63a4a3f
Gerrit-PatchSet: 13
Gerrit-Project: mediawiki/extensions/EventBus
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Alex Monk <[email protected]>
Gerrit-Reviewer: Krinkle <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: Ppchelko <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to