Reedy has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/260156

Change subject: Update nmred/kafka-php to v0.1.5
......................................................................

Update nmred/kafka-php to v0.1.5

https://github.com/nmred/kafka-php/releases/tag/v0.1.5
https://github.com/nmred/kafka-php/compare/v0.1.4...v0.1.5

Change-Id: Ibb9e988b9e81445641c96075a064090d5ed146ca
---
M composer.json
M composer.lock
M composer/installed.json
M nmred/kafka-php/src/Kafka/Client.php
M nmred/kafka-php/src/Kafka/Consumer.php
M nmred/kafka-php/src/Kafka/Exception/NotSupported.php
M nmred/kafka-php/src/Kafka/Exception/Protocol.php
M nmred/kafka-php/src/Kafka/Exception/Socket.php
M nmred/kafka-php/src/Kafka/Log.php
M nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
M nmred/kafka-php/src/Kafka/Offset.php
M nmred/kafka-php/src/Kafka/Produce.php
M nmred/kafka-php/src/Kafka/Protocol/Decoder.php
M nmred/kafka-php/src/Kafka/Protocol/Encoder.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
M nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
M nmred/kafka-php/src/Kafka/Protocol/Protocol.php
M nmred/kafka-php/src/Kafka/Socket.php
M nmred/kafka-php/src/Kafka/ZooKeeper.php
25 files changed, 308 insertions(+), 216 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/vendor refs/changes/56/260156/1

diff --git a/composer.json b/composer.json
index a649bbc..537af48 100644
--- a/composer.json
+++ b/composer.json
@@ -15,7 +15,7 @@
                "liuggio/statsd-php-client": "1.0.18",
                "mediawiki/at-ease": "1.1.0",
                "monolog/monolog": "1.17.2",
-               "nmred/kafka-php": "0.1.4",
+               "nmred/kafka-php": "0.1.5",
                "oojs/oojs-ui": "0.14.1",
                "oyejorge/less.php": "1.7.0.9",
                "php": ">=5.3.3",
diff --git a/composer.lock b/composer.lock
index 1bd7403..4bbdd9c 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,8 +4,8 @@
         "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
         "This file is @generated automatically"
     ],
-    "hash": "8ef0be88ea376ffdd03bda4b73524e91",
-    "content-hash": "efc2113760078de01b7f379cca798a47",
+    "hash": "593526f306a0d147a461d8f9cc8ce6ed",
+    "content-hash": "672a92d75cd3f056b38f283ef83f37d9",
     "packages": [
         {
             "name": "composer/semver",
@@ -323,16 +323,16 @@
         },
         {
             "name": "nmred/kafka-php",
-            "version": "v0.1.4",
+            "version": "v0.1.5",
             "source": {
                 "type": "git",
                 "url": "https://github.com/nmred/kafka-php.git",
-                "reference": "06817c95e40b23918c3a420960ee9526e499275d"
+                "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb"
             },
             "dist": {
                 "type": "zip",
-                "url": "https://api.github.com/repos/nmred/kafka-php/zipball/06817c95e40b23918c3a420960ee9526e499275d",
-                "reference": "06817c95e40b23918c3a420960ee9526e499275d",
+                "url": "https://api.github.com/repos/nmred/kafka-php/zipball/317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb",
+                "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb",
                 "shasum": ""
             },
             "require": {
@@ -359,7 +359,7 @@
                 "client",
                 "kafka"
             ],
-            "time": "2015-09-06 01:39:05"
+            "time": "2015-12-02 12:06:23"
         },
         {
             "name": "oojs/oojs-ui",
diff --git a/composer/installed.json b/composer/installed.json
index 3365c01..184879a 100644
--- a/composer/installed.json
+++ b/composer/installed.json
@@ -432,48 +432,6 @@
         "homepage": "https://www.mediawiki.org/wiki/CDB"
     },
     {
-        "name": "nmred/kafka-php",
-        "version": "v0.1.4",
-        "version_normalized": "0.1.4.0",
-        "source": {
-            "type": "git",
-            "url": "https://github.com/nmred/kafka-php.git",
-            "reference": "06817c95e40b23918c3a420960ee9526e499275d"
-        },
-        "dist": {
-            "type": "zip",
-            "url": "https://api.github.com/repos/nmred/kafka-php/zipball/06817c95e40b23918c3a420960ee9526e499275d",
-            "reference": "06817c95e40b23918c3a420960ee9526e499275d",
-            "shasum": ""
-        },
-        "require": {
-            "php": ">=5.3.3"
-        },
-        "require-dev": {
-            "phpunit/phpcov": "*",
-            "phpunit/phpunit": "~4.0",
-            "satooshi/php-coveralls": "dev-master"
-        },
-        "time": "2015-09-06 01:39:05",
-        "type": "library",
-        "installation-source": "dist",
-        "autoload": {
-            "psr-0": {
-                "Kafka\\": "src/"
-            }
-        },
-        "notification-url": "https://packagist.org/downloads/",
-        "license": [
-            "BSD-3-Clause"
-        ],
-        "description": "Kafka client for php",
-        "homepage": "http://www.swanlinux.net",
-        "keywords": [
-            "client",
-            "kafka"
-        ]
-    },
-    {
         "name": "wikimedia/avro",
         "version": "v1.7.7",
         "version_normalized": "1.7.7.0",
@@ -1290,5 +1248,47 @@
             "Apache-2.0"
         ],
         "description": "Convert CSS stylesheets between left-to-right and 
right-to-left."
+    },
+    {
+        "name": "nmred/kafka-php",
+        "version": "v0.1.5",
+        "version_normalized": "0.1.5.0",
+        "source": {
+            "type": "git",
+            "url": "https://github.com/nmred/kafka-php.git",
+            "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb"
+        },
+        "dist": {
+            "type": "zip",
+            "url": "https://api.github.com/repos/nmred/kafka-php/zipball/317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb",
+            "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb",
+            "shasum": ""
+        },
+        "require": {
+            "php": ">=5.3.3"
+        },
+        "require-dev": {
+            "phpunit/phpcov": "*",
+            "phpunit/phpunit": "~4.0",
+            "satooshi/php-coveralls": "dev-master"
+        },
+        "time": "2015-12-02 12:06:23",
+        "type": "library",
+        "installation-source": "dist",
+        "autoload": {
+            "psr-0": {
+                "Kafka\\": "src/"
+            }
+        },
+        "notification-url": "https://packagist.org/downloads/",
+        "license": [
+            "BSD-3-Clause"
+        ],
+        "description": "Kafka client for php",
+        "homepage": "http://www.swanlinux.net",
+        "keywords": [
+            "client",
+            "kafka"
+        ]
     }
 ]
diff --git a/nmred/kafka-php/src/Kafka/Client.php 
b/nmred/kafka-php/src/Kafka/Client.php
index a38e705..06249d9 100644
--- a/nmred/kafka-php/src/Kafka/Client.php
+++ b/nmred/kafka-php/src/Kafka/Client.php
@@ -77,7 +77,7 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param ClusterMetaData $metadata
      */
     public function __construct(ClusterMetaData $metadata)
     {
@@ -151,7 +151,7 @@
      * get broker server
      *
      * @access public
-     * @return void
+     * @return array
      */
     public function getBrokers()
     {
@@ -219,8 +219,9 @@
      * get broker broker connect
      *
      * @param string $host
+     * @param null $lockKey
+     * @return array
      * @access private
-     * @return void
      */
     public function getStream($host, $lockKey = null)
     {
diff --git a/nmred/kafka-php/src/Kafka/Consumer.php 
b/nmred/kafka-php/src/Kafka/Consumer.php
index 5ff2d43..1b9d42c 100644
--- a/nmred/kafka-php/src/Kafka/Consumer.php
+++ b/nmred/kafka-php/src/Kafka/Consumer.php
@@ -35,7 +35,7 @@
     /**
      * client
      *
-     * @var mixed
+     * @var Client
      * @access private
      */
     private $client = null;
@@ -73,22 +73,6 @@
     private static $instance = null;
 
     /**
-     * broker host list
-     *
-     * @var array
-     * @access private
-     */
-    private $hostList = array();
-
-    /**
-     * save broker connection
-     *
-     * @var array
-     * @access private
-     */
-    private $stream = array();
-
-    /**
      * maxSize
      *
      * @var integer
@@ -109,7 +93,9 @@
      * set send messages
      *
      * @access public
-     * @return void
+     * @param $hostList
+     * @param null $timeout
+     * @return Consumer
      */
     public static function getInstance($hostList, $timeout = null)
     {
@@ -127,7 +113,8 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $hostList
+     * @param null $timeout
      */
     private function __construct($hostList, $timeout = null)
     {
@@ -156,7 +143,9 @@
      * set topic name
      *
      * @access public
-     * @return void
+     * @param $topicName
+     * @param null $defaultOffset
+     * @return Consumer
      */
     public function setTopic($topicName, $defaultOffset = null)
     {
@@ -180,7 +169,10 @@
      * set topic partition
      *
      * @access public
-     * @return void
+     * @param $topicName
+     * @param int $partitionId
+     * @param null $offset
+     * @return Consumer
      */
     public function setPartition($topicName, $partitionId = 0, $offset = null)
     {
@@ -236,7 +228,7 @@
      *
      * @param string $group
      * @access public
-     * @return void
+     * @return Consumer
      */
     public function setGroup($group)
     {
@@ -251,7 +243,7 @@
      * fetch message to broker
      *
      * @access public
-     * @return void
+     * @return \Kafka\Protocol\Fetch\Topic|bool
      */
     public function fetch()
     {
@@ -260,7 +252,6 @@
             return false;
         }
 
-        $responseData = array();
         $streams = array();
         foreach ($data as $host => $requestData) {
             $connArr = $this->client->getStream($host);
@@ -295,7 +286,7 @@
      * get client object
      *
      * @access public
-     * @return void
+     * @return Client
      */
     public function getClient()
     {
@@ -366,7 +357,7 @@
      * const EARLIEST_OFFSET = -2;
      * const DEFAULT_LAST  = -2;
      * const DEFAULT_EARLY = -1;
-     * @param type $offsetStrategy
+     * @param int $offsetStrategy
      */
     public function setOffsetStrategy($offsetStrategy)
     {
diff --git a/nmred/kafka-php/src/Kafka/Exception/NotSupported.php 
b/nmred/kafka-php/src/Kafka/Exception/NotSupported.php
index 011129a..cb6d23c 100644
--- a/nmred/kafka-php/src/Kafka/Exception/NotSupported.php
+++ b/nmred/kafka-php/src/Kafka/Exception/NotSupported.php
@@ -28,6 +28,6 @@
 +------------------------------------------------------------------------------
 */
 
-class NotSupported extends \Exception
+class NotSupported extends Exception
 {
 }
diff --git a/nmred/kafka-php/src/Kafka/Exception/Protocol.php 
b/nmred/kafka-php/src/Kafka/Exception/Protocol.php
index 6e213f0..d7fa251 100644
--- a/nmred/kafka-php/src/Kafka/Exception/Protocol.php
+++ b/nmred/kafka-php/src/Kafka/Exception/Protocol.php
@@ -28,6 +28,6 @@
 +------------------------------------------------------------------------------
 */
 
-class Protocol extends \Exception
+class Protocol extends Exception
 {
 }
diff --git a/nmred/kafka-php/src/Kafka/Exception/Socket.php 
b/nmred/kafka-php/src/Kafka/Exception/Socket.php
index aca93e2..59ab5aa 100644
--- a/nmred/kafka-php/src/Kafka/Exception/Socket.php
+++ b/nmred/kafka-php/src/Kafka/Exception/Socket.php
@@ -28,6 +28,6 @@
 +------------------------------------------------------------------------------
 */
 
-class Socket extends \Exception
+class Socket extends Exception
 {
 }
diff --git a/nmred/kafka-php/src/Kafka/Log.php 
b/nmred/kafka-php/src/Kafka/Log.php
index 481bfc3..efa5476 100644
--- a/nmred/kafka-php/src/Kafka/Log.php
+++ b/nmred/kafka-php/src/Kafka/Log.php
@@ -48,7 +48,7 @@
      * setLog
      *
      * @access public
-     * @return void
+     * @param $log
      */
     public static function setLog($log)
     {
@@ -64,7 +64,8 @@
      * log
      *
      * @access public
-     * @return void
+     * @param $message
+     * @param int $level
      */
     public static function log($message, $level = LOG_DEBUG)
     {
diff --git a/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php 
b/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
index 9d2c613..e61e096 100644
--- a/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
+++ b/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
@@ -117,6 +117,11 @@
     // }}}
     // {{{ public function getPartitionState()
 
+    /**
+     * @param string $topicName
+     * @param int $partitionId
+     * @return null
+     */
     public function getPartitionState($topicName, $partitionId = 0)
     {
         if (!isset( $this->topics[$topicName] ) ) {
@@ -164,6 +169,9 @@
     // }}}
     // {{{ private function loadTopicDetail()
 
+    /**
+     * @param array $topics
+     */
     private function loadTopicDetail(array $topics)
     {
         if ($this->client === null) {
diff --git a/nmred/kafka-php/src/Kafka/Offset.php 
b/nmred/kafka-php/src/Kafka/Offset.php
index 7ad3f9d..744d208 100644
--- a/nmred/kafka-php/src/Kafka/Offset.php
+++ b/nmred/kafka-php/src/Kafka/Offset.php
@@ -14,8 +14,6 @@
 
 namespace Kafka;
 
-use \Kafka\Log;
-
 /**
 +------------------------------------------------------------------------------
 * Kafka protocol since Kafka v0.8
@@ -119,7 +117,10 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $client
+     * @param $groupId
+     * @param $topicName
+     * @param int $partitionId
      */
     public function __construct($client, $groupId, $topicName, $partitionId = 
0)
     {
@@ -192,7 +193,7 @@
      *   if defaultOffset -1 instead of early offset
      *   if defaultOffset -2 instead of last offset
      * @access public
-     * @return void
+     * @return int
      */
     public function getOffset($defaultOffset = self::DEFAULT_LAST)
     {
@@ -224,8 +225,8 @@
         if ($result[$topicName][$partitionId]['errCode'] == 3) {
             switch ($defaultOffset) {
                 case self::DEFAULT_LAST:
-                    return $maxOffset;
                     Log::log("topic name: $topicName, partitionId: 
$partitionId, get offset value is default last.", LOG_INFO);
+                    return $maxOffset;
                 case self::DEFAULT_EARLY:
                     Log::log("topic name: $topicName, partitionId: 
$partitionId, get offset value is default early.", LOG_INFO);
                     return $minOffset;
@@ -233,10 +234,6 @@
                     $this->setOffset($defaultOffset);
                     Log::log("topic name: $topicName, partitionId: 
$partitionId, get offset value is default $defaultOffset.", LOG_INFO);
                     return $defaultOffset;
-            }
-            if ($defaultOffset) {
-                $this->setOffset($defaultOffset);
-                return $defaultOffset;
             }
         } elseif ($result[$topicName][$partitionId]['errCode'] == 0) {
             $offset = $result[$topicName][$partitionId]['offset'];
@@ -261,8 +258,7 @@
     /**
      * get produce server offset
      *
-     * @param string $topicName
-     * @param integer $partitionId
+     * @param integer $timeLine
      * @access public
      * @return int
      */
diff --git a/nmred/kafka-php/src/Kafka/Produce.php 
b/nmred/kafka-php/src/Kafka/Produce.php
index 2b9e6cd..8e735e8 100644
--- a/nmred/kafka-php/src/Kafka/Produce.php
+++ b/nmred/kafka-php/src/Kafka/Produce.php
@@ -72,22 +72,6 @@
      */
     private static $instance = null;
 
-    /**
-     * broker host list
-     *
-     * @var array
-     * @access private
-     */
-    private $hostList = array();
-
-    /**
-     * save broker connection
-     *
-     * @var array
-     * @access private
-     */
-    private $stream = array();
-
     // }}}
     // {{{ functions
     // {{{ public function static getInstance()
@@ -96,7 +80,10 @@
      * set send messages
      *
      * @access public
-     * @return void
+     * @param $hostList
+     * @param $timeout
+     * @param null $kafkaHostList
+     * @return Produce
      */
     public static function getInstance($hostList, $timeout, $kafkaHostList = 
null)
     {
@@ -114,7 +101,9 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $hostList
+     * @param null $timeout
+     * @param null $kafkaHostList
      */
     public function __construct($hostList, $timeout = null, $kafkaHostList = 
null)
     {
@@ -135,7 +124,10 @@
      * set send messages
      *
      * @access public
-     * @return void
+     * @param $topicName
+     * @param int $partitionId
+     * @param array $messages
+     * @return Produce
      */
     public function setMessages($topicName, $partitionId = 0, $messages = 
array())
     {
@@ -166,7 +158,7 @@
      *
      * @param int $ack
      * @access public
-     * @return void
+     * @return Produce
      */
     public function setRequireAck($ack = 0)
     {
@@ -185,7 +177,7 @@
      *
      * @param int $timeout
      * @access public
-     * @return void
+     * @return Produce
      */
     public function setTimeOut($timeout = 100)
     {
@@ -202,7 +194,7 @@
      * send message to broker
      *
      * @access public
-     * @return void
+     * @return bool|array
      */
     public function send()
     {
@@ -243,7 +235,7 @@
      * get client object
      *
      * @access public
-     * @return void
+     * @return Client
      */
     public function getClient()
     {
@@ -268,6 +260,7 @@
      * get available partition
      *
      * @access public
+     * @param $topicName
      * @return array
      */
     public function getAvailablePartitions($topicName)
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Decoder.php 
b/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
index f1e4b49..261906e 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
@@ -34,7 +34,6 @@
     /**
      * decode produce response
      *
-     * @param string $data
      * @access public
      * @return array
      */
@@ -66,7 +65,7 @@
             for ($j = 0; $j < $partitionCount; $j++) {
                 $partitionId = self::unpack(self::BIT_B32, substr($data, 
$offset, 4));
                 $offset += 4;
-                $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 
2));
+                $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, 
$offset, 2));
                 $offset += 2;
                 $partitionOffset = self::unpack(self::BIT_B64, substr($data, 
$offset, 8));
                 $offset += 8;
@@ -86,9 +85,8 @@
     /**
      * decode fetch response
      *
-     * @param string $data
      * @access public
-     * @return Iterator
+     * @return \Iterator
      */
     public function fetchResponse()
     {
@@ -101,13 +99,11 @@
     /**
      * decode metadata response
      *
-     * @param string $data
      * @access public
      * @return array
      */
     public function metadataResponse()
     {
-        $result = array();
         $broker = array();
         $topic = array();
         $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
@@ -202,7 +198,6 @@
     /**
      * decode offset response
      *
-     * @param string $data
      * @access public
      * @return array
      */
@@ -257,7 +252,6 @@
     /**
      * decode commit offset response
      *
-     * @param string $data
      * @access public
      * @return array
      */
@@ -303,7 +297,6 @@
     /**
      * decode fetch offset response
      *
-     * @param string $data
      * @access public
      * @return array
      */
@@ -343,7 +336,7 @@
                     $metaData = substr($data, $offset, $metaLen);
                     $offset += $metaLen;
                 }
-                $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 
2));
+                $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, 
$offset, 2));
                 $offset += 2;
                 $result[$topicName][$partitionId[1]] = array(
                     'offset'   => $partitionOffset,
@@ -368,7 +361,6 @@
      */
     public static function getError($errCode)
     {
-        $error = '';
         switch($errCode) {
             case 0:
                 $error = 'No error--it worked!';
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Encoder.php 
b/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
index 7d36e10..8f5e27b 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
@@ -35,9 +35,9 @@
      * produce request
      *
      * @param array $payloads
-     * @static
+     * @param int $compression
+     * @return int
      * @access public
-     * @return void
      */
     public function produceRequest($payloads, $compression = 
self::COMPRESSION_NONE)
     {
@@ -222,10 +222,11 @@
      * encode pack string type
      *
      * @param string $string
-     * @param int $bytes   self::PACK_INT32: int32 big endian order. 
self::PACK_INT16: int16 big endian order.
+     * @param int $bytes self::PACK_INT32: int32 big endian order. 
self::PACK_INT16: int16 big endian order.
+     * @param int $compression
+     * @return string
      * @static
      * @access public
-     * @return string
      */
     public static function encodeString($string, $bytes, $compression = 
self::COMPRESSION_NONE)
     {
@@ -252,9 +253,10 @@
      *
      * @param array $array
      * @param Callable $func
+     * @param null $options
+     * @return string
      * @static
      * @access public
-     * @return string
      */
     public static function encodeArray(array $array, $func, $options = null)
     {
@@ -285,9 +287,10 @@
      * in the protocol.
      *
      * @param array $messages
+     * @param int $compression
+     * @return string
      * @static
      * @access public
-     * @return string
      */
     public static function encodeMessageSet($messages, $compression = 
self::COMPRESSION_NONE)
     {
@@ -316,7 +319,7 @@
      * @param integer $apiKey
      * @static
      * @access public
-     * @return void
+     * @return string
      */
     public static function requestHeader($clientId, $correlationId, $apiKey)
     {
@@ -338,9 +341,10 @@
      * encode signal message
      *
      * @param string $message
+     * @param int $compression
+     * @return string
      * @static
      * @access protected
-     * @return string
      */
     protected static function _encodeMessage($message, $compression = 
self::COMPRESSION_NONE)
     {
@@ -368,10 +372,12 @@
     /**
      * encode signal part
      *
-     * @param partions
+     * @param $values
+     * @param $compression
+     * @return string
+     * @internal param $partions
      * @static
      * @access protected
-     * @return string
      */
     protected static function _encodeProcudePartion($values, $compression)
     {
@@ -395,10 +401,12 @@
     /**
      * encode signal topic
      *
-     * @param partions
+     * @param $values
+     * @param $compression
+     * @return string
+     * @internal param $partions
      * @static
      * @access protected
-     * @return string
      */
     protected static function _encodeProcudeTopic($values, $compression)
     {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
index 8424cf7..9fadb56 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
@@ -46,7 +46,7 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $client
      */
     public function __construct($client)
     {
@@ -60,7 +60,7 @@
      * set consumer group
      *
      * @access public
-     * @return void
+     * @param $group
      */
     public function setGroup($group)
     {
@@ -111,7 +111,7 @@
         $topicName = $partition->getTopicName();
         $offset    = $partition->getMessageOffset();
         $offsetObject = new  \Kafka\Offset($this->client, $this->group, 
$topicName, $partitionId);
-        $offsetObject->setOffset($offset);
+        $offsetObject->setOffset($offset + 1);
     }
 
     // }}}
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
index acf0223..49560b9 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
@@ -11,13 +11,18 @@
 
     protected $offsetStrategy;
 
-
+    /**
+     * Consumer constructor.
+     * @param \Kafka\Consumer $consumer
+     */
     public function __construct(\Kafka\Consumer $consumer)
     {
         $this->consumer = $consumer;
     }
 
-
+    /**
+     * @param \Kafka\Protocol\Fetch\Partition $partition
+     */
     public function onPartitionEof($partition)
     {
         $partitionId = $partition->key();
@@ -27,11 +32,17 @@
         $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
     }
 
+    /**
+     * @param string $streamKey
+     */
     public function onStreamEof($streamKey)
     {
 
     }
 
+    /**
+     * @param string $topicName
+     */
     public function onTopicEof($topicName)
     {
 
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
index bba38dd..b82ca3c 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
@@ -46,7 +46,7 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $client
      */
     public function __construct($client)
     {
@@ -60,7 +60,7 @@
      * set streams
      *
      * @access public
-     * @return void
+     * @param $streams
      */
     public function setStreams($streams)
     {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
index 4ec2392..33ba7d2 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
@@ -97,7 +97,7 @@
     public static function onStreamEof($streamKey)
     {
         if (empty(self::$helpers)) {
-            return false;
+            return;
         }
 
         foreach (self::$helpers as $key => $helper) {
@@ -121,7 +121,7 @@
     public static function onTopicEof($topicName)
     {
         if (empty(self::$helpers)) {
-            return false;
+            return;
         }
 
         foreach (self::$helpers as $key => $helper) {
@@ -145,7 +145,7 @@
     public static function onPartitionEof($partition)
     {
         if (empty(self::$helpers)) {
-            return false;
+            return;
         }
 
         foreach (self::$helpers as $key => $helper) {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
index 42d7da1..bedb165 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
@@ -33,22 +33,6 @@
     // {{{ members
 
     /**
-     * init read bytes
-     *
-     * @var float
-     * @access private
-     */
-    private $initOffset = 0;
-
-    /**
-     * validByteCount
-     *
-     * @var float
-     * @access private
-     */
-    private $validByteCount = 0;
-
-    /**
      * crc32 code
      *
      * @var float
@@ -99,7 +83,6 @@
      *
      * @param string(raw) $msg
      * @access public
-     * @return void
      */
     public function __construct($msg)
     {
@@ -163,7 +146,7 @@
      * __toString
      *
      * @access public
-     * @return void
+     * @return string (raw)
      */
     public function __toString()
     {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
index 50413b6..13c7aac 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
@@ -87,6 +87,11 @@
      */
     private $context = array();
 
+    /**
+     * @var Message
+     */
+    private $current = null;
+
     // }}}
     // {{{ functions
     // {{{ public function __construct()
@@ -94,10 +99,9 @@
     /**
      * __construct
      *
-     * @param \Kafka\Socket $stream
-     * @param int $initOffset
+     * @param Partition $partition
+     * @param array $context
      * @access public
-     * @return void
      */
     public function __construct(\Kafka\Protocol\Fetch\Partition $partition, 
$context = array())
     {
@@ -115,7 +119,7 @@
      * current
      *
      * @access public
-     * @return void
+     * @return Message
      */
     public function current()
     {
@@ -129,7 +133,7 @@
      * key
      *
      * @access public
-     * @return void
+     * @return float
      */
     public function key()
     {
@@ -178,7 +182,7 @@
      * implements Iterator function
      *
      * @access public
-     * @return integer
+     * @return void
      */
     public function next()
     {
@@ -214,7 +218,7 @@
      * load next message
      *
      * @access public
-     * @return void
+     * @return bool
      */
     public function loadNextMessage()
     {
@@ -257,7 +261,7 @@
     /**
      * current message offset in producer
      *
-     * @return void
+     * @return float
      */
     public function messageOffset()
     {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
index 9f8578d..7baa3bc 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
@@ -106,7 +106,7 @@
     private $valid = false;
 
     /**
-     * cuerrent topic name
+     * current topic name
      *
      * @var string
      * @access private
@@ -128,9 +128,8 @@
      * __construct
      *
      * @param \Kafka\Protocol\Fetch\Topic $topic
-     * @param int $initOffset
+     * @param array $context
      * @access public
-     * @return void
      */
     public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = 
array())
     {
@@ -147,7 +146,7 @@
      * current
      *
      * @access public
-     * @return void
+     * @return mixed
      */
     public function current()
     {
@@ -161,7 +160,7 @@
      * key
      *
      * @access public
-     * @return void
+     * @return string
      */
     public function key()
     {
@@ -203,7 +202,7 @@
      * implements Iterator function
      *
      * @access public
-     * @return integer
+     * @return void
      */
     public function next()
     {
@@ -231,7 +230,7 @@
      * get partition errcode
      *
      * @access public
-     * @return void
+     * @return float
      */
     public function getErrCode()
     {
@@ -245,7 +244,7 @@
      * get partition high offset
      *
      * @access public
-     * @return void
+     * @return float
      */
     public function getHighOffset()
     {
@@ -259,7 +258,7 @@
      * get partition topic name
      *
      * @access public
-     * @return void
+     * @return string
      */
     public function getTopicName()
     {
@@ -297,7 +296,7 @@
         $data = Decoder::unpack(Decoder::BIT_B32, $data);
         $count = array_shift($data);
         if ($count <= 0) {
-            throw new \Kafka\Exception\OutOfRange($size . ' is not a valid 
partition count');
+            throw new \Kafka\Exception\OutOfRange($count . ' is not a valid 
partition count');
         }
 
         return $count;
@@ -310,7 +309,7 @@
      * load next partition
      *
      * @access public
-     * @return void
+     * @return bool
      */
     public function loadNextPartition()
     {
@@ -349,7 +348,7 @@
     /**
      * set messageSet fetch offset current
      *
-     * @param  intger $offset
+     * @param int $offset
      * @return void
      */
     public function setMessageOffset($offset)
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php 
b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
index 500e6b1..64f60f2 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
@@ -127,10 +127,9 @@
     /**
      * __construct
      *
-     * @param \Kafka\Socket $stream
-     * @param int $initOffset
+     * @param \Kafka\Socket|array $streams
+     * @param array $context
      * @access public
-     * @return void
      */
     public function __construct($streams, $context = array())
     {
@@ -168,7 +167,7 @@
      * current
      *
      * @access public
-     * @return void
+     * @return mixed
      */
     public function current()
     {
@@ -182,7 +181,7 @@
      * key
      *
      * @access public
-     * @return void
+     * @return string
      */
     public function key()
     {
@@ -196,7 +195,7 @@
      * implements Iterator function
      *
      * @access public
-     * @return integer
+     * @return void
      */
     public function rewind()
     {
@@ -224,7 +223,7 @@
      * implements Iterator function
      *
      * @access public
-     * @return integer
+     * @return void
      */
     public function next()
     {
@@ -281,7 +280,7 @@
      * load next topic
      *
      * @access public
-     * @return void
+     * @return bool
      */
     public function loadNextTopic()
     {
diff --git a/nmred/kafka-php/src/Kafka/Protocol/Protocol.php 
b/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
index a31067b..f9fed57 100644
--- a/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
+++ b/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
@@ -81,6 +81,7 @@
     const BIT_B64 = 'N2';
     const BIT_B32 = 'N';
     const BIT_B16 = 'n';
+    const BIT_B16_SIGNED = 's';
     const BIT_B8  = 'C';
 
     // }}}
@@ -94,6 +95,17 @@
      */
     protected $stream = null;
 
+    /**
+     * isLittleEndianSystem
+     *
+     * gets set to true if the computer this code is running on is little endian,
+     * gets set to false if the computer this code is running on is big endian.
+     *
+     * @var null|bool
+     * @access private
+     */
+    private static $isLittleEndianSystem = null;
+
     // }}}
     // {{{ functions
     // {{{ public function __construct()
@@ -103,7 +115,6 @@
      *
      * @param \Kafka\Socket $stream
      * @access public
-     * @return void
      */
     public function __construct(\Kafka\Socket $stream)
     {
@@ -144,7 +155,9 @@
      *
      * @static
      * @access public
-     * @return integer
+     * @param string $type
+     * @param string $bytes
+     * @return int|array
      */
     public static function unpack($type, $bytes)
     {
@@ -153,6 +166,17 @@
             $set = unpack($type, $bytes);
             $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
             return $original;
+        } elseif ($type == self::BIT_B16_SIGNED) {
+            // According to PHP docs: 's' = signed short (always 16 bit, 
machine byte order)
+            // So lets unpack it..
+            $set = unpack($type, $bytes);
+
+            // But if our system is little endian
+            if (self::isSystemLittleEndian()) {
+                // We need to flip the endianness because coming from kafka it 
is big endian
+                $set = 
self::convertSignedShortFromLittleEndianToBigEndian($set);
+            }
+            return $set;
         } else {
             return unpack($type, $bytes);
         }
@@ -166,7 +190,9 @@
      *
      * @static
      * @access public
-     * @return integer
+     * @param $type
+     * @param $data
+     * @return int
      */
     public static function pack($type, $data)
     {
@@ -215,6 +241,9 @@
             case self::BIT_B16:
                 $len = 2;
                 break;
+            case self::BIT_B16_SIGNED:
+                $len = 2;
+                break;
             case self::BIT_B8:
                 $len = 1;
                 break;
@@ -226,5 +255,63 @@
     }
 
     // }}}
+    // {{{ public static function isSystemLittleEndian()
+
+    /**
+     * Determines if the computer currently running this code is big endian or 
little endian.
+     *
+     * @access public
+     * @return bool - false if big endian, true if little endian
+     */
+    public static function isSystemLittleEndian()
+    {
+        // If we don't know if our system is big endian or not yet...
+        if (is_null(self::$isLittleEndianSystem)) {
+            // Lets find out
+            list ($endiantest) = array_values(unpack('L1L', pack('V', 1)));
+            if ($endiantest != 1) {
+                // This is a big endian system
+                self::$isLittleEndianSystem = false;
+            } else {
+                // This is a little endian system
+                self::$isLittleEndianSystem = true;
+            }
+        }
+
+        return self::$isLittleEndianSystem;
+    }
+
+    // }}}
+    // {{{ public static function 
convertSignedShortFromLittleEndianToBigEndian()
+
+    /**
+     * Converts a signed short (16 bits) from little endian to big endian.
+     *
+     * @param int[] $bits
+     * @access public
+     * @return array
+     */
+    public static function convertSignedShortFromLittleEndianToBigEndian($bits)
+    {
+        foreach ($bits as $index => $bit) {
+
+            // get LSB
+            $lsb = $bit & 0xff;
+
+            // get MSB
+            $msb = $bit >> 8 & 0xff;
+
+            // swap bytes
+            $bit = $lsb <<8 | $msb;
+
+            if ($bit >= 32768) {
+                $bit -= 65536;
+            }
+            $bits[$index] = $bit;
+        }
+        return $bits;
+    }
+
+    // }}}
     // }}}
 }
diff --git a/nmred/kafka-php/src/Kafka/Socket.php 
b/nmred/kafka-php/src/Kafka/Socket.php
index be7321f..d0902c2 100644
--- a/nmred/kafka-php/src/Kafka/Socket.php
+++ b/nmred/kafka-php/src/Kafka/Socket.php
@@ -32,6 +32,13 @@
 
     const READ_MAX_LEN = 5242880; // read socket max length 5MB
 
+    /**
+     * Maximum number of bytes written to the socket per fwrite() call.
+     * Fixes the error "send of 8192 bytes failed with errno=11 Resource
+     * temporarily unavailable".
+     */
+    const MAX_WRITE_BUFFER = 4096;
+
     // }}}
     // {{{ members
 
@@ -99,7 +106,12 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $host
+     * @param $port
+     * @param int $recvTimeoutSec
+     * @param int $recvTimeoutUsec
+     * @param int $sendTimeoutSec
+     * @param int $sendTimeoutUsec
      */
     public function __construct($host, $port, $recvTimeoutSec = 0, 
$recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000)
     {
@@ -153,7 +165,8 @@
      *
      * @static
      * @access public
-     * @return void
+     * @param $stream
+     * @return Socket
      */
     public static function createFromStream($stream)
     {
@@ -189,7 +202,7 @@
     public function connect()
     {
         if (is_resource($this->stream)) {
-            return false;
+            return;
         }
 
         if (empty($this->host)) {
@@ -246,7 +259,7 @@
      * @param boolean $verifyExactLength Throw an exception if the number of 
read bytes is less than $len
      *
      * @return string Binary data
-     * @throws Kafka_Exception_Socket
+     * @throws \Kafka\Exception\SocketEOF
      */
     public function read($len, $verifyExactLength = false)
     {
@@ -311,7 +324,7 @@
      * @param string $buf The data to write
      *
      * @return integer
-     * @throws Kafka_Exception_Socket
+     * @throws \Kafka\Exception\SocketEOF
      */
     public function write($buf)
     {
@@ -326,8 +339,13 @@
             // wait for stream to become available for writing
             $writable = stream_select($null, $write, $null, 
$this->sendTimeoutSec, $this->sendTimeoutUsec);
             if ($writable > 0) {
-                // write remaining buffer bytes to stream
-                $wrote = fwrite($this->stream, substr($buf, $written));
+                if ($buflen - $written > self::MAX_WRITE_BUFFER) {
+                    // write max buffer size
+                    $wrote = fwrite($this->stream, substr($buf, $written, 
self::MAX_WRITE_BUFFER));
+                } else {
+                    // write remaining buffer bytes to stream
+                    $wrote = fwrite($this->stream, substr($buf, $written));
+                }
                 if ($wrote === -1 || $wrote === false) {
                     throw new \Kafka\Exception\Socket('Could not write ' . 
strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' 
bytes');
                 }
diff --git a/nmred/kafka-php/src/Kafka/ZooKeeper.php 
b/nmred/kafka-php/src/Kafka/ZooKeeper.php
index f48b5cb..d141dde 100644
--- a/nmred/kafka-php/src/Kafka/ZooKeeper.php
+++ b/nmred/kafka-php/src/Kafka/ZooKeeper.php
@@ -84,7 +84,8 @@
      * __construct
      *
      * @access public
-     * @return void
+     * @param $hostList
+     * @param null $timeout
      */
     public function __construct($hostList, $timeout = null)
     {
@@ -129,7 +130,7 @@
      *
      * @param integer $brokerId
      * @access public
-     * @return void
+     * @return string|bool
      */
     public function getBrokerDetail($brokerId)
     {
@@ -155,7 +156,7 @@
      *
      * @param string $topicName
      * @access public
-     * @return void
+     * @return string|bool
      */
     public function getTopicDetail($topicName)
     {
@@ -181,7 +182,7 @@
      * @param string $topicName
      * @param integer $partitionId
      * @access public
-     * @return void
+     * @return string|bool
      */
     public function getPartitionState($topicName, $partitionId = 0)
     {
@@ -204,15 +205,15 @@
     /**
      * register consumer
      *
-     * @param string $topicName
-     * @param integer $partitionId
+     * @param $groupId
+     * @param integer $consumerId
+     * @param array $topics
      * @access public
-     * @return void
      */
     public function registerConsumer($groupId, $consumerId, $topics = array())
     {
         if (empty($topics)) {
-            return true;
+            return;
         }
 
         $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) 
$consumerId);
@@ -241,7 +242,7 @@
      *
      * @param string $groupId
      * @access public
-     * @return void
+     * @return array
      */
     public function listConsumer($groupId)
     {

-- 
To view, visit https://gerrit.wikimedia.org/r/260156
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibb9e988b9e81445641c96075a064090d5ed146ca
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/vendor
Gerrit-Branch: master
Gerrit-Owner: Reedy <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to