jenkins-bot has submitted this change and it was merged.
Change subject: Subclass BaseQueueConsumer instead of callbacks
......................................................................
Subclass BaseQueueConsumer instead of callbacks
BaseQueueConsumer subclasses need to implement processMessage( $message )
TODO: looks like we could have a single all-purpose maintenance script
for all queue consumer jobs that takes a single mandatory argument
specifying a consumer subclass.
Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
---
R Core/QueueConsumers/BaseQueueConsumer.php
A Core/QueueConsumers/JobQueueConsumer.php
A Core/QueueConsumers/PendingQueueConsumer.php
M Maintenance/ConsumePendingQueue.php
M Maintenance/QueueJobRunner.php
M Tests/QueueConsumerTest.php
A Tests/TestingQueueConsumer.php
7 files changed, 170 insertions(+), 103 deletions(-)
Approvals:
Awight: Looks good to me, approved
jenkins-bot: Verified
diff --git a/Core/DataStores/QueueConsumer.php b/Core/QueueConsumers/BaseQueueConsumer.php
similarity index 72%
rename from Core/DataStores/QueueConsumer.php
rename to Core/QueueConsumers/BaseQueueConsumer.php
index a4f6980..21c2498 100644
--- a/Core/DataStores/QueueConsumer.php
+++ b/Core/QueueConsumers/BaseQueueConsumer.php
@@ -1,11 +1,12 @@
<?php
-namespace SmashPig\Core\DataStores;
+namespace SmashPig\Core\QueueConsumers;
use Exception;
use InvalidArgumentException;
use PHPQueue\Interfaces\AtomicReadBuffer;
use SmashPig\Core\Context;
+use SmashPig\Core\DataStores\DamagedDatabase;
use SmashPig\Core\Logging\Logger;
/**
@@ -13,7 +14,7 @@
* interface. Exceptions in the processing callback will cause the message to
* be sent to a damaged message datastore.
*/
-class QueueConsumer {
+abstract class BaseQueueConsumer {
/**
* @var AtomicReadBuffer
@@ -37,18 +38,24 @@
protected $messageLimit = 0;
/**
+ * Do something with the message popped from the queue. Return value is
+ * ignored, and exceptions will be caught and handled by handleError.
+ *
+ * @param array $message
+ */
+ abstract function processMessage( $message );
+
+ /**
* Gets a fresh QueueConsumer
*
* @param string $queueName key of queue configured in data-store, must
* implement @see PHPQueue\Interfaces\AtomicReadBuffer.
- * @param callable $callback processing function taking message array
* @param int $timeLimit max number of seconds to loop, 0 for no limit
* @param int $messageLimit max number of messages to process, 0 for all
* @throws \SmashPig\Core\ConfigurationKeyException
*/
public function __construct(
$queueName,
- $callback,
$timeLimit = 0,
$messageLimit = 0
) {
@@ -58,12 +65,8 @@
if ( !is_numeric( $messageLimit ) ) {
throw new InvalidArgumentException( 'messageLimit must be numeric' );
}
- if ( !is_callable( $callback ) ) {
- throw new InvalidArgumentException( "Processing callback must be callable" );
- }
$this->queueName = $queueName;
- $this->callback = $callback;
$this->timeLimit = intval( $timeLimit );
$this->messageLimit = intval( $messageLimit );
@@ -87,7 +90,7 @@
public function dequeueMessages() {
$startTime = time();
$processed = 0;
- $realCallback = array( $this, 'processMessage' );
+ $realCallback = array( $this, 'processMessageWithErrorHandling' );
do {
$data = $this->backend->popAtomic( $realCallback );
if ( $data !== null ) {
@@ -100,9 +103,15 @@
return $processed;
}
- public function processMessage( $message ) {
+ /**
+ * Call the concrete processMessage function and handle any errors that
+ * may arise.
+ *
+ * @param array $message
+ */
+ public function processMessageWithErrorHandling( $message ) {
try {
- call_user_func( $this->callback, $message );
+ $this->processMessage( $message );
} catch ( Exception $ex ) {
$this->handleError( $message, $ex );
}
@@ -116,15 +125,28 @@
* @param Exception $ex
*/
protected function handleError( $message, Exception $ex ) {
+ $this->sendToDamagedStore( $message, $ex );
+ }
+
+ /**
+ * @param array $message The data
+ * @param Exception $ex The problem
+ * @param int| null $retryDate If provided, retry after this timestamp
+ * @return int ID of message in damaged database
+ */
+ protected function sendToDamagedStore(
+ $message, Exception $ex, $retryDate = null
+ ) {
Logger::error(
- 'Error processing message, moving to damaged queue.',
+ 'Error processing message, moving to damaged store.',
$message,
$ex
);
- $this->damagedDb->storeMessage(
+ return $this->damagedDb->storeMessage(
$message,
$this->queueName,
- $ex->getMessage() . "\n" . $ex->getTraceAsString()
+ $ex->getMessage() . "\n" . $ex->getTraceAsString(),
+ $retryDate
);
}
diff --git a/Core/QueueConsumers/JobQueueConsumer.php b/Core/QueueConsumers/JobQueueConsumer.php
new file mode 100644
index 0000000..ecaf5ca
--- /dev/null
+++ b/Core/QueueConsumers/JobQueueConsumer.php
@@ -0,0 +1,52 @@
+<?php namespace SmashPig\Core\QueueConsumers;
+
+use RuntimeException;
+use SmashPig\Core\DataStores\KeyedOpaqueStorableObject;
+use SmashPig\Core\Jobs\RunnableJob;
+
+class JobQueueConsumer extends BaseQueueConsumer {
+
+ protected $successCount = 0;
+
+ /**
+ * Instantiates and runs a job defined by a queue message. Depends on
+ * the base consumer's damaged message store functionality to either
+ * divert messages or stop execution on bad message or job failure.
+ * @param array $jobMessage
+ * @throws \SmashPig\Core\DataStores\DataSerializationException
+ */
+ function processMessage( $jobMessage ) {
+ if ( !isset( $jobMessage['php-message-class'] ) ) {
+ throw new RuntimeException(
+ 'Job message missing required key \'php-message-class\''
+ );
+ }
+
+ $className = $jobMessage['php-message-class'];
+ $jsonMessage = json_encode( $jobMessage );
+ $jobObj = KeyedOpaqueStorableObject::fromJsonProxy( $className, $jsonMessage );
+
+ if ( !( $jobObj instanceof RunnableJob ) ) {
+ throw new RuntimeException(
+ get_class( $jobObj ) . ' is not an instance of RunnableJob. '
+ . 'Could not execute and sending to damaged message store.'
+ );
+ }
+
+ if ( !$jobObj->execute() ) {
+ throw new RuntimeException(
+ 'Job tells us that it did not successfully execute. '
+ . 'Sending to damaged message store.'
+ );
+ }
+
+ $this->successCount += 1;
+ }
+
+ /**
+ * @return int
+ */
+ public function getSuccessCount() {
+ return $this->successCount;
+ }
+}
diff --git a/Core/QueueConsumers/PendingQueueConsumer.php b/Core/QueueConsumers/PendingQueueConsumer.php
new file mode 100644
index 0000000..40655b8
--- /dev/null
+++ b/Core/QueueConsumers/PendingQueueConsumer.php
@@ -0,0 +1,17 @@
+<?php namespace SmashPig\Core\QueueConsumers;
+
+use SmashPig\Core\DataStores\PendingDatabase;
+
+class PendingQueueConsumer extends BaseQueueConsumer {
+
+ protected $pendingDatabase;
+
+ public function __construct( $queueName, $timeLimit, $messageLimit ) {
+ parent::__construct( $queueName, $timeLimit, $messageLimit );
+ $this->pendingDatabase = PendingDatabase::get();
+ }
+
+ public function processMessage( $message ) {
+ $this->pendingDatabase->storeMessage( $message );
+ }
+}
diff --git a/Maintenance/ConsumePendingQueue.php b/Maintenance/ConsumePendingQueue.php
index 22300b2..92acc5a 100644
--- a/Maintenance/ConsumePendingQueue.php
+++ b/Maintenance/ConsumePendingQueue.php
@@ -3,11 +3,9 @@
require ( 'MaintenanceBase.php' );
-use \PDO;
-
use SmashPig\Core\Logging\Logger;
use SmashPig\Core\DataStores\PendingDatabase;
-use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\PendingQueueConsumer;
$maintClass = '\SmashPig\Maintenance\ConsumePendingQueue';
@@ -32,12 +30,10 @@
* Do the actual work of the script.
*/
public function execute() {
- $this->pendingDatabase = PendingDatabase::get();
$basePath = 'maintenance/consume-pending/';
- $consumer = new QueueConsumer(
+ $consumer = new PendingQueueConsumer(
$this->getOption( 'queue' ),
- array( $this->pendingDatabase, 'storeMessage' ),
$this->getOptionOrConfig( 'time-limit', $basePath . 'time-limit' ),
$this->getOptionOrConfig( 'max-messages', $basePath . 'message-limit' )
);
diff --git a/Maintenance/QueueJobRunner.php b/Maintenance/QueueJobRunner.php
index b01265c..bb2ab8d 100644
--- a/Maintenance/QueueJobRunner.php
+++ b/Maintenance/QueueJobRunner.php
@@ -2,12 +2,9 @@
require ( 'MaintenanceBase.php' );
-use RuntimeException;
-
-use SmashPig\Core\DataStores\KeyedOpaqueStorableObject;
-use SmashPig\Core\Jobs\RunnableJob;
use SmashPig\Core\Logging\Logger;
use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\JobQueueConsumer;
$maintClass = '\SmashPig\Maintenance\QueueJobRunner';
@@ -17,8 +14,6 @@
* after it completes no new jobs will be dispatched and this script will exit.
*/
class QueueJobRunner extends MaintenanceBase {
-
- protected $successCount = 0;
public function __construct() {
parent::__construct();
@@ -36,9 +31,8 @@
// Get some defaults from configuration
$basePath = 'maintenance/job-runner/';
- $consumer = new QueueConsumer(
+ $consumer = new JobQueueConsumer(
$this->getOption( 'queue' ),
- array( $this, 'runJob' ),
$this->getOptionOrConfig( 'time-limit', $basePath . 'time-limit' ),
$this->getOptionOrConfig( 'max-messages', $basePath . 'message-limit' )
);
@@ -46,45 +40,11 @@
$startTime = time();
$messageCount = $consumer->dequeueMessages();
+ $successCount = $consumer->getSuccessCount();
$elapsedTime = time() - $startTime;
Logger::info(
- "Processed $messageCount ($this->successCount successful) jobs in $elapsedTime seconds."
+ "Processed $messageCount ($successCount successful) jobs in $elapsedTime seconds."
);
- }
-
- /**
- * Instantiates and runs a job defined by a queue message. Depends on
- * the queue consumer's damaged message store functionality to either
- * divert messages or stop execution on bad message or job failure.
- * @param array $jobMessage
- * @throws \SmashPig\Core\DataStores\DataSerializationException
- */
- public function runJob( $jobMessage ) {
- if ( !isset( $jobMessage['php-message-class'] ) ) {
- throw new RuntimeException(
- 'Job message missing required key \'php-message-class\''
- );
- }
-
- $className = $jobMessage['php-message-class'];
- $jsonMessage = json_encode( $jobMessage );
- $jobObj = KeyedOpaqueStorableObject::fromJsonProxy( $className, $jsonMessage );
-
- if ( !( $jobObj instanceof RunnableJob ) ) {
- throw new RuntimeException(
- get_class( $jobObj ) . ' is not an instance of RunnableJob. '
- . 'Could not execute and sending to damaged message store.'
- );
- }
-
- if ( !$jobObj->execute() ) {
- throw new RuntimeException(
- 'Job tells us that it did not successfully execute. '
- . 'Sending to damaged message store.'
- );
- }
-
- $this->successCount += 1;
}
}
diff --git a/Tests/QueueConsumerTest.php b/Tests/QueueConsumerTest.php
index a26a8ef..43993d7 100644
--- a/Tests/QueueConsumerTest.php
+++ b/Tests/QueueConsumerTest.php
@@ -2,10 +2,11 @@
namespace SmashPig\Tests;
+use Exception;
use PDO;
use PHPQueue\Interfaces\FifoQueueStore;
use SmashPig\Core\DataStores\DamagedDatabase;
-use SmashPig\Core\DataStores\QueueConsumer;
+use SmashPig\Core\QueueConsumers\BaseQueueConsumer;
class QueueConsumerTest extends BaseSmashPigUnitTestCase {
@@ -21,7 +22,7 @@
public function setUp() {
parent::setUp();
$this->setConfig( 'default', __DIR__ . '/data/config_queue.yaml' );
- $this->queue = QueueConsumer::getQueue( 'test' );
+ $this->queue = BaseQueueConsumer::getQueue( 'test' );
$this->queue->createTable( 'test' );
$this->damaged = DamagedDatabase::get()->getDatabase();
@@ -33,18 +34,13 @@
}
public function testEmptyQueue() {
- $noOp = function( $unused ) {};
- $consumer = new QueueConsumer( 'test', $noOp );
+ $consumer = new TestingQueueConsumer( 'test' );
$count = $consumer->dequeueMessages();
$this->assertEquals( 0, $count, 'Should report 0 messages processed' );
}
public function testOneMessage() {
- $processed = array();
- $cb = function( $message ) use ( &$processed ) {
- $processed[] = $message;
- };
- $consumer = new QueueConsumer( 'test', $cb );
+ $consumer = new TestingQueueConsumer( 'test' );
$payload = array(
'wednesday' => 'addams',
'spookiness' => mt_rand(),
@@ -52,7 +48,7 @@
$this->queue->push( $payload );
$count = $consumer->dequeueMessages();
$this->assertEquals( 1, $count, 'Should report 1 message processed' );
- $this->assertEquals( array( $payload ), $processed, 'Bad message' );
+ $this->assertEquals( array( $payload ), $consumer->processed, 'Bad message' );
$this->assertNull( $this->queue->pop(),
'Should delete message when processing is successful'
);
@@ -66,25 +62,23 @@
'cousin' => 'itt',
'kookiness' => mt_rand(),
);
- $self = $this;
- $ran = false;
- $cb = function( $message ) use ( &$ran, $payload, $self ) {
- $self->assertEquals( $message, $payload );
- $ran = true;
- throw new \Exception( 'kaboom!' );
- };
- $consumer = new QueueConsumer( 'test', $cb, 0, 0 );
+ $consumer = new TestingQueueConsumer( 'test' );
+ $consumer->exception = new Exception( 'kaboom!' );
$this->queue->push( $payload );
try {
$consumer->dequeueMessages();
- } catch ( \Exception $ex ) {
+ } catch ( Exception $ex ) {
$this->fail(
'Exception should not have bubbled up: ' . $ex->getMessage()
);
}
- $this->assertTrue( $ran, 'Callback was not called' );
+ $this->assertEquals(
+ array( $payload ),
+ $consumer->processed,
+ 'Processing snafu'
+ );
$damaged = $this->getDamagedQueueMessage( $payload );
$this->assertEquals(
@@ -111,23 +105,29 @@
$messages[] = $message;
$this->queue->push( $message );
}
- $processedMessages = array();
- $callback = function( $message ) use ( &$processedMessages ) {
- $processedMessages[] = $message;
- };
// Should work when you pass in the limits as strings.
- $consumer = new QueueConsumer( 'test', $callback, 0, '3' );
+ $consumer = new TestingQueueConsumer( 'test', 0, '3' );
$count = $consumer->dequeueMessages();
- $this->assertEquals( 3, $count, 'dequeueMessages returned wrong count' );
- $this->assertEquals( 3, count( $processedMessages ), 'Called callback wrong number of times' );
+ $this->assertEquals(
+ 3, $count, 'dequeueMessages returned wrong count'
+ );
+ $this->assertEquals(
+ 3,
+ count( $consumer->processed ),
+ 'Called callback wrong number of times'
+ );
for ( $i = 0; $i < 3; $i++ ) {
- $this->assertEquals( $messages[$i], $processedMessages[$i], 'Message mutated' );
+ $this->assertEquals(
+ $messages[$i],
+ $consumer->processed[$i],
+ 'Message mutated'
+ );
}
$this->assertEquals(
$messages[3],
$this->queue->pop(),
- 'Messed with too many messages'
+ 'Dequeued too many messages'
);
}
@@ -144,26 +144,30 @@
$messages[] = $message;
$this->queue->push( $message );
}
- $processedMessages = array();
- $cb = function( $message ) use ( &$processedMessages ) {
- $processedMessages[] = $message;
- throw new \Exception( 'kaboom!' );
- };
- $consumer = new QueueConsumer( 'test', $cb, 0, 3 );
+ $consumer = new TestingQueueConsumer( 'test', 0, 3 );
+ $consumer->exception = new Exception( 'Kaboom!' );
$count = 0;
try {
$count = $consumer->dequeueMessages();
- } catch ( \Exception $ex ) {
+ } catch ( Exception $ex ) {
$this->fail(
'Exception should not have bubbled up: ' . $ex->getMessage()
);
}
- $this->assertEquals( 3, $count, 'dequeueMessages returned wrong count' );
- $this->assertEquals( 3, count( $processedMessages ), 'Called callback wrong number of times' );
+ $this->assertEquals(
+ 3, $count, 'dequeueMessages returned wrong count'
+ );
+ $this->assertEquals(
+ 3,
+ count( $consumer->processed ),
+ 'Called callback wrong number of times'
+ );
for ( $i = 0; $i < 3; $i++ ) {
- $this->assertEquals( $messages[$i], $processedMessages[$i], 'Message mutated' );
+ $this->assertEquals(
+ $messages[$i], $consumer->processed[$i], 'Message mutated'
+ );
$damaged = $this->getDamagedQueueMessage( $messages[$i] );
$this->assertEquals(
$messages[$i],
diff --git a/Tests/TestingQueueConsumer.php b/Tests/TestingQueueConsumer.php
new file mode 100644
index 0000000..351edd9
--- /dev/null
+++ b/Tests/TestingQueueConsumer.php
@@ -0,0 +1,16 @@
+<?php namespace SmashPig\Tests;
+
+use SmashPig\Core\QueueConsumers\BaseQueueConsumer;
+
+class TestingQueueConsumer extends BaseQueueConsumer {
+
+ public $exception;
+ public $processed = array();
+
+ function processMessage( $message ) {
+ $this->processed[] = $message;
+ if ( $this->exception ) {
+ throw $this->exception;
+ }
+ }
+}
--
To view, visit https://gerrit.wikimedia.org/r/303081
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
Gerrit-PatchSet: 8
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
Gerrit-Reviewer: Awight <[email protected]>
Gerrit-Reviewer: Cdentinger <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits