Ejegg has uploaded a new change for review. https://gerrit.wikimedia.org/r/287147
Change subject: WIP pending queue consumer ...................................................................... WIP pending queue consumer TODO: catch exeptions, send to damaged queue also filter fields and set correct param types Bug: T133197 Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7 --- A Maintenance/ConsumePendingQueue.php 1 file changed, 74 insertions(+), 0 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig refs/changes/47/287147/1 diff --git a/Maintenance/ConsumePendingQueue.php b/Maintenance/ConsumePendingQueue.php new file mode 100644 index 0000000..4e05a3f --- /dev/null +++ b/Maintenance/ConsumePendingQueue.php @@ -0,0 +1,74 @@ +<?php namespace SmashPig\Maintenance; + +require ( 'MaintenanceBase.php' ); + +use SmashPig\Core\Context; +use SmashPig\Core\Logging\Logger; +use SmashPig\Core\DataStores\QueueConsumer; +use \PDO; + +$maintClass = '\SmashPig\Maintenance\ConsumePendingQueue'; + +/** + * Reads messages out of the pending queue and inserts them into a db table + */ +class ConsumePendingQueue extends MaintenanceBase { + + /** + * @var PDO + */ + protected $pendingDatabase = null; + + public function __construct() { + parent::__construct(); + $this->addOption( 'queue', 'queue name to consume from', 'pending' ); + $this->addOption( 'destination', 'PDO database to store messages', 'pending-db' ); + $this->addOption( 'time-limit', 'Try to keep execution under <n> seconds', 60, 't' ); + $this->addOption( 'max-messages', 'At most consume <n> messages', 10, 'm' ); + } + + /** + * Do the actual work of the script. + */ + public function execute() { + $config = Context::get()->getConfiguration(); + $this->pendingDatabase = $config->object( + 'data-store/' . $this->getOption( 'destination' ) + ); + + $consumer = new QueueConsumer( + $this->getOption( 'queue' ), + $this->getOption( 'time-limit' ), + $this->getOption( 'message-limit' ) + ); + + $startTime = time(); + $messageCount = $consumer->dequeueMessages( + array( $this, 'storeMessage' ) + ); + + $elapsedTime = time() - $startTime; + Logger::info( + "Processed $messageCount pending messages in $elapsedTime seconds." + ); + } + + public function storeMessage( $message ) { + // TODO: filter to just the things we have db fields for + $fieldList = implode( ',', array_keys( $message ) ); + $paramNames = ':' . implode( ', :', array_keys( $message ) ); + $insert = "INSERT INTO pending ( $fieldList ) values ( $paramNames );"; + $prepared = $this->pendingDatabase->prepare( $insert ); + foreach ( $message as $field => $value ) { + // TODO: types, lengths + $prepared->bindParam( + ':' . $field, + $value, + PDO::PARAM_STR + ); + } + $prepared->execute(); + } +} + +require ( RUN_MAINTENANCE_IF_MAIN ); -- To view, visit https://gerrit.wikimedia.org/r/287147 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7 Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/fundraising/SmashPig Gerrit-Branch: master Gerrit-Owner: Ejegg <eeggles...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits