Awight has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/340157 )
Change subject: [WIP] Remove unused key-value dead-end ...................................................................... [WIP] Remove unused key-value dead-end At one point, I had introduced a hybrid FIFO and key-value interface for backwards compatibility with some Wikimedia Fundraising code. Now we've decided to use pure FIFO queues and can remove this nastiness. Change-Id: I6f38b1528c1ec33c8d248ef700d444637677be51 --- M src/PHPQueue/Backend/Beanstalkd.php M src/PHPQueue/Backend/IronMQ.php M src/PHPQueue/Backend/Memcache.php M src/PHPQueue/Backend/MongoDB.php M src/PHPQueue/Backend/PDO.php M src/PHPQueue/Backend/Predis.php M src/PHPQueue/Backend/Stomp.php D src/PHPQueue/Interfaces/IndexedFifoQueueStore.php D src/PHPQueue/Interfaces/KeyValueStore.php 9 files changed, 19 insertions(+), 264 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/php-queue refs/changes/57/340157/1 diff --git a/src/PHPQueue/Backend/Beanstalkd.php b/src/PHPQueue/Backend/Beanstalkd.php index 2d9210f..fc0d6b7 100644 --- a/src/PHPQueue/Backend/Beanstalkd.php +++ b/src/PHPQueue/Backend/Beanstalkd.php @@ -3,11 +3,11 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; -use PHPQueue\Interfaces\IndexedFifoQueueStore; +use PHPQueue\Interfaces\FifoQueueStore; class Beanstalkd extends Base - implements IndexedFifoQueueStore + implements FifoQueueStore { public $server_uri; public $tube; diff --git a/src/PHPQueue/Backend/IronMQ.php b/src/PHPQueue/Backend/IronMQ.php index eb195fd..0cbefd9 100644 --- a/src/PHPQueue/Backend/IronMQ.php +++ b/src/PHPQueue/Backend/IronMQ.php @@ -2,11 +2,11 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; -use PHPQueue\Interfaces\IndexedFifoQueueStore; +use PHPQueue\Interfaces\FifoQueueStore; class IronMQ extends Base - implements IndexedFifoQueueStore + implements FifoQueueStore { public $token = null; public $project_id = null; diff --git a/src/PHPQueue/Backend/Memcache.php 
b/src/PHPQueue/Backend/Memcache.php index a210972..b7c03f1 100644 --- a/src/PHPQueue/Backend/Memcache.php +++ b/src/PHPQueue/Backend/Memcache.php @@ -2,11 +2,9 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; -use PHPQueue\Interfaces\KeyValueStore; class Memcache extends Base - implements KeyValueStore { public $servers; public $is_persistent = false; diff --git a/src/PHPQueue/Backend/MongoDB.php b/src/PHPQueue/Backend/MongoDB.php index 86fc6c1..6a4440f 100644 --- a/src/PHPQueue/Backend/MongoDB.php +++ b/src/PHPQueue/Backend/MongoDB.php @@ -5,11 +5,9 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; -use PHPQueue\Interfaces\KeyValueStore; class MongoDB extends Base - implements KeyValueStore { public $server_uri; public $db_name; diff --git a/src/PHPQueue/Backend/PDO.php b/src/PHPQueue/Backend/PDO.php index fe41689..86593b0 100644 --- a/src/PHPQueue/Backend/PDO.php +++ b/src/PHPQueue/Backend/PDO.php @@ -4,15 +4,11 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Interfaces\AtomicReadBuffer; use PHPQueue\Interfaces\FifoQueueStore; -use PHPQueue\Interfaces\IndexedFifoQueueStore; -use PHPQueue\Interfaces\KeyValueStore; class PDO extends Base implements AtomicReadBuffer, - FifoQueueStore, - IndexedFifoQueueStore, - KeyValueStore + FifoQueueStore { private $connection_string; private $db_user; diff --git a/src/PHPQueue/Backend/Predis.php b/src/PHPQueue/Backend/Predis.php index aaeb9bd..26e1306 100644 --- a/src/PHPQueue/Backend/Predis.php +++ b/src/PHPQueue/Backend/Predis.php @@ -6,34 +6,20 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Interfaces\AtomicReadBuffer; -use PHPQueue\Interfaces\KeyValueStore; use PHPQueue\Interfaces\FifoQueueStore; use PHPQueue\Json; /** - * Wraps several styles of redis use: - * - If constructed with a "order_key" option, the data will be accessible - * as a key-value store, and will also provide pop and push using - * $data[$order_key] as the FIFO ordering. 
If the ordering value is a - * timestamp, for example, then the queue will have real-world FIFO - * behavior over time, and even if the data comes in out of order, we will - * always pop the true oldest record. - * If you wish to push to this type of store, you'll also need to provide - * the "correlation_key" option so the random-access key can be - * extracted from data. + * Wraps redis use: * - Pushing scalar data will store it as a queue under queue_name. * - Setting scalar data will store it under the key. * - If data is an array, setting will store it as a hash, under the key. - * - * TODO: The different behaviors should be modeled as several backends which - * perhaps inherit from an AbstractPredis. */ class Predis extends Base implements AtomicReadBuffer, - FifoQueueStore, - KeyValueStore + FifoQueueStore { const TYPE_STRING='string'; const TYPE_HASH='hash'; @@ -41,15 +27,10 @@ const TYPE_SET='set'; const TYPE_NONE='none'; - // Internal sub-key to hold the ordering. - const FIFO_INDEX = 'fifo'; - public $servers; public $redis_options = array(); public $queue_name; public $expiry; - public $order_key; - public $correlation_key; public function __construct($options=array()) { @@ -65,13 +46,6 @@ } if (!empty($options['expiry'])) { $this->expiry = $options['expiry']; - } - if (!empty($options['order_key'])) { - $this->order_key = $options['order_key']; - $this->redis_options['prefix'] = $this->queue_name . 
':'; - } - if (!empty($options['correlation_key'])) { - $this->correlation_key = $options['correlation_key']; } } @@ -100,60 +74,10 @@ throw new BackendException("No queue specified."); } $encoded_data = json_encode($data); - if ($this->order_key) { - if (!$this->correlation_key) { - throw new BackendException("Cannot push to indexed fifo queue without a correlation key."); - } - $key = $data[$this->correlation_key]; - if (!$key) { - throw new BackendException("Cannot push to indexed fifo queue without correlation data."); - } - $status = $this->addToIndexedFifoQueue($key, $data); - if (!self::boolStatus($status)) { - throw new BackendException('Couldn\'t push to indexed fifo queue: ' . $status->getMessage()); - } - } else { - // Note that we're ignoring the "new length" return value, cos I don't - // see how to make it useful. - $this->getConnection()->rpush($this->queue_name, $encoded_data); - } - } - /** - * Remove stale elements at the top of the queue and return the first real entry - * - * When data expires, it still leaves a queue entry linking to its - * correlation ID. Clear any of these stale entries at the head of - * the queue. - * - * Note that we run this from inside a transaction, to make it less - * likely that we'll hit a race condition. - * - * @param MultiExec $tx transaction we're working within. - * - * @return string|null Top element's key, or null if the queue is empty. - */ - public function peekWithCleanup(MultiExec $tx) - { - for (;;) { - // Look up the first element in the FIFO ordering. - $values = $tx->zrange(Predis::FIFO_INDEX, 0, 0); - if ($values) { - // Use that value as a key into the key-value block. - $key = $values[0]; - $exists = $tx->exists($key); - - if (!$exists) { - // If the data is missing, then remove from the FIFO index. 
- $tx->zrem(Predis::FIFO_INDEX, $key); - } else { - return $key; - } - } else { - break; - } - } - return null; + // Note that we're ignoring the "new length" return value, cos I don't + // see how to make it useful. + $this->getConnection()->rpush($this->queue_name, $encoded_data); } /** @@ -166,33 +90,7 @@ if (!$this->hasQueue()) { throw new BackendException("No queue specified."); } - if ($this->order_key) { - // Pop the first element. - // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php - $options = array( - 'cas' => true, - 'watch' => self::FIFO_INDEX, - 'retry' => 3, - ); - $self = $this; - $this->getConnection()->transaction($options, function ($tx) use (&$data, &$self) { - // Begin transaction. - $tx->multi(); - - $key = $self->peekWithCleanup($tx); - - if ($key) { - // Use that value as a key into the key-value block. - $data = $tx->get($key); - - // Remove from both indexes. - $tx->zrem(Predis::FIFO_INDEX, $key); - $tx->del($key); - } - }); - } else { - $data = $this->getConnection()->lpop($this->queue_name); - } + $data = $this->getConnection()->lpop($this->queue_name); if (!$data) { return null; } @@ -206,9 +104,6 @@ public function popAtomic($callback) { if (!$this->hasQueue()) { throw new BackendException("No queue specified."); - } - if ($this->order_key) { - throw new BackendException("atomicPop not yet supported for zsets"); } // Pop and process the first element, erring on the side of @@ -246,33 +141,12 @@ if (!$this->hasQueue()) { throw new BackendException("No queue specified."); } - if ($this->order_key) { - // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php - $options = array( - 'cas' => true, - 'watch' => self::FIFO_INDEX, - 'retry' => 3, - ); - $self = $this; - $this->getConnection()->transaction($options, function ($tx) use (&$data, &$self) { - // Begin transaction. 
- $tx->multi(); - - $key = $self->peekWithCleanup($tx); - - if ($key) { - // Use that value as a key into the key-value block. - $data = $tx->get($key); - } - }); + $data_range = $this->getConnection()->lrange($this->queue_name, 0, 0); + if (!$data_range) { + return null; } else { - $data_range = $this->getConnection()->lrange($this->queue_name, 0, 0); - if (!$data_range) { - return null; - } else { - // Unpack list. - $data = $data_range[0]; - } + // Unpack list. + $data = $data_range[0]; } if (!$data) { return null; @@ -323,9 +197,7 @@ $this->beforeAdd(); try { $status = false; - if ($this->order_key) { - $status = $this->addToIndexedFifoQueue($key, $data); - } elseif (is_array($data)) { + if (is_array($data)) { // FIXME: Assert $status = $this->getConnection()->hmset($key, $data); } elseif (is_string($data) || is_numeric($data)) { @@ -341,36 +213,6 @@ } catch (\Exception $ex) { throw new BackendException($ex->getMessage(), $ex->getCode()); } - } - - /** - * Store the data under its order and correlation keys - * - * @param string $key - * @param array $data - * @return Predis\Response\ResponseInterface - */ - protected function addToIndexedFifoQueue($key, $data) - { - $options = array( - 'cas' => true, - 'watch' => self::FIFO_INDEX, - 'retry' => 3, - ); - $score = $data[$this->order_key]; - $encoded_data = json_encode($data); - $status = false; - $expiry = $this->expiry; - $this->getConnection()->transaction($options, function ($tx) use ($key, $score, $encoded_data, $expiry, &$status) { - $tx->multi(); - $tx->zadd(Predis::FIFO_INDEX, $score, $key); - if ($expiry) { - $status = $tx->setex($key, $expiry, $encoded_data); - } else { - $status = $tx->set($key, $encoded_data); - } - }); - return $status; } /** @deprecated */ @@ -394,10 +236,6 @@ return null; } $this->beforeGet($key); - if ($this->order_key) { - $data = $this->getConnection()->get($key); - return Json::safe_decode($data); - } $type = $this->getConnection()->type($key); switch ($type) { case 
self::TYPE_STRING: @@ -433,16 +271,7 @@ { $this->beforeClear($key); - if ($this->order_key) { - $result = $this->getConnection()->pipeline() - ->zrem(self::FIFO_INDEX, $key) - ->del($key) - ->execute(); - - $num_removed = $result[1]; - } else { - $num_removed = $this->getConnection()->del($key); - } + $num_removed = $this->getConnection()->del($key); $this->afterClearRelease(); diff --git a/src/PHPQueue/Backend/Stomp.php b/src/PHPQueue/Backend/Stomp.php index e855af3..65bd705 100644 --- a/src/PHPQueue/Backend/Stomp.php +++ b/src/PHPQueue/Backend/Stomp.php @@ -7,7 +7,6 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; use PHPQueue\Interfaces\FifoQueueStore; -use PHPQueue\Interfaces\KeyValueStore; /** * Wrap a STOMP queue @@ -18,7 +17,7 @@ */ class Stomp extends Base - implements FifoQueueStore, KeyValueStore + implements FifoQueueStore { public $queue_name; public $uri; diff --git a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php deleted file mode 100644 index ff06f66..0000000 --- a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php +++ /dev/null @@ -1,32 +0,0 @@ -<?php -namespace PHPQueue\Interfaces; - -/** - * Implemented by backends that provide queue-like access, where each message - * also has an ID. - */ -interface IndexedFifoQueueStore extends FifoQueueStore -{ - /** - * @param mixed $value Serializable value. - * @return string Message ID. - * @throws \Exception On failure. - */ - public function push($value); - - /** - * @return array The next available data. - * @throws \PHPQueue\Exception\JobNotFoundException When no data is available. - * @throws \Exception Other failures. - * - * @deprecated This is not a safe operation. Consider using - * AtomicReadBuffer::popAtomic instead. 
- */ - public function pop(); - - /** - * @param $key string - * @throws \Exception - */ - public function clear($key); -} diff --git a/src/PHPQueue/Interfaces/KeyValueStore.php b/src/PHPQueue/Interfaces/KeyValueStore.php deleted file mode 100644 index b9d5823..0000000 --- a/src/PHPQueue/Interfaces/KeyValueStore.php +++ /dev/null @@ -1,33 +0,0 @@ -<?php -namespace PHPQueue\Interfaces; - -/** - * Implemented by backends that support key-value retrieval. - */ -interface KeyValueStore -{ - /** - * @param $key string - * @param $value mixed Serializable value - * @param $properties array optional additional message properties - * FIXME: Define better. Are these columns the indexes? Why separate - * from the message? - * @throws \Exception - */ - public function set($key, $value, $properties=array()); - - /** - * Look up and return a value by its index value. - * - * @param $key string - * @return array The data. - * @throws \Exception - */ - public function get($key); - - /** - * @param $key string - * @throws \Exception - */ - public function clear($key); -} -- To view, visit https://gerrit.wikimedia.org/r/340157 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I6f38b1528c1ec33c8d248ef700d444637677be51 Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/fundraising/php-queue Gerrit-Branch: master Gerrit-Owner: Awight <awi...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits