jenkins-bot has submitted this change and it was merged. Change subject: Fix time and message limits ......................................................................
Fix time and message limits Numeric coercion fixes strict comparison failures in loop condition, also fix an off-by-one message limit error. Bug: T133965 Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28 --- M Core/DataStores/QueueConsumer.php M Tests/QueueConsumerTest.php 2 files changed, 90 insertions(+), 7 deletions(-) Approvals: Awight: Looks good to me, approved jenkins-bot: Verified diff --git a/Core/DataStores/QueueConsumer.php b/Core/DataStores/QueueConsumer.php index 02cb5a3..95fcb9f 100644 --- a/Core/DataStores/QueueConsumer.php +++ b/Core/DataStores/QueueConsumer.php @@ -54,18 +54,24 @@ $messageLimit = 0, $damagedQueue = null ) { + if ( !is_numeric( $timeLimit ) ) { + throw new InvalidArgumentException( 'timeLimit must be numeric' ); + } + if ( !is_numeric( $messageLimit ) ) { + throw new InvalidArgumentException( 'messageLimit must be numeric' ); + } + if ( !is_callable( $callback ) ) { + throw new InvalidArgumentException( "Processing callback must be callable" ); + } + $this->callback = $callback; - $this->timeLimit = $timeLimit; - $this->messageLimit = $messageLimit; + $this->timeLimit = intval( $timeLimit ); + $this->messageLimit = intval( $messageLimit ); $this->backend = self::getQueue( $queueName ); if ( !$this->backend instanceof AtomicReadBuffer ) { throw new InvalidArgumentException( "Queue $queueName is not an AtomicReadBuffer" ); - } - - if ( !is_callable( $callback ) ) { - throw new InvalidArgumentException( "Processing callback is not callable" ); } if ( $damagedQueue ) { @@ -105,7 +111,7 @@ $processed++; } $timeOk = $this->timeLimit === 0 || time() <= $startTime + $this->timeLimit; - $countOk = $this->messageLimit === 0 || $processed <= $this->messageLimit; + $countOk = $this->messageLimit === 0 || $processed < $this->messageLimit; } while( $timeOk && $countOk && $data !== null ); return $processed; diff --git a/Tests/QueueConsumerTest.php b/Tests/QueueConsumerTest.php index 4dec67e..a8a0988 100644 --- a/Tests/QueueConsumerTest.php +++ 
b/Tests/QueueConsumerTest.php @@ -111,4 +111,81 @@ 'Should delete message on exception when damaged queue exists' ); } + + public function testMessageLimit() { + $messages = array(); + for ( $i = 0; $i < 5; $i++ ) { + $message = array( + 'box' => 'thing' . $i, + 'creepiness' => mt_rand(), + ); + $messages[] = $message; + $this->queue->push( $message ); + } + $processedMessages = array(); + $callback = function( $message ) use ( &$processedMessages ) { + $processedMessages[] = $message; + }; + // Should work when you pass in the limits as strings. + $consumer = new QueueConsumer( 'test', $callback, 0, '3' ); + $count = $consumer->dequeueMessages(); + $this->assertEquals( 3, $count, 'dequeueMessages returned wrong count' ); + $this->assertEquals( 3, count( $processedMessages ), 'Called callback wrong number of times' ); + + for ( $i = 0; $i < 3; $i++ ) { + $this->assertEquals( $messages[$i], $processedMessages[$i], 'Message mutated' ); + } + $this->assertEquals( + $messages[3], + $this->queue->popAtomic( function( $unused ) {} ), + 'Messed with too many messages' + ); + } + + public function testKeepRunningOnDamage() { + $damagedQueue = QueueConsumer::getQueue( 'damaged' ); + $damagedQueue->createTable( 'damaged' ); // FIXME: should not need + + $messages = array(); + for ( $i = 0; $i < 5; $i++ ) { + $message = array( + 'box' => 'thing' . $i, + 'creepiness' => mt_rand(), + ); + $messages[] = $message; + $this->queue->push( $message ); + } + $processedMessages = array(); + $cb = function( $message ) use ( &$processedMessages ) { + $processedMessages[] = $message; + throw new \Exception( 'kaboom!' ); + }; + + $consumer = new QueueConsumer( 'test', $cb, 0, 3, 'damaged' ); + $count = 0; + try { + $count = $consumer->dequeueMessages(); + } catch ( \Exception $ex ) { + $this->fail( + 'Exception should not have bubbled up: ' . 
$ex->getMessage() + ); + } + $this->assertEquals( 3, $count, 'dequeueMessages returned wrong count' ); + $this->assertEquals( 3, count( $processedMessages ), 'Called callback wrong number of times' ); + + for ( $i = 0; $i < 3; $i++ ) { + $this->assertEquals( $messages[$i], $processedMessages[$i], 'Message mutated' ); + $this->assertEquals( + $messages[$i], + $damagedQueue->popAtomic( function( $unused ) {} ), + 'Should move message to damaged queue when exception is thrown' + ); + } + $this->assertEquals( + $messages[3], + $this->queue->popAtomic( function( $unused ) {} ), + 'message 4 should be at the head of the queue' + ); + } + } -- To view, visit https://gerrit.wikimedia.org/r/288449 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7034e0deaf8421e238791a4da292e6c139fdef28 Gerrit-PatchSet: 7 Gerrit-Project: wikimedia/fundraising/SmashPig Gerrit-Branch: master Gerrit-Owner: Ejegg <eeggles...@wikimedia.org> Gerrit-Reviewer: Awight <awi...@wikimedia.org> Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org> Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits