Awight has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/340157 )

Change subject: [WIP] Remove unused key-value dead-end
......................................................................

[WIP] Remove unused key-value dead-end

At one point, I had introduced a hybrid FIFO and key-value interface for
backwards compatibility with some Wikimedia Fundraising code.  Now we've
decided to use pure FIFO queues and can remove this nastiness.

Change-Id: I6f38b1528c1ec33c8d248ef700d444637677be51
---
M src/PHPQueue/Backend/Beanstalkd.php
M src/PHPQueue/Backend/IronMQ.php
M src/PHPQueue/Backend/Memcache.php
M src/PHPQueue/Backend/MongoDB.php
M src/PHPQueue/Backend/PDO.php
M src/PHPQueue/Backend/Predis.php
M src/PHPQueue/Backend/Stomp.php
D src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
D src/PHPQueue/Interfaces/KeyValueStore.php
9 files changed, 19 insertions(+), 264 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/php-queue refs/changes/57/340157/1

diff --git a/src/PHPQueue/Backend/Beanstalkd.php 
b/src/PHPQueue/Backend/Beanstalkd.php
index 2d9210f..fc0d6b7 100644
--- a/src/PHPQueue/Backend/Beanstalkd.php
+++ b/src/PHPQueue/Backend/Beanstalkd.php
@@ -3,11 +3,11 @@
 
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Exception\JobNotFoundException;
-use PHPQueue\Interfaces\IndexedFifoQueueStore;
+use PHPQueue\Interfaces\FifoQueueStore;
 
 class Beanstalkd
     extends Base
-    implements IndexedFifoQueueStore
+    implements FifoQueueStore
 {
     public $server_uri;
     public $tube;
diff --git a/src/PHPQueue/Backend/IronMQ.php b/src/PHPQueue/Backend/IronMQ.php
index eb195fd..0cbefd9 100644
--- a/src/PHPQueue/Backend/IronMQ.php
+++ b/src/PHPQueue/Backend/IronMQ.php
@@ -2,11 +2,11 @@
 namespace PHPQueue\Backend;
 
 use PHPQueue\Exception\BackendException;
-use PHPQueue\Interfaces\IndexedFifoQueueStore;
+use PHPQueue\Interfaces\FifoQueueStore;
 
 class IronMQ
     extends Base
-    implements IndexedFifoQueueStore
+    implements FifoQueueStore
 {
     public $token = null;
     public $project_id = null;
diff --git a/src/PHPQueue/Backend/Memcache.php 
b/src/PHPQueue/Backend/Memcache.php
index a210972..b7c03f1 100644
--- a/src/PHPQueue/Backend/Memcache.php
+++ b/src/PHPQueue/Backend/Memcache.php
@@ -2,11 +2,9 @@
 namespace PHPQueue\Backend;
 
 use PHPQueue\Exception\BackendException;
-use PHPQueue\Interfaces\KeyValueStore;
 
 class Memcache
     extends Base
-    implements KeyValueStore
 {
     public $servers;
     public $is_persistent = false;
diff --git a/src/PHPQueue/Backend/MongoDB.php b/src/PHPQueue/Backend/MongoDB.php
index 86fc6c1..6a4440f 100644
--- a/src/PHPQueue/Backend/MongoDB.php
+++ b/src/PHPQueue/Backend/MongoDB.php
@@ -5,11 +5,9 @@
 
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Exception\JobNotFoundException;
-use PHPQueue\Interfaces\KeyValueStore;
 
 class MongoDB
     extends Base
-    implements KeyValueStore
 {
     public $server_uri;
     public $db_name;
diff --git a/src/PHPQueue/Backend/PDO.php b/src/PHPQueue/Backend/PDO.php
index fe41689..86593b0 100644
--- a/src/PHPQueue/Backend/PDO.php
+++ b/src/PHPQueue/Backend/PDO.php
@@ -4,15 +4,11 @@
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Interfaces\AtomicReadBuffer;
 use PHPQueue\Interfaces\FifoQueueStore;
-use PHPQueue\Interfaces\IndexedFifoQueueStore;
-use PHPQueue\Interfaces\KeyValueStore;
 
 class PDO
     extends Base
     implements AtomicReadBuffer,
-        FifoQueueStore,
-        IndexedFifoQueueStore,
-        KeyValueStore
+        FifoQueueStore
 {
     private $connection_string;
     private $db_user;
diff --git a/src/PHPQueue/Backend/Predis.php b/src/PHPQueue/Backend/Predis.php
index aaeb9bd..26e1306 100644
--- a/src/PHPQueue/Backend/Predis.php
+++ b/src/PHPQueue/Backend/Predis.php
@@ -6,34 +6,20 @@
 
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Interfaces\AtomicReadBuffer;
-use PHPQueue\Interfaces\KeyValueStore;
 use PHPQueue\Interfaces\FifoQueueStore;
 use PHPQueue\Json;
 
 /**
- * Wraps several styles of redis use:
- *     - If constructed with a "order_key" option, the data will be accessible
- *       as a key-value store, and will also provide pop and push using
- *       $data[$order_key] as the FIFO ordering.  If the ordering value is a
- *       timestamp, for example, then the queue will have real-world FIFO
- *       behavior over time, and even if the data comes in out of order, we 
will
- *       always pop the true oldest record.
- *       If you wish to push to this type of store, you'll also need to provide
- *       the "correlation_key" option so the random-access key can be
- *       extracted from data.
+ * Wraps redis use:
  *     - Pushing scalar data will store it as a queue under queue_name.
  *     - Setting scalar data will store it under the key.
  *     - If data is an array, setting will store it as a hash, under the key.
- *
- * TODO: The different behaviors should be modeled as several backends which
- * perhaps inherit from an AbstractPredis.
  */
 class Predis
     extends Base
     implements
         AtomicReadBuffer,
-        FifoQueueStore,
-        KeyValueStore
+        FifoQueueStore
 {
     const TYPE_STRING='string';
     const TYPE_HASH='hash';
@@ -41,15 +27,10 @@
     const TYPE_SET='set';
     const TYPE_NONE='none';
 
-    // Internal sub-key to hold the ordering.
-    const FIFO_INDEX = 'fifo';
-
     public $servers;
     public $redis_options = array();
     public $queue_name;
     public $expiry;
-    public $order_key;
-    public $correlation_key;
 
     public function __construct($options=array())
     {
@@ -65,13 +46,6 @@
         }
         if (!empty($options['expiry'])) {
             $this->expiry = $options['expiry'];
-        }
-        if (!empty($options['order_key'])) {
-            $this->order_key = $options['order_key'];
-            $this->redis_options['prefix'] = $this->queue_name . ':';
-        }
-        if (!empty($options['correlation_key'])) {
-            $this->correlation_key = $options['correlation_key'];
         }
     }
 
@@ -100,60 +74,10 @@
             throw new BackendException("No queue specified.");
         }
         $encoded_data = json_encode($data);
-        if ($this->order_key) {
-            if (!$this->correlation_key) {
-                throw new BackendException("Cannot push to indexed fifo queue 
without a correlation key.");
-            }
-            $key = $data[$this->correlation_key];
-            if (!$key) {
-                throw new BackendException("Cannot push to indexed fifo queue 
without correlation data.");
-            }
-            $status = $this->addToIndexedFifoQueue($key, $data);
-            if (!self::boolStatus($status)) {
-                throw new BackendException('Couldn\'t push to indexed fifo 
queue: ' . $status->getMessage());
-            }
-        } else {
-            // Note that we're ignoring the "new length" return value, cos I 
don't
-            // see how to make it useful.
-            $this->getConnection()->rpush($this->queue_name, $encoded_data);
-        }
-    }
 
-    /**
-     * Remove stale elements at the top of the queue and return the first real 
entry
-     *
-     * When data expires, it still leaves a queue entry linking to its
-     * correlation ID.  Clear any of these stale entries at the head of
-     * the queue.
-     *
-     * Note that we run this from inside a transaction, to make it less
-     * likely that we'll hit a race condition.
-     *
-     * @param MultiExec $tx transaction we're working within.
-     *
-     * @return string|null Top element's key, or null if the queue is empty.
-     */
-    public function peekWithCleanup(MultiExec $tx)
-    {
-        for (;;) {
-            // Look up the first element in the FIFO ordering.
-            $values = $tx->zrange(Predis::FIFO_INDEX, 0, 0);
-            if ($values) {
-                // Use that value as a key into the key-value block.
-                $key = $values[0];
-                $exists = $tx->exists($key);
-
-                if (!$exists) {
-                    // If the data is missing, then remove from the FIFO index.
-                    $tx->zrem(Predis::FIFO_INDEX, $key);
-                } else {
-                    return $key;
-                }
-            } else {
-                break;
-            }
-        }
-        return null;
+        // Note that we're ignoring the "new length" return value, cos I don't
+        // see how to make it useful.
+        $this->getConnection()->rpush($this->queue_name, $encoded_data);
     }
 
     /**
@@ -166,33 +90,7 @@
         if (!$this->hasQueue()) {
             throw new BackendException("No queue specified.");
         }
-        if ($this->order_key) {
-            // Pop the first element.
-            // Adapted from 
https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
-            $options = array(
-                'cas' => true,
-                'watch' => self::FIFO_INDEX,
-                'retry' => 3,
-            );
-            $self = $this;
-            $this->getConnection()->transaction($options, function ($tx) use 
(&$data, &$self) {
-                // Begin transaction.
-                $tx->multi();
-
-                $key = $self->peekWithCleanup($tx);
-
-                if ($key) {
-                    // Use that value as a key into the key-value block.
-                    $data = $tx->get($key);
-
-                    // Remove from both indexes.
-                    $tx->zrem(Predis::FIFO_INDEX, $key);
-                    $tx->del($key);
-                }
-            });
-        } else {
-            $data = $this->getConnection()->lpop($this->queue_name);
-        }
+        $data = $this->getConnection()->lpop($this->queue_name);
         if (!$data) {
             return null;
         }
@@ -206,9 +104,6 @@
     public function popAtomic($callback) {
         if (!$this->hasQueue()) {
             throw new BackendException("No queue specified.");
-        }
-        if ($this->order_key) {
-            throw new BackendException("atomicPop not yet supported for 
zsets");
         }
 
         // Pop and process the first element, erring on the side of
@@ -246,33 +141,12 @@
         if (!$this->hasQueue()) {
             throw new BackendException("No queue specified.");
         }
-        if ($this->order_key) {
-            // Adapted from 
https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
-            $options = array(
-                'cas' => true,
-                'watch' => self::FIFO_INDEX,
-                'retry' => 3,
-            );
-            $self = $this;
-            $this->getConnection()->transaction($options, function ($tx) use 
(&$data, &$self) {
-                // Begin transaction.
-                $tx->multi();
-
-                $key = $self->peekWithCleanup($tx);
-
-                if ($key) {
-                    // Use that value as a key into the key-value block.
-                    $data = $tx->get($key);
-                }
-            });
+        $data_range = $this->getConnection()->lrange($this->queue_name, 0, 0);
+        if (!$data_range) {
+            return null;
         } else {
-            $data_range = $this->getConnection()->lrange($this->queue_name, 0, 
0);
-            if (!$data_range) {
-                return null;
-            } else {
-                // Unpack list.
-                $data = $data_range[0];
-            }
+            // Unpack list.
+            $data = $data_range[0];
         }
         if (!$data) {
             return null;
@@ -323,9 +197,7 @@
         $this->beforeAdd();
         try {
             $status = false;
-            if ($this->order_key) {
-                $status = $this->addToIndexedFifoQueue($key, $data);
-            } elseif (is_array($data)) {
+            if (is_array($data)) {
                 // FIXME: Assert
                 $status = $this->getConnection()->hmset($key, $data);
             } elseif (is_string($data) || is_numeric($data)) {
@@ -341,36 +213,6 @@
         } catch (\Exception $ex) {
             throw new BackendException($ex->getMessage(), $ex->getCode());
         }
-    }
-
-    /**
-     * Store the data under its order and correlation keys
-     *
-     * @param string $key
-     * @param array $data
-     * @return Predis\Response\ResponseInterface
-     */
-    protected function addToIndexedFifoQueue($key, $data)
-    {
-        $options = array(
-            'cas' => true,
-            'watch' => self::FIFO_INDEX,
-            'retry' => 3,
-        );
-        $score = $data[$this->order_key];
-        $encoded_data = json_encode($data);
-        $status = false;
-        $expiry = $this->expiry;
-        $this->getConnection()->transaction($options, function ($tx) use 
($key, $score, $encoded_data, $expiry, &$status) {
-            $tx->multi();
-            $tx->zadd(Predis::FIFO_INDEX, $score, $key);
-            if ($expiry) {
-                $status = $tx->setex($key, $expiry, $encoded_data);
-            } else {
-                $status = $tx->set($key, $encoded_data);
-            }
-        });
-        return $status;
     }
 
     /** @deprecated */
@@ -394,10 +236,6 @@
             return null;
         }
         $this->beforeGet($key);
-        if ($this->order_key) {
-            $data = $this->getConnection()->get($key);
-            return Json::safe_decode($data);
-        }
         $type = $this->getConnection()->type($key);
         switch ($type) {
             case self::TYPE_STRING:
@@ -433,16 +271,7 @@
     {
         $this->beforeClear($key);
 
-        if ($this->order_key) {
-            $result = $this->getConnection()->pipeline()
-                ->zrem(self::FIFO_INDEX, $key)
-                ->del($key)
-                ->execute();
-
-            $num_removed = $result[1];
-        } else {
-            $num_removed = $this->getConnection()->del($key);
-        }
+        $num_removed = $this->getConnection()->del($key);
 
         $this->afterClearRelease();
 
diff --git a/src/PHPQueue/Backend/Stomp.php b/src/PHPQueue/Backend/Stomp.php
index e855af3..65bd705 100644
--- a/src/PHPQueue/Backend/Stomp.php
+++ b/src/PHPQueue/Backend/Stomp.php
@@ -7,7 +7,6 @@
 use PHPQueue\Exception\BackendException;
 use PHPQueue\Exception\JobNotFoundException;
 use PHPQueue\Interfaces\FifoQueueStore;
-use PHPQueue\Interfaces\KeyValueStore;
 
 /**
  * Wrap a STOMP queue
@@ -18,7 +17,7 @@
  */
 class Stomp
     extends Base
-    implements FifoQueueStore, KeyValueStore
+    implements FifoQueueStore
 {
     public $queue_name;
     public $uri;
diff --git a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php 
b/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
deleted file mode 100644
index ff06f66..0000000
--- a/src/PHPQueue/Interfaces/IndexedFifoQueueStore.php
+++ /dev/null
@@ -1,32 +0,0 @@
-<?php
-namespace PHPQueue\Interfaces;
-
-/**
- * Implemented by backends that provide queue-like access, where each message
- * also has an ID.
- */
-interface IndexedFifoQueueStore extends FifoQueueStore
-{
-    /**
-     * @param mixed $value Serializable value.
-     * @return string Message ID.
-     * @throws \Exception On failure.
-     */
-    public function push($value);
-
-    /**
-     * @return array The next available data.
-     * @throws \PHPQueue\Exception\JobNotFoundException When no data is 
available.
-     * @throws \Exception Other failures.
-     *
-     * @deprecated This is not a safe operation.  Consider using
-     *     AtomicReadBuffer::popAtomic instead.
-     */
-    public function pop();
-
-    /**
-     * @param $key string
-     * @throws \Exception
-     */
-    public function clear($key);
-}
diff --git a/src/PHPQueue/Interfaces/KeyValueStore.php 
b/src/PHPQueue/Interfaces/KeyValueStore.php
deleted file mode 100644
index b9d5823..0000000
--- a/src/PHPQueue/Interfaces/KeyValueStore.php
+++ /dev/null
@@ -1,33 +0,0 @@
-<?php
-namespace PHPQueue\Interfaces;
-
-/**
- * Implemented by backends that support key-value retrieval.
- */
-interface KeyValueStore
-{
-    /**
-     * @param $key string
-     * @param $value mixed Serializable value
-     * @param $properties array optional additional message properties
-     *     FIXME: Define better.  Are these columns the indexes?  Why separate
-     *     from the message?
-     * @throws \Exception
-     */
-    public function set($key, $value, $properties=array());
-
-    /**
-     * Look up and return a value by its index value.
-     *
-     * @param $key string
-     * @return array The data.
-     * @throws \Exception
-     */
-    public function get($key);
-
-    /**
-     * @param $key string
-     * @throws \Exception
-     */
-    public function clear($key);
-}

-- 
To view, visit https://gerrit.wikimedia.org/r/340157
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6f38b1528c1ec33c8d248ef700d444637677be51
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/php-queue
Gerrit-Branch: master
Gerrit-Owner: Awight <awi...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to