Ejegg has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/287147

Change subject: WIP pending queue consumer
......................................................................

WIP pending queue consumer

TODO: catch exceptions, send to damaged queue

also filter fields and set correct param types

Bug: T133197
Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7
---
A Maintenance/ConsumePendingQueue.php
1 file changed, 74 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/SmashPig 
refs/changes/47/287147/1

diff --git a/Maintenance/ConsumePendingQueue.php 
b/Maintenance/ConsumePendingQueue.php
new file mode 100644
index 0000000..4e05a3f
--- /dev/null
+++ b/Maintenance/ConsumePendingQueue.php
@@ -0,0 +1,74 @@
+<?php namespace SmashPig\Maintenance;
+
+require ( 'MaintenanceBase.php' );
+
+use SmashPig\Core\Context;
+use SmashPig\Core\Logging\Logger;
+use SmashPig\Core\DataStores\QueueConsumer;
+use \PDO;
+
+$maintClass = '\SmashPig\Maintenance\ConsumePendingQueue';
+
+/**
+ * Maintenance script that reads messages out of the pending queue and
+ * inserts each one as a row in the `pending` table of a PDO database.
+ */
+class ConsumePendingQueue extends MaintenanceBase {
+
+       /**
+        * Connection the consumed messages are written to. Populated in
+        * execute() from the 'destination' option.
+        *
+        * @var PDO
+        */
+       protected $pendingDatabase = null;
+
+       /**
+        * Registers the command line options that execute() reads.
+        */
+       public function __construct() {
+               parent::__construct();
+               $this->addOption(
+                       'queue',
+                       'queue name to consume from',
+                       'pending'
+               );
+               $this->addOption(
+                       'destination',
+                       'PDO database to store messages',
+                       'pending-db'
+               );
+               $this->addOption(
+                       'time-limit',
+                       'Try to keep execution under <n> seconds',
+                       60,
+                       't'
+               );
+               $this->addOption(
+                       'max-messages',
+                       'At most consume <n> messages',
+                       10,
+                       'm'
+               );
+       }
+
+       /**
+        * Do the actual work of the script: look up the destination database
+        * in the configuration, hand storeMessage() to a QueueConsumer as its
+        * per-message callback, then log the value returned by
+        * dequeueMessages() together with the elapsed wall-clock time.
+        */
+       public function execute() {
+               $config = Context::get()->getConfiguration();
+               $this->pendingDatabase = $config->object(
+                       'data-store/' . $this->getOption( 'destination' )
+               );
+
+               $consumer = new QueueConsumer(
+                       $this->getOption( 'queue' ),
+                       $this->getOption( 'time-limit' ),
+                       // Must match the name registered in __construct();
+                       // no 'message-limit' option is ever declared.
+                       $this->getOption( 'max-messages' )
+               );
+
+               $startTime = time();
+               $messageCount = $consumer->dequeueMessages(
+                       array( $this, 'storeMessage' )
+               );
+
+               $elapsedTime = time() - $startTime;
+               Logger::info(
+                       "Processed $messageCount pending messages in $elapsedTime seconds."
+               );
+       }
+
+       /**
+        * Inserts a single message into the `pending` table, using each array
+        * key as both the column name and the named placeholder.
+        *
+        * NOTE(review): the keys are interpolated into the SQL text as-is, so
+        * until the filtering TODO below is done this relies on messages
+        * carrying only safe column names - confirm against the producers.
+        *
+        * @param array $message field name => value
+        */
+       public function storeMessage( $message ) {
+               // TODO: filter to just the things we have db fields for
+               $fieldList = implode( ',', array_keys( $message ) );
+               $paramNames = ':' . implode( ', :', array_keys( $message ) );
+               $insert = "INSERT INTO pending ( $fieldList ) values ( $paramNames );";
+               $prepared = $this->pendingDatabase->prepare( $insert );
+               foreach ( $message as $field => $value ) {
+                       // TODO: types, lengths
+                       // bindValue() copies the value immediately. bindParam()
+                       // binds a reference to $value that is only read when
+                       // execute() runs, so every placeholder would end up
+                       // with the value of the last field in the message.
+                       $prepared->bindValue(
+                               ':' . $field,
+                               $value,
+                               PDO::PARAM_STR
+                       );
+               }
+               $prepared->execute();
+       }
+}
+
+require ( RUN_MAINTENANCE_IF_MAIN );

-- 
To view, visit https://gerrit.wikimedia.org/r/287147
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4209d8ed96ef476f9031a7f266740292447418a7
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <eeggles...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to