Ejegg has uploaded a new change for review.
https://gerrit.wikimedia.org/r/303081
Change subject: WIP subclass QueueConsumer instead of callbacks
......................................................................
WIP subclass QueueConsumer instead of callbacks
Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
---
R Core/QueueConsumers/QueueConsumer.php
1 file changed, 9 insertions(+), 12 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig refs/changes/81/303081/1
diff --git a/Core/DataStores/QueueConsumer.php b/Core/QueueConsumers/QueueConsumer.php
similarity index 86%
rename from Core/DataStores/QueueConsumer.php
rename to Core/QueueConsumers/QueueConsumer.php
index a4f6980..8fb64c9 100644
--- a/Core/DataStores/QueueConsumer.php
+++ b/Core/QueueConsumers/QueueConsumer.php
@@ -1,11 +1,12 @@
<?php
-namespace SmashPig\Core\DataStores;
+namespace SmashPig\Core\QueueConsumers;
use Exception;
use InvalidArgumentException;
use PHPQueue\Interfaces\AtomicReadBuffer;
use SmashPig\Core\Context;
+use SmashPig\Core\DataStores\DamagedDatabase;
use SmashPig\Core\Logging\Logger;
/**
@@ -13,7 +14,7 @@
* interface. Exceptions in the processing callback will cause the message to
* be sent to a damaged message datastore.
*/
-class QueueConsumer {
+abstract class BaseQueueConsumer {
/**
* @var AtomicReadBuffer
@@ -36,19 +37,19 @@
protected $messageLimit = 0;
+ abstract function processMessage( $message );
+
/**
* Gets a fresh QueueConsumer
*
* @param string $queueName key of queue configured in data-store, must
* implement @see PHPQueue\Interfaces\AtomicReadBuffer.
- * @param callable $callback processing function taking message array
* @param int $timeLimit max number of seconds to loop, 0 for no limit
* @param int $messageLimit max number of messages to process, 0 for all
* @throws \SmashPig\Core\ConfigurationKeyException
*/
public function __construct(
$queueName,
- $callback,
$timeLimit = 0,
$messageLimit = 0
) {
@@ -58,12 +59,8 @@
if ( !is_numeric( $messageLimit ) ) {
throw new InvalidArgumentException( 'messageLimit must be numeric' );
}
- if ( !is_callable( $callback ) ) {
- throw new InvalidArgumentException( "Processing callback must be callable" );
- }
$this->queueName = $queueName;
- $this->callback = $callback;
$this->timeLimit = intval( $timeLimit );
$this->messageLimit = intval( $messageLimit );
@@ -87,7 +84,7 @@
public function dequeueMessages() {
$startTime = time();
$processed = 0;
- $realCallback = array( $this, 'processMessage' );
+ $realCallback = array( $this, 'wrappedCallback' );
do {
$data = $this->backend->popAtomic( $realCallback );
if ( $data !== null ) {
@@ -100,9 +97,9 @@
return $processed;
}
- public function processMessage( $message ) {
+ public function wrappedCallback( $message ) {
try {
- call_user_func( $this->callback, $message );
+ $this->processMessage( $message );
} catch ( Exception $ex ) {
$this->handleError( $message, $ex );
}
@@ -117,7 +114,7 @@
*/
protected function handleError( $message, Exception $ex ) {
Logger::error(
- 'Error processing message, moving to damaged queue.',
+ 'Error processing message, moving to damaged store.',
$message,
$ex
);
--
To view, visit https://gerrit.wikimedia.org/r/303081
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iae7ed316b84da61f1dc7b0e49aee1c894d4db646
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits