jenkins-bot has submitted this change and it was merged.

Change subject: Fix time and message limits
......................................................................


Fix time and message limits

Numeric coercion fixes strict comparison failures in loop condition,
also fix an off-by-one message limit error.

Bug: T133965
Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28
---
M Core/DataStores/QueueConsumer.php
M Tests/QueueConsumerTest.php
2 files changed, 90 insertions(+), 7 deletions(-)

Approvals:
  Awight: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/Core/DataStores/QueueConsumer.php b/Core/DataStores/QueueConsumer.php
index 02cb5a3..95fcb9f 100644
--- a/Core/DataStores/QueueConsumer.php
+++ b/Core/DataStores/QueueConsumer.php
@@ -54,18 +54,24 @@
                $messageLimit = 0,
                $damagedQueue = null
        ) {
+               if ( !is_numeric( $timeLimit ) ) {
+                       throw new InvalidArgumentException( 'timeLimit must be 
numeric' );
+               }
+               if ( !is_numeric( $messageLimit ) ) {
+                       throw new InvalidArgumentException( 'messageLimit must 
be numeric' );
+               }
+               if ( !is_callable( $callback ) ) {
+                       throw new InvalidArgumentException( "Processing 
callback must be callable" );
+               }
+
                $this->callback = $callback;
-               $this->timeLimit = $timeLimit;
-               $this->messageLimit = $messageLimit;
+               $this->timeLimit = intval( $timeLimit );
+               $this->messageLimit = intval( $messageLimit );
 
                $this->backend = self::getQueue( $queueName );
 
                if ( !$this->backend instanceof AtomicReadBuffer ) {
                        throw new InvalidArgumentException( "Queue $queueName 
is not an AtomicReadBuffer" );
-               }
-
-               if ( !is_callable( $callback ) ) {
-                       throw new InvalidArgumentException( "Processing 
callback is not callable" );
                }
 
                if ( $damagedQueue ) {
@@ -105,7 +111,7 @@
                                $processed++;
                        }
                        $timeOk = $this->timeLimit === 0 || time() <= 
$startTime + $this->timeLimit;
-                       $countOk = $this->messageLimit === 0 || $processed <= 
$this->messageLimit;
+                       $countOk = $this->messageLimit === 0 || $processed < 
$this->messageLimit;
                }
                while( $timeOk && $countOk && $data !== null );
                return $processed;
diff --git a/Tests/QueueConsumerTest.php b/Tests/QueueConsumerTest.php
index 4dec67e..a8a0988 100644
--- a/Tests/QueueConsumerTest.php
+++ b/Tests/QueueConsumerTest.php
@@ -111,4 +111,81 @@
                        'Should delete message on exception when damaged queue 
exists'
                );
        }
+
+       public function testMessageLimit() {
+               $messages = array();
+               for ( $i = 0; $i < 5; $i++ ) {
+                       $message = array(
+                               'box' => 'thing' . $i,
+                               'creepiness' => mt_rand(),
+                       );
+                       $messages[] = $message;
+                       $this->queue->push( $message );
+               }
+               $processedMessages = array();
+               $callback = function( $message ) use ( &$processedMessages ) {
+                       $processedMessages[] = $message;
+               };
+               // Should work when you pass in the limits as strings.
+               $consumer = new QueueConsumer( 'test', $callback, 0, '3' );
+               $count = $consumer->dequeueMessages();
+               $this->assertEquals( 3, $count, 'dequeueMessages returned wrong 
count' );
+               $this->assertEquals( 3, count( $processedMessages ), 'Called 
callback wrong number of times' );
+
+               for ( $i = 0; $i < 3; $i++ ) {
+                       $this->assertEquals( $messages[$i], 
$processedMessages[$i], 'Message mutated' );
+               }
+               $this->assertEquals(
+                       $messages[3],
+                       $this->queue->popAtomic( function( $unused ) {} ),
+                       'Messed with too many messages'
+               );
+       }
+
+       public function testKeepRunningOnDamage() {
+               $damagedQueue = QueueConsumer::getQueue( 'damaged' );
+               $damagedQueue->createTable( 'damaged' ); // FIXME: should not 
need
+
+               $messages = array();
+               for ( $i = 0; $i < 5; $i++ ) {
+                       $message = array(
+                               'box' => 'thing' . $i,
+                               'creepiness' => mt_rand(),
+                       );
+                       $messages[] = $message;
+                       $this->queue->push( $message );
+               }
+               $processedMessages = array();
+               $cb = function( $message ) use ( &$processedMessages ) {
+                       $processedMessages[] = $message;
+                       throw new \Exception( 'kaboom!' );
+               };
+
+               $consumer = new QueueConsumer( 'test', $cb, 0, 3, 'damaged' );
+               $count = 0;
+               try {
+                       $count = $consumer->dequeueMessages();
+               } catch ( \Exception $ex ) {
+                       $this->fail(
+                               'Exception should not have bubbled up: ' . 
$ex->getMessage()
+                       );
+               }
+               $this->assertEquals( 3, $count, 'dequeueMessages returned wrong 
count' );
+               $this->assertEquals( 3, count( $processedMessages ), 'Called 
callback wrong number of times' );
+
+               for ( $i = 0; $i < 3; $i++ ) {
+                       $this->assertEquals( $messages[$i], 
$processedMessages[$i], 'Message mutated' );
+                       $this->assertEquals(
+                               $messages[$i],
+                               $damagedQueue->popAtomic( function( $unused ) 
{} ),
+                               'Should move message to damaged queue when 
exception is thrown'
+                       );
+               }
+               $this->assertEquals(
+                       $messages[3],
+                       $this->queue->popAtomic( function( $unused ) {} ),
+                       'message 4 should be at the head of the queue'
+               );
+       }
+
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/288449
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28
Gerrit-PatchSet: 7
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: Awight <awi...@wikimedia.org>
Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org>
Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to