jenkins-bot has submitted this change and it was merged.

Change subject: Subclass BaseQueueConsumer instead of callbacks
......................................................................


Subclass BaseQueueConsumer instead of callbacks

BaseQueueConsumer subclasses need to implement processMessage( $message )

TODO: looks like we could have a single all-purpose maintenance script
for all queue consumer jobs that takes a single mandatory argument
specifying a consumer subclass.

Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
---
R Core/QueueConsumers/BaseQueueConsumer.php
A Core/QueueConsumers/JobQueueConsumer.php
A Core/QueueConsumers/PendingQueueConsumer.php
M Maintenance/ConsumePendingQueue.php
M Maintenance/QueueJobRunner.php
M Tests/QueueConsumerTest.php
A Tests/TestingQueueConsumer.php
7 files changed, 170 insertions(+), 103 deletions(-)

Approvals:
  Awight: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/Core/DataStores/QueueConsumer.php 
b/Core/QueueConsumers/BaseQueueConsumer.php
similarity index 72%
rename from Core/DataStores/QueueConsumer.php
rename to Core/QueueConsumers/BaseQueueConsumer.php
index a4f6980..21c2498 100644
--- a/Core/DataStores/QueueConsumer.php
+++ b/Core/QueueConsumers/BaseQueueConsumer.php
@@ -1,11 +1,12 @@
 <?php
-namespace SmashPig\Core\DataStores;
+namespace SmashPig\Core\QueueConsumers;
 
 use Exception;
 use InvalidArgumentException;
 use PHPQueue\Interfaces\AtomicReadBuffer;
 
 use SmashPig\Core\Context;
+use SmashPig\Core\DataStores\DamagedDatabase;
 use SmashPig\Core\Logging\Logger;
 
 /**
@@ -13,7 +14,7 @@
  * interface. Exceptions in the processing callback will cause the message to
  * be sent to a damaged message datastore.
  */
-class QueueConsumer {
+abstract class BaseQueueConsumer {
 
        /**
         * @var AtomicReadBuffer
@@ -37,18 +38,24 @@
        protected $messageLimit = 0;
 
        /**
+        * Do something with the message popped from the queue. Return value is
+        * ignored, and exceptions will be caught and handled by handleError.
+        *
+        * @param array $message
+        */
+       abstract function processMessage( $message );
+
+       /**
         * Gets a fresh QueueConsumer
         *
         * @param string $queueName key of queue configured in data-store, must
         *  implement @see PHPQueue\Interfaces\AtomicReadBuffer.
-        * @param callable $callback processing function taking message array
         * @param int $timeLimit max number of seconds to loop, 0 for no limit
         * @param int $messageLimit max number of messages to process, 0 for all
         * @throws \SmashPig\Core\ConfigurationKeyException
         */
        public function __construct(
                $queueName,
-               $callback,
                $timeLimit = 0,
                $messageLimit = 0
        ) {
@@ -58,12 +65,8 @@
                if ( !is_numeric( $messageLimit ) ) {
                        throw new InvalidArgumentException( 'messageLimit must 
be numeric' );
                }
-               if ( !is_callable( $callback ) ) {
-                       throw new InvalidArgumentException( "Processing 
callback must be callable" );
-               }
 
                $this->queueName = $queueName;
-               $this->callback = $callback;
                $this->timeLimit = intval( $timeLimit );
                $this->messageLimit = intval( $messageLimit );
 
@@ -87,7 +90,7 @@
        public function dequeueMessages() {
                $startTime = time();
                $processed = 0;
-               $realCallback = array( $this, 'processMessage' );
+               $realCallback = array( $this, 'processMessageWithErrorHandling' 
);
                do {
                        $data = $this->backend->popAtomic( $realCallback );
                        if ( $data !== null ) {
@@ -100,9 +103,15 @@
                return $processed;
        }
 
-       public function processMessage( $message ) {
+       /**
+        * Call the concrete processMessage function and handle any errors that
+        * may arise.
+        *
+        * @param array $message
+        */
+       public function processMessageWithErrorHandling( $message ) {
                try {
-                       call_user_func( $this->callback, $message );
+                       $this->processMessage( $message );
                } catch ( Exception $ex ) {
                        $this->handleError( $message, $ex );
                }
@@ -116,15 +125,28 @@
         * @param Exception $ex
         */
        protected function handleError( $message, Exception $ex ) {
+               $this->sendToDamagedStore( $message, $ex );
+       }
+
+       /**
+        * @param array $message The data
+        * @param Exception $ex The problem
+        * @param int| null $retryDate If provided, retry after this timestamp
+        * @return int ID of message in damaged database
+        */
+       protected function sendToDamagedStore(
+               $message, Exception $ex, $retryDate = null
+       ) {
                Logger::error(
-                       'Error processing message, moving to damaged queue.',
+                       'Error processing message, moving to damaged store.',
                        $message,
                        $ex
                );
-               $this->damagedDb->storeMessage(
+               return $this->damagedDb->storeMessage(
                        $message,
                        $this->queueName,
-                       $ex->getMessage() . "\n" . $ex->getTraceAsString()
+                       $ex->getMessage() . "\n" . $ex->getTraceAsString(),
+                       $retryDate
                );
        }
 
diff --git a/Core/QueueConsumers/JobQueueConsumer.php 
b/Core/QueueConsumers/JobQueueConsumer.php
new file mode 100644
index 0000000..ecaf5ca
--- /dev/null
+++ b/Core/QueueConsumers/JobQueueConsumer.php
@@ -0,0 +1,52 @@
+<?php namespace SmashPig\Core\QueueConsumers;
+
+use RuntimeException;
+use SmashPig\Core\DataStores\KeyedOpaqueStorableObject;
+use SmashPig\Core\Jobs\RunnableJob;
+
+class JobQueueConsumer extends BaseQueueConsumer {
+
+       protected $successCount = 0;
+
+       /**
+        * Instantiates and runs a job defined by a queue message. Depends on
+        * the base consumer's damaged message store functionality to either
+        * divert messages or stop execution on bad message or job failure.
+        * @param array $jobMessage
+        * @throws \SmashPig\Core\DataStores\DataSerializationException
+        */
+       function processMessage( $jobMessage ) {
+               if ( !isset( $jobMessage['php-message-class'] ) ) {
+                       throw new RuntimeException(
+                               'Job message missing required key 
\'php-message-class\''
+                       );
+               }
+
+               $className = $jobMessage['php-message-class'];
+               $jsonMessage = json_encode( $jobMessage );
+               $jobObj = KeyedOpaqueStorableObject::fromJsonProxy( $className, 
$jsonMessage );
+
+               if ( !( $jobObj instanceof RunnableJob ) ) {
+                       throw new RuntimeException(
+                               get_class( $jobObj ) . ' is not an instance of 
RunnableJob. '
+                               . 'Could not execute and sending to damaged 
message store.'
+                       );
+               }
+
+               if ( !$jobObj->execute() ) {
+                       throw new RuntimeException(
+                               'Job tells us that it did not successfully 
execute. '
+                               . 'Sending to damaged message store.'
+                       );
+               }
+
+               $this->successCount += 1;
+       }
+
+       /**
+        * @return int
+        */
+       public function getSuccessCount() {
+               return $this->successCount;
+       }
+}
diff --git a/Core/QueueConsumers/PendingQueueConsumer.php 
b/Core/QueueConsumers/PendingQueueConsumer.php
new file mode 100644
index 0000000..40655b8
--- /dev/null
+++ b/Core/QueueConsumers/PendingQueueConsumer.php
@@ -0,0 +1,17 @@
+<?php namespace SmashPig\Core\QueueConsumers;
+
+use SmashPig\Core\DataStores\PendingDatabase;
+
+class PendingQueueConsumer extends BaseQueueConsumer {
+
+       protected $pendingDatabase;
+
+       public function __construct( $queueName, $timeLimit, $messageLimit ) {
+               parent::__construct( $queueName, $timeLimit, $messageLimit );
+               $this->pendingDatabase = PendingDatabase::get();
+       }
+
+       public function processMessage( $message ) {
+               $this->pendingDatabase->storeMessage( $message );
+       }
+}
diff --git a/Maintenance/ConsumePendingQueue.php 
b/Maintenance/ConsumePendingQueue.php
index 22300b2..92acc5a 100644
--- a/Maintenance/ConsumePendingQueue.php
+++ b/Maintenance/ConsumePendingQueue.php
@@ -3,11 +3,9 @@
 
 require ( 'MaintenanceBase.php' );
 
-use \PDO;
-
 use SmashPig\Core\Logging\Logger;
 use SmashPig\Core\DataStores\PendingDatabase;
-use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\PendingQueueConsumer;
 
 $maintClass = '\SmashPig\Maintenance\ConsumePendingQueue';
 
@@ -32,12 +30,10 @@
         * Do the actual work of the script.
         */
        public function execute() {
-               $this->pendingDatabase = PendingDatabase::get();
 
                $basePath = 'maintenance/consume-pending/';
-               $consumer = new QueueConsumer(
+               $consumer = new PendingQueueConsumer(
                        $this->getOption( 'queue' ),
-                       array( $this->pendingDatabase, 'storeMessage' ),
                        $this->getOptionOrConfig( 'time-limit', $basePath . 
'time-limit' ),
                        $this->getOptionOrConfig( 'max-messages', $basePath . 
'message-limit' )
                );
diff --git a/Maintenance/QueueJobRunner.php b/Maintenance/QueueJobRunner.php
index b01265c..bb2ab8d 100644
--- a/Maintenance/QueueJobRunner.php
+++ b/Maintenance/QueueJobRunner.php
@@ -2,12 +2,9 @@
 
 require ( 'MaintenanceBase.php' );
 
-use RuntimeException;
-
-use SmashPig\Core\DataStores\KeyedOpaqueStorableObject;
-use SmashPig\Core\Jobs\RunnableJob;
 use SmashPig\Core\Logging\Logger;
 use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\JobQueueConsumer;
 
 $maintClass = '\SmashPig\Maintenance\QueueJobRunner';
 
@@ -17,8 +14,6 @@
  * after it completes no new jobs will be dispatched and this script will exit.
  */
 class QueueJobRunner extends MaintenanceBase {
-
-       protected $successCount = 0;
 
        public function __construct() {
                parent::__construct();
@@ -36,9 +31,8 @@
                // Get some defaults from configuration
                $basePath = 'maintenance/job-runner/';
 
-               $consumer = new QueueConsumer(
+               $consumer = new JobQueueConsumer(
                        $this->getOption( 'queue' ),
-                       array( $this, 'runJob' ),
                        $this->getOptionOrConfig( 'time-limit', $basePath . 
'time-limit' ),
                        $this->getOptionOrConfig( 'max-messages', $basePath . 
'message-limit' )
                );
@@ -46,45 +40,11 @@
                $startTime = time();
                $messageCount = $consumer->dequeueMessages();
 
+               $successCount = $consumer->getSuccessCount();
                $elapsedTime = time() - $startTime;
                Logger::info(
-                       "Processed $messageCount ($this->successCount 
successful) jobs in $elapsedTime seconds."
+                       "Processed $messageCount ($successCount successful) 
jobs in $elapsedTime seconds."
                );
-       }
-
-       /**
-        * Instantiates and runs a job defined by a queue message. Depends on
-        * the queue consumer's damaged message store functionality to either
-        * divert messages or stop execution on bad message or job failure.
-        * @param array $jobMessage
-        * @throws \SmashPig\Core\DataStores\DataSerializationException
-        */
-       public function runJob( $jobMessage ) {
-               if ( !isset( $jobMessage['php-message-class'] ) ) {
-                       throw new RuntimeException(
-                               'Job message missing required key 
\'php-message-class\''
-                       );
-               }
-
-               $className = $jobMessage['php-message-class'];
-               $jsonMessage = json_encode( $jobMessage );
-               $jobObj = KeyedOpaqueStorableObject::fromJsonProxy( $className, 
$jsonMessage );
-
-               if ( !( $jobObj instanceof RunnableJob ) ) {
-                       throw new RuntimeException(
-                               get_class( $jobObj ) . ' is not an instance of 
RunnableJob. '
-                               . 'Could not execute and sending to damaged 
message store.'
-                       );
-               }
-
-               if ( !$jobObj->execute() ) {
-                       throw new RuntimeException(
-                               'Job tells us that it did not successfully 
execute. '
-                               . 'Sending to damaged message store.'
-                       );
-               }
-
-               $this->successCount += 1;
        }
 
 }
diff --git a/Tests/QueueConsumerTest.php b/Tests/QueueConsumerTest.php
index a26a8ef..43993d7 100644
--- a/Tests/QueueConsumerTest.php
+++ b/Tests/QueueConsumerTest.php
@@ -2,10 +2,11 @@
 
 namespace SmashPig\Tests;
 
+use Exception;
 use PDO;
 use PHPQueue\Interfaces\FifoQueueStore;
 use SmashPig\Core\DataStores\DamagedDatabase;
-use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\BaseQueueConsumer;
 
 class QueueConsumerTest extends BaseSmashPigUnitTestCase {
 
@@ -21,7 +22,7 @@
        public function setUp() {
                parent::setUp();
                $this->setConfig( 'default', __DIR__ . 
'/data/config_queue.yaml' );
-               $this->queue = QueueConsumer::getQueue( 'test' );
+               $this->queue = BaseQueueConsumer::getQueue( 'test' );
                $this->queue->createTable( 'test' );
                $this->damaged = DamagedDatabase::get()->getDatabase();
 
@@ -33,18 +34,13 @@
        }
 
        public function testEmptyQueue() {
-               $noOp = function( $unused ) {};
-               $consumer = new QueueConsumer( 'test', $noOp );
+               $consumer = new TestingQueueConsumer( 'test' );
                $count = $consumer->dequeueMessages();
                $this->assertEquals( 0, $count, 'Should report 0 messages 
processed' );
        }
 
        public function testOneMessage() {
-               $processed = array();
-               $cb = function( $message ) use ( &$processed ) {
-                       $processed[] = $message;
-               };
-               $consumer = new QueueConsumer( 'test', $cb );
+               $consumer = new TestingQueueConsumer( 'test' );
                $payload = array(
                        'wednesday' => 'addams',
                        'spookiness' => mt_rand(),
@@ -52,7 +48,7 @@
                $this->queue->push( $payload );
                $count = $consumer->dequeueMessages();
                $this->assertEquals( 1, $count, 'Should report 1 message 
processed' );
-               $this->assertEquals( array( $payload ), $processed, 'Bad 
message' );
+               $this->assertEquals( array( $payload ), $consumer->processed, 
'Bad message' );
                $this->assertNull( $this->queue->pop(),
                        'Should delete message when processing is successful'
                );
@@ -66,25 +62,23 @@
                        'cousin' => 'itt',
                        'kookiness' => mt_rand(),
                );
-               $self = $this;
-               $ran = false;
-               $cb = function( $message ) use ( &$ran, $payload, $self ) {
-                       $self->assertEquals( $message, $payload );
-                       $ran = true;
-                       throw new \Exception( 'kaboom!' );
-               };
 
-               $consumer = new QueueConsumer( 'test', $cb, 0, 0 );
+               $consumer = new TestingQueueConsumer( 'test' );
+               $consumer->exception = new Exception( 'kaboom!' );
 
                $this->queue->push( $payload );
                try {
                        $consumer->dequeueMessages();
-               } catch ( \Exception $ex ) {
+               } catch ( Exception $ex ) {
                        $this->fail(
                                'Exception should not have bubbled up: ' . 
$ex->getMessage()
                        );
                }
-               $this->assertTrue( $ran, 'Callback was not called' );
+               $this->assertEquals(
+                       array( $payload ),
+                       $consumer->processed,
+                       'Processing snafu'
+               );
 
                $damaged = $this->getDamagedQueueMessage( $payload );
                $this->assertEquals(
@@ -111,23 +105,29 @@
                        $messages[] = $message;
                        $this->queue->push( $message );
                }
-               $processedMessages = array();
-               $callback = function( $message ) use ( &$processedMessages ) {
-                       $processedMessages[] = $message;
-               };
                // Should work when you pass in the limits as strings.
-               $consumer = new QueueConsumer( 'test', $callback, 0, '3' );
+               $consumer = new TestingQueueConsumer( 'test', 0, '3' );
                $count = $consumer->dequeueMessages();
-               $this->assertEquals( 3, $count, 'dequeueMessages returned wrong 
count' );
-               $this->assertEquals( 3, count( $processedMessages ), 'Called 
callback wrong number of times' );
+               $this->assertEquals(
+                       3, $count, 'dequeueMessages returned wrong count'
+               );
+               $this->assertEquals(
+                       3,
+                       count( $consumer->processed ),
+                       'Called callback wrong number of times'
+               );
 
                for ( $i = 0; $i < 3; $i++ ) {
-                       $this->assertEquals( $messages[$i], 
$processedMessages[$i], 'Message mutated' );
+                       $this->assertEquals(
+                               $messages[$i],
+                               $consumer->processed[$i],
+                               'Message mutated'
+                       );
                }
                $this->assertEquals(
                        $messages[3],
                        $this->queue->pop(),
-                       'Messed with too many messages'
+                       'Dequeued too many messages'
                );
        }
 
@@ -144,26 +144,30 @@
                        $messages[] = $message;
                        $this->queue->push( $message );
                }
-               $processedMessages = array();
-               $cb = function( $message ) use ( &$processedMessages ) {
-                       $processedMessages[] = $message;
-                       throw new \Exception( 'kaboom!' );
-               };
 
-               $consumer = new QueueConsumer( 'test', $cb, 0, 3 );
+               $consumer = new TestingQueueConsumer( 'test', 0, 3 );
+               $consumer->exception = new Exception( 'Kaboom!' );
                $count = 0;
                try {
                        $count = $consumer->dequeueMessages();
-               } catch ( \Exception $ex ) {
+               } catch ( Exception $ex ) {
                        $this->fail(
                                'Exception should not have bubbled up: ' . 
$ex->getMessage()
                        );
                }
-               $this->assertEquals( 3, $count, 'dequeueMessages returned wrong 
count' );
-               $this->assertEquals( 3, count( $processedMessages ), 'Called 
callback wrong number of times' );
+               $this->assertEquals(
+                       3, $count, 'dequeueMessages returned wrong count'
+               );
+               $this->assertEquals(
+                       3,
+                       count( $consumer->processed ),
+                       'Called callback wrong number of times'
+               );
 
                for ( $i = 0; $i < 3; $i++ ) {
-                       $this->assertEquals( $messages[$i], 
$processedMessages[$i], 'Message mutated' );
+                       $this->assertEquals(
+                               $messages[$i], $consumer->processed[$i], 
'Message mutated'
+                       );
                        $damaged = $this->getDamagedQueueMessage( $messages[$i] 
);
                        $this->assertEquals(
                                $messages[$i],
diff --git a/Tests/TestingQueueConsumer.php b/Tests/TestingQueueConsumer.php
new file mode 100644
index 0000000..351edd9
--- /dev/null
+++ b/Tests/TestingQueueConsumer.php
@@ -0,0 +1,16 @@
+<?php namespace SmashPig\Tests;
+
+use SmashPig\Core\QueueConsumers\BaseQueueConsumer;
+
+class TestingQueueConsumer extends BaseQueueConsumer {
+
+       public $exception;
+       public $processed = array();
+
+       function processMessage( $message ) {
+               $this->processed[] = $message;
+               if ( $this->exception ) {
+                       throw $this->exception;
+               }
+       }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/303081
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
Gerrit-PatchSet: 8
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
Gerrit-Reviewer: Awight <[email protected]>
Gerrit-Reviewer: Cdentinger <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to