Reedy has uploaded a new change for review. https://gerrit.wikimedia.org/r/260156
Change subject: Update nmred/kafka-php to v0.1.5 ...................................................................... Update nmred/kafka-php to v0.1.5 https://github.com/nmred/kafka-php/releases/tag/v0.1.5 https://github.com/nmred/kafka-php/compare/v0.1.4...v0.1.5 Change-Id: Ibb9e988b9e81445641c96075a064090d5ed146ca --- M composer.json M composer.lock M composer/installed.json M nmred/kafka-php/src/Kafka/Client.php M nmred/kafka-php/src/Kafka/Consumer.php M nmred/kafka-php/src/Kafka/Exception/NotSupported.php M nmred/kafka-php/src/Kafka/Exception/Protocol.php M nmred/kafka-php/src/Kafka/Exception/Socket.php M nmred/kafka-php/src/Kafka/Log.php M nmred/kafka-php/src/Kafka/MetaDataFromKafka.php M nmred/kafka-php/src/Kafka/Offset.php M nmred/kafka-php/src/Kafka/Produce.php M nmred/kafka-php/src/Kafka/Protocol/Decoder.php M nmred/kafka-php/src/Kafka/Protocol/Encoder.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php M nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php M nmred/kafka-php/src/Kafka/Protocol/Protocol.php M nmred/kafka-php/src/Kafka/Socket.php M nmred/kafka-php/src/Kafka/ZooKeeper.php 25 files changed, 308 insertions(+), 216 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/vendor refs/changes/56/260156/1 diff --git a/composer.json b/composer.json index a649bbc..537af48 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "liuggio/statsd-php-client": "1.0.18", "mediawiki/at-ease": "1.1.0", "monolog/monolog": "1.17.2", - "nmred/kafka-php": "0.1.4", + "nmred/kafka-php": "0.1.5", "oojs/oojs-ui": "0.14.1", "oyejorge/less.php": "1.7.0.9", "php": ">=5.3.3", diff --git a/composer.lock b/composer.lock index 1bd7403..4bbdd9c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,8 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "8ef0be88ea376ffdd03bda4b73524e91", - "content-hash": "efc2113760078de01b7f379cca798a47", + "hash": "593526f306a0d147a461d8f9cc8ce6ed", + "content-hash": "672a92d75cd3f056b38f283ef83f37d9", "packages": [ { "name": "composer/semver", @@ -323,16 +323,16 @@ }, { "name": "nmred/kafka-php", - "version": "v0.1.4", + "version": "v0.1.5", "source": { "type": "git", "url": "https://github.com/nmred/kafka-php.git", - "reference": "06817c95e40b23918c3a420960ee9526e499275d" + "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nmred/kafka-php/zipball/06817c95e40b23918c3a420960ee9526e499275d", - "reference": "06817c95e40b23918c3a420960ee9526e499275d", + "url": "https://api.github.com/repos/nmred/kafka-php/zipball/317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb", + "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb", "shasum": "" }, "require": { @@ -359,7 +359,7 @@ "client", "kafka" ], - "time": "2015-09-06 01:39:05" + "time": "2015-12-02 12:06:23" }, { "name": "oojs/oojs-ui", diff --git a/composer/installed.json b/composer/installed.json index 3365c01..184879a 100644 --- a/composer/installed.json +++ b/composer/installed.json @@ -432,48 +432,6 @@ "homepage": "https://www.mediawiki.org/wiki/CDB" }, { - "name": "nmred/kafka-php", - "version": "v0.1.4", - "version_normalized": "0.1.4.0", - "source": { - "type": "git", - "url": "https://github.com/nmred/kafka-php.git", - "reference": "06817c95e40b23918c3a420960ee9526e499275d" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/nmred/kafka-php/zipball/06817c95e40b23918c3a420960ee9526e499275d", - "reference": "06817c95e40b23918c3a420960ee9526e499275d", - "shasum": "" - }, - "require": { - "php": ">=5.3.3" - }, - "require-dev": { - "phpunit/phpcov": "*", - "phpunit/phpunit": "~4.0", - "satooshi/php-coveralls": "dev-master" - }, - "time": "2015-09-06 01:39:05", - "type": "library", - "installation-source": "dist", - "autoload": { - "psr-0": { - "Kafka\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "BSD-3-Clause" - ], - "description": "Kafka client for php", - "homepage": "http://www.swanlinux.net", - "keywords": [ - "client", - "kafka" - ] - }, - { "name": "wikimedia/avro", "version": "v1.7.7", "version_normalized": "1.7.7.0", @@ -1290,5 +1248,47 @@ "Apache-2.0" ], "description": "Convert CSS stylesheets between left-to-right and right-to-left." + }, + { + "name": "nmred/kafka-php", + "version": "v0.1.5", + "version_normalized": "0.1.5.0", + "source": { + "type": "git", + "url": "https://github.com/nmred/kafka-php.git", + "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/nmred/kafka-php/zipball/317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb", + "reference": "317ad8c208684db8b9e6d2f5bf7f471e89a8b4eb", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "require-dev": { + "phpunit/phpcov": "*", + "phpunit/phpunit": "~4.0", + "satooshi/php-coveralls": "dev-master" + }, + "time": "2015-12-02 12:06:23", + "type": "library", + "installation-source": "dist", + "autoload": { + "psr-0": { + "Kafka\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "description": "Kafka client for php", + "homepage": "http://www.swanlinux.net", + "keywords": [ + "client", + "kafka" + ] } ] diff --git a/nmred/kafka-php/src/Kafka/Client.php b/nmred/kafka-php/src/Kafka/Client.php index a38e705..06249d9 100644 --- a/nmred/kafka-php/src/Kafka/Client.php +++ b/nmred/kafka-php/src/Kafka/Client.php @@ -77,7 +77,7 @@ * __construct * * @access public - * @return void + * @param ClusterMetaData $metadata */ public function __construct(ClusterMetaData $metadata) { @@ -151,7 +151,7 @@ * get broker server * * @access public - * @return void + * @return array */ public function getBrokers() { @@ -219,8 +219,9 @@ * get broker broker connect * * @param string $host + * @param null $lockKey + * @return array * @access private - * @return void */ public function getStream($host, $lockKey = null) { diff --git a/nmred/kafka-php/src/Kafka/Consumer.php b/nmred/kafka-php/src/Kafka/Consumer.php index 5ff2d43..1b9d42c 100644 --- a/nmred/kafka-php/src/Kafka/Consumer.php +++ b/nmred/kafka-php/src/Kafka/Consumer.php @@ -35,7 +35,7 @@ /** * client * - * @var mixed + * @var Client * @access private */ private $client = null; @@ -73,22 +73,6 @@ private static $instance = null; /** - * broker host list - * - * @var array - * @access private - */ - private $hostList = array(); - - /** - * save broker connection - * - * @var array - * @access private - */ - private $stream = array(); - - /** * maxSize * * @var integer @@ -109,7 +93,9 @@ * set send messages * * @access public - * @return void + * @param $hostList + * @param null $timeout + * @return Consumer */ public static function getInstance($hostList, $timeout = null) { @@ -127,7 +113,8 @@ * __construct * * @access public - * @return void + * @param $hostList + * @param null $timeout */ private function __construct($hostList, $timeout = null) { @@ -156,7 +143,9 @@ * set topic name * * @access public - * @return void + * @param $topicName + * @param null $defaultOffset + * @return Consumer */ public function setTopic($topicName, $defaultOffset = null) { @@ -180,7 +169,10 @@ * set topic partition * * @access public - * @return void + * @param $topicName + * @param int $partitionId + * @param null $offset + * @return Consumer */ public function setPartition($topicName, $partitionId = 0, $offset = null) { @@ -236,7 +228,7 @@ * * @param string $group * @access public - * @return void + * @return Consumer */ public function setGroup($group) { @@ -251,7 +243,7 @@ * fetch message to broker * * @access public - * @return void + * @return \Kafka\Protocol\Fetch\Topic|bool */ public function fetch() { @@ -260,7 +252,6 @@ return false; } - $responseData = array(); $streams = array(); foreach ($data as $host => $requestData) { $connArr = $this->client->getStream($host); @@ -295,7 +286,7 @@ * get client object * * @access public - * @return void + * @return Client */ public function getClient() { @@ -366,7 +357,7 @@ * const EARLIEST_OFFSET = -2; * const DEFAULT_LAST = -2; * const DEFAULT_EARLY = -1; - * @param type $offsetStrategy + * @param int $offsetStrategy */ public function setOffsetStrategy($offsetStrategy) { diff --git a/nmred/kafka-php/src/Kafka/Exception/NotSupported.php b/nmred/kafka-php/src/Kafka/Exception/NotSupported.php index 011129a..cb6d23c 100644 --- a/nmred/kafka-php/src/Kafka/Exception/NotSupported.php +++ b/nmred/kafka-php/src/Kafka/Exception/NotSupported.php @@ -28,6 +28,6 @@ +------------------------------------------------------------------------------ */ -class NotSupported extends \Exception +class NotSupported extends Exception { } diff --git a/nmred/kafka-php/src/Kafka/Exception/Protocol.php b/nmred/kafka-php/src/Kafka/Exception/Protocol.php index 6e213f0..d7fa251 100644 --- a/nmred/kafka-php/src/Kafka/Exception/Protocol.php +++ b/nmred/kafka-php/src/Kafka/Exception/Protocol.php @@ -28,6 +28,6 @@ +------------------------------------------------------------------------------ */ -class Protocol extends \Exception +class Protocol extends Exception { } diff --git a/nmred/kafka-php/src/Kafka/Exception/Socket.php b/nmred/kafka-php/src/Kafka/Exception/Socket.php index aca93e2..59ab5aa 100644 --- a/nmred/kafka-php/src/Kafka/Exception/Socket.php +++ b/nmred/kafka-php/src/Kafka/Exception/Socket.php @@ -28,6 +28,6 @@ +------------------------------------------------------------------------------ */ -class Socket extends \Exception +class Socket extends Exception { } diff --git a/nmred/kafka-php/src/Kafka/Log.php b/nmred/kafka-php/src/Kafka/Log.php index 481bfc3..efa5476 100644 --- a/nmred/kafka-php/src/Kafka/Log.php +++ b/nmred/kafka-php/src/Kafka/Log.php @@ -48,7 +48,7 @@ * setLog * * @access public - * @return void + * @param $log */ public static function setLog($log) { @@ -64,7 +64,8 @@ * log * * @access public - * @return void + * @param $message + * @param int $level */ public static function log($message, $level = LOG_DEBUG) { diff --git a/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php b/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php index 9d2c613..e61e096 100644 --- a/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php +++ b/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php @@ -117,6 +117,11 @@ // }}} // {{{ public function getPartitionState() + /** + * @param string $topicName + * @param int $partitionId + * @return null + */ public function getPartitionState($topicName, $partitionId = 0) { if (!isset( $this->topics[$topicName] ) ) { @@ -164,6 +169,9 @@ // }}} // {{{ private function loadTopicDetail() + /** + * @param array $topics + */ private function loadTopicDetail(array $topics) { if ($this->client === null) { diff --git a/nmred/kafka-php/src/Kafka/Offset.php b/nmred/kafka-php/src/Kafka/Offset.php index 7ad3f9d..744d208 100644 --- a/nmred/kafka-php/src/Kafka/Offset.php +++ b/nmred/kafka-php/src/Kafka/Offset.php @@ -14,8 +14,6 @@ namespace Kafka; -use \Kafka\Log; - /** +------------------------------------------------------------------------------ * Kafka protocol since Kafka v0.8 @@ -119,7 +117,10 @@ * __construct * * @access public - * @return void + * @param $client + * @param $groupId + * @param $topicName + * @param int $partitionId */ public function __construct($client, $groupId, $topicName, $partitionId = 0) { @@ -192,7 +193,7 @@ * if defaultOffset -1 instead of early offset * if defaultOffset -2 instead of last offset * @access public - * @return void + * @return int */ public function getOffset($defaultOffset = self::DEFAULT_LAST) { @@ -224,8 +225,8 @@ if ($result[$topicName][$partitionId]['errCode'] == 3) { switch ($defaultOffset) { case self::DEFAULT_LAST: - return $maxOffset; Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default last.", LOG_INFO); + return $maxOffset; case self::DEFAULT_EARLY: Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default early.", LOG_INFO); return $minOffset; @@ -233,10 +234,6 @@ $this->setOffset($defaultOffset); Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default $defaultOffset.", LOG_INFO); return $defaultOffset; - } - if ($defaultOffset) { - $this->setOffset($defaultOffset); - return $defaultOffset; } } elseif ($result[$topicName][$partitionId]['errCode'] == 0) { $offset = $result[$topicName][$partitionId]['offset']; @@ -261,8 +258,7 @@ /** * get produce server offset * - * @param string $topicName - * @param integer $partitionId + * @param integer $timeLine * @access public * @return int */ diff --git a/nmred/kafka-php/src/Kafka/Produce.php b/nmred/kafka-php/src/Kafka/Produce.php index 2b9e6cd..8e735e8 100644 --- a/nmred/kafka-php/src/Kafka/Produce.php +++ b/nmred/kafka-php/src/Kafka/Produce.php @@ -72,22 +72,6 @@ */ private static $instance = null; - /** - * broker host list - * - * @var array - * @access private - */ - private $hostList = array(); - - /** - * save broker connection - * - * @var array - * @access private - */ - private $stream = array(); - // }}} // {{{ functions // {{{ public function static getInstance() @@ -96,7 +80,10 @@ * set send messages * * @access public - * @return void + * @param $hostList + * @param $timeout + * @param null $kafkaHostList + * @return Produce */ public static function getInstance($hostList, $timeout, $kafkaHostList = null) { @@ -114,7 +101,9 @@ * __construct * * @access public - * @return void + * @param $hostList + * @param null $timeout + * @param null $kafkaHostList */ public function __construct($hostList, $timeout = null, $kafkaHostList = null) { @@ -135,7 +124,10 @@ * set send messages * * @access public - * @return void + * @param $topicName + * @param int $partitionId + * @param array $messages + * @return Produce */ public function setMessages($topicName, $partitionId = 0, $messages = array()) { @@ -166,7 +158,7 @@ * * @param int $ack * @access public - * @return void + * @return Produce */ public function setRequireAck($ack = 0) { @@ -185,7 +177,7 @@ * * @param int $timeout * @access public - * @return void + * @return Produce */ public function setTimeOut($timeout = 100) { @@ -202,7 +194,7 @@ * send message to broker * * @access public - * @return void + * @return bool|array */ public function send() { @@ -243,7 +235,7 @@ * get client object * * @access public - * @return void + * @return Client */ public function getClient() { @@ -268,6 +260,7 @@ * get available partition * * @access public + * @param $topicName * @return array */ public function getAvailablePartitions($topicName) diff --git a/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/nmred/kafka-php/src/Kafka/Protocol/Decoder.php index f1e4b49..261906e 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Decoder.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Decoder.php @@ -34,7 +34,6 @@ /** * decode produce response * - * @param string $data * @access public * @return array */ @@ -66,7 +65,7 @@ for ($j = 0; $j < $partitionCount; $j++) { $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4)); $offset += 4; - $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2)); $offset += 2; $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8)); $offset += 8; @@ -86,9 +85,8 @@ /** * decode fetch response * - * @param string $data * @access public - * @return Iterator + * @return \Iterator */ public function fetchResponse() { @@ -101,13 +99,11 @@ /** * decode metadata response * - * @param string $data * @access public * @return array */ public function metadataResponse() { - $result = array(); $broker = array(); $topic = array(); $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true)); @@ -202,7 +198,6 @@ /** * decode offset response * - * @param string $data * @access public * @return array */ @@ -257,7 +252,6 @@ /** * decode commit offset response * - * @param string $data * @access public * @return array */ @@ -303,7 +297,6 @@ /** * decode fetch offset response * - * @param string $data * @access public * @return array */ @@ -343,7 +336,7 @@ $metaData = substr($data, $offset, $metaLen); $offset += $metaLen; } - $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2)); + $errCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2)); $offset += 2; $result[$topicName][$partitionId[1]] = array( 'offset' => $partitionOffset, @@ -368,7 +361,6 @@ */ public static function getError($errCode) { - $error = ''; switch($errCode) { case 0: $error = 'No error--it worked!'; diff --git a/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/nmred/kafka-php/src/Kafka/Protocol/Encoder.php index 7d36e10..8f5e27b 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Encoder.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Encoder.php @@ -35,9 +35,9 @@ * produce request * * @param array $payloads - * @static + * @param int $compression + * @return int * @access public - * @return void */ public function produceRequest($payloads, $compression = self::COMPRESSION_NONE) { @@ -222,10 +222,11 @@ * encode pack string type * * @param string $string - * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order. + * @param int $compression + * @return string * @static * @access public - * @return string */ public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE) { @@ -252,9 +253,10 @@ * * @param array $array * @param Callable $func + * @param null $options + * @return string * @static * @access public - * @return string */ public static function encodeArray(array $array, $func, $options = null) { @@ -285,9 +287,10 @@ * in the protocol. * * @param array $messages + * @param int $compression + * @return string * @static * @access public - * @return string */ public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE) { @@ -316,7 +319,7 @@ * @param integer $apiKey * @static * @access public - * @return void + * @return string */ public static function requestHeader($clientId, $correlationId, $apiKey) { @@ -338,9 +341,10 @@ * encode signal message * * @param string $message + * @param int $compression + * @return string * @static * @access protected - * @return string */ protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE) { @@ -368,10 +372,12 @@ /** * encode signal part * - * @param partions + * @param $values + * @param $compression + * @return string + * @internal param $partions * @static * @access protected - * @return string */ protected static function _encodeProcudePartion($values, $compression) { @@ -395,10 +401,12 @@ /** * encode signal topic * - * @param partions + * @param $values + * @param $compression + * @return string + * @internal param $partions * @static * @access protected - * @return string */ protected static function _encodeProcudeTopic($values, $compression) { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php index 8424cf7..9fadb56 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php @@ -46,7 +46,7 @@ * __construct * * @access public - * @return void + * @param $client */ public function __construct($client) { @@ -60,7 +60,7 @@ * set consumer group * * @access public - * @return void + * @param $group */ public function setGroup($group) { @@ -111,7 +111,7 @@ $topicName = $partition->getTopicName(); $offset = $partition->getMessageOffset(); $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId); - $offsetObject->setOffset($offset); + $offsetObject->setOffset($offset + 1); } // }}} diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php index acf0223..49560b9 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php @@ -11,13 +11,18 @@ protected $offsetStrategy; - + /** + * Consumer constructor. + * @param \Kafka\Consumer $consumer + */ public function __construct(\Kafka\Consumer $consumer) { $this->consumer = $consumer; } - + /** + * @param \Kafka\Protocol\Fetch\Partition $partition + */ public function onPartitionEof($partition) { $partitionId = $partition->key(); @@ -27,11 +32,17 @@ $this->consumer->setPartition($topicName, $partitionId, ($offset +1)); } + /** + * @param string $streamKey + */ public function onStreamEof($streamKey) { } + /** + * @param string $topicName + */ public function onTopicEof($topicName) { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php index bba38dd..b82ca3c 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php @@ -46,7 +46,7 @@ * __construct * * @access public - * @return void + * @param $client */ public function __construct($client) { @@ -60,7 +60,7 @@ * set streams * * @access public - * @return void + * @param $streams */ public function setStreams($streams) { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php index 4ec2392..33ba7d2 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php @@ -97,7 +97,7 @@ public static function onStreamEof($streamKey) { if (empty(self::$helpers)) { - return false; + return; } foreach (self::$helpers as $key => $helper) { @@ -121,7 +121,7 @@ public static function onTopicEof($topicName) { if (empty(self::$helpers)) { - return false; + return; } foreach (self::$helpers as $key => $helper) { @@ -145,7 +145,7 @@ public static function onPartitionEof($partition) { if (empty(self::$helpers)) { - return false; + return; } foreach (self::$helpers as $key => $helper) { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php index 42d7da1..bedb165 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php @@ -33,22 +33,6 @@ // {{{ members /** - * init read bytes - * - * @var float - * @access private - */ - private $initOffset = 0; - - /** - * validByteCount - * - * @var float - * @access private - */ - private $validByteCount = 0; - - /** * crc32 code * * @var float @@ -99,7 +83,6 @@ * * @param string(raw) $msg * @access public - * @return void */ public function __construct($msg) { @@ -163,7 +146,7 @@ * __toString * * @access public - * @return void + * @return string (raw) */ public function __toString() { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php index 50413b6..13c7aac 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php @@ -87,6 +87,11 @@ */ private $context = array(); + /** + * @var Message + */ + private $current = null; + // }}} // {{{ functions // {{{ public function __construct() @@ -94,10 +99,9 @@ /** * __construct * - * @param \Kafka\Socket $stream - * @param int $initOffset + * @param Partition $partition + * @param array $context * @access public - * @return void */ public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array()) { @@ -115,7 +119,7 @@ * current * * @access public - * @return void + * @return Message */ public function current() { @@ -129,7 +133,7 @@ * key * * @access public - * @return void + * @return float */ public function key() { @@ -178,7 +182,7 @@ * implements Iterator function * * @access public - * @return integer + * @return void */ public function next() { @@ -214,7 +218,7 @@ * load next message * * @access public - * @return void + * @return bool */ public function loadNextMessage() { @@ -257,7 +261,7 @@ /** * current message offset in producer * - * @return void + * @return float */ public function messageOffset() { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php index 9f8578d..7baa3bc 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php @@ -106,7 +106,7 @@ private $valid = false; /** - * cuerrent topic name + * current topic name * * @var string * @access private @@ -128,9 +128,8 @@ * __construct * * @param \Kafka\Protocol\Fetch\Topic $topic - * @param int $initOffset + * @param array $context * @access public - * @return void */ public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array()) { @@ -147,7 +146,7 @@ * current * * @access public - * @return void + * @return mixed */ public function current() { @@ -161,7 +160,7 @@ * key * * @access public - * @return void + * @return string */ public function key() { @@ -203,7 +202,7 @@ * implements Iterator function * * @access public - * @return integer + * @return void */ public function next() { @@ -231,7 +230,7 @@ * get partition errcode * * @access public - * @return void + * @return float */ public function getErrCode() { @@ -245,7 +244,7 @@ * get partition high offset * * @access public - * @return void + * @return float */ public function getHighOffset() { @@ -259,7 +258,7 @@ * get partition topic name * * @access public - * @return void + * @return string */ public function getTopicName() { @@ -297,7 +296,7 @@ $data = Decoder::unpack(Decoder::BIT_B32, $data); $count = array_shift($data); if ($count <= 0) { - throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count'); + throw new \Kafka\Exception\OutOfRange($count . ' is not a valid partition count'); } return $count; @@ -310,7 +309,7 @@ * load next partition * * @access public - * @return void + * @return bool */ public function loadNextPartition() { @@ -349,7 +348,7 @@ /** * set messageSet fetch offset current * - * @param intger $offset + * @param int $offset * @return void */ public function setMessageOffset($offset) diff --git a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php index 500e6b1..64f60f2 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php @@ -127,10 +127,9 @@ /** * __construct * - * @param \Kafka\Socket $stream - * @param int $initOffset + * @param \Kafka\Socket|array $streams + * @param array $context * @access public - * @return void */ public function __construct($streams, $context = array()) { @@ -168,7 +167,7 @@ * current * * @access public - * @return void + * @return mixed */ public function current() { @@ -182,7 +181,7 @@ * key * * @access public - * @return void + * @return string */ public function key() { @@ -196,7 +195,7 @@ * implements Iterator function * * @access public - * @return integer + * @return void */ public function rewind() { @@ -224,7 +223,7 @@ * implements Iterator function * * @access public - * @return integer + * @return void */ public function next() { @@ -281,7 +280,7 @@ * load next topic * * @access public - * @return void + * @return bool */ public function loadNextTopic() { diff --git a/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/nmred/kafka-php/src/Kafka/Protocol/Protocol.php index a31067b..f9fed57 100644 --- a/nmred/kafka-php/src/Kafka/Protocol/Protocol.php +++ b/nmred/kafka-php/src/Kafka/Protocol/Protocol.php @@ -81,6 +81,7 @@ const BIT_B64 = 'N2'; const BIT_B32 = 'N'; const BIT_B16 = 'n'; + const BIT_B16_SIGNED = 's'; const BIT_B8 = 'C'; // }}} @@ -94,6 +95,17 @@ */ protected $stream = null; + /** + * isBigEndianSystem + * + * gets set to true if the computer this code is running is little endian, + * gets set to false if the computer this code is running on is big endian. + * + * @var null|bool + * @access private + */ + private static $isLittleEndianSystem = null; + // }}} // {{{ functions // {{{ public function __construct() @@ -103,7 +115,6 @@ * * @param \Kafka\Socket $stream * @access public - * @return void */ public function __construct(\Kafka\Socket $stream) { @@ -144,7 +155,9 @@ * * @static * @access public - * @return integer + * @param $type + * @param $bytes + * @return int */ public static function unpack($type, $bytes) { @@ -153,6 +166,17 @@ $set = unpack($type, $bytes); $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF); return $original; + } elseif ($type == self::BIT_B16_SIGNED) { + // According to PHP docs: 's' = signed short (always 16 bit, machine byte order) + // So lets unpack it.. + $set = unpack($type, $bytes); + + // But if our system is little endian + if (self::isSystemLittleEndian()) { + // We need to flip the endianess because coming from kafka it is big endian + $set = self::convertSignedShortFromLittleEndianToBigEndian($set); + } + return $set; } else { return unpack($type, $bytes); } @@ -166,7 +190,9 @@ * * @static * @access public - * @return integer + * @param $type + * @param $data + * @return int */ public static function pack($type, $data) { @@ -215,6 +241,9 @@ case self::BIT_B16: $len = 2; break; + case self::BIT_B16_SIGNED: + $len = 2; + break; case self::BIT_B8: $len = 1; break; @@ -226,5 +255,63 @@ } // }}} + // {{{ public static function isSystemLittleEndian() + + /** + * Determines if the computer currently running this code is big endian or little endian. + * + * @access public + * @return bool - false if big endian, true if little endian + */ + public static function isSystemLittleEndian() + { + // If we don't know if our system is big endian or not yet... + if (is_null(self::$isLittleEndianSystem)) { + // Lets find out + list ($endiantest) = array_values(unpack('L1L', pack('V', 1))); + if ($endiantest != 1) { + // This is a big endian system + self::$isLittleEndianSystem = false; + } else { + // This is a little endian system + self::$isLittleEndianSystem = true; + } + } + + return self::$isLittleEndianSystem; + } + + // }}} + // {{{ public static function convertSignedShortFromLittleEndianToBigEndian() + + /** + * Converts a signed short (16 bits) from little endian to big endian. + * + * @param int[] $bits + * @access public + * @return array + */ + public static function convertSignedShortFromLittleEndianToBigEndian($bits) + { + foreach ($bits as $index => $bit) { + + // get LSB + $lsb = $bit & 0xff; + + // get MSB + $msb = $bit >> 8 & 0xff; + + // swap bytes + $bit = $lsb <<8 | $msb; + + if ($bit >= 32768) { + $bit -= 65536; + } + $bits[$index] = $bit; + } + return $bits; + } + + // }}} // }}} } diff --git a/nmred/kafka-php/src/Kafka/Socket.php b/nmred/kafka-php/src/Kafka/Socket.php index be7321f..d0902c2 100644 --- a/nmred/kafka-php/src/Kafka/Socket.php +++ b/nmred/kafka-php/src/Kafka/Socket.php @@ -32,6 +32,13 @@ const READ_MAX_LEN = 5242880; // read socket max length 5MB + /** + * max write socket buffer + * fixed:send of 8192 bytes failed with errno=11 Resource temporarily + * unavailable error info + */ + const MAX_WRITE_BUFFER = 4096; + // }}} // {{{ members @@ -99,7 +106,12 @@ * __construct * * @access public - * @return void + * @param $host + * @param $port + * @param int $recvTimeoutSec + * @param int $recvTimeoutUsec + * @param int $sendTimeoutSec + * @param int $sendTimeoutUsec */ public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000) { @@ -153,7 +165,8 @@ * * @static * @access public - * @return void + * @param $stream + * @return Socket */ public static function createFromStream($stream) { @@ -189,7 +202,7 @@ public function connect() { if (is_resource($this->stream)) { - return false; + return; } if (empty($this->host)) { @@ -246,7 +259,7 @@ * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len * * @return string Binary data - * @throws Kafka_Exception_Socket + * @throws \Kafka\Exception\SocketEOF */ public function read($len, $verifyExactLength = false) { @@ -311,7 +324,7 @@ * @param string $buf The data to write * * @return integer - * @throws Kafka_Exception_Socket + * @throws \Kafka\Exception\SocketEOF */ public function write($buf) { @@ -326,8 +339,13 @@ // wait for stream to become available for writing $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec); if ($writable > 0) { - // write remaining buffer bytes to stream - $wrote = fwrite($this->stream, substr($buf, $written)); + if ($buflen - $written > self::MAX_WRITE_BUFFER) { + // write max buffer size + $wrote = fwrite($this->stream, substr($buf, $written, self::MAX_WRITE_BUFFER)); + } else { + // write remaining buffer bytes to stream + $wrote = fwrite($this->stream, substr($buf, $written)); + } if ($wrote === -1 || $wrote === false) { throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes'); } diff --git a/nmred/kafka-php/src/Kafka/ZooKeeper.php b/nmred/kafka-php/src/Kafka/ZooKeeper.php index f48b5cb..d141dde 100644 --- a/nmred/kafka-php/src/Kafka/ZooKeeper.php +++ b/nmred/kafka-php/src/Kafka/ZooKeeper.php @@ -84,7 +84,8 @@ * __construct * * @access public - * @return void + * @param $hostList + * @param null $timeout */ public function __construct($hostList, $timeout = null) { @@ -129,7 +130,7 @@ * * @param integer $brokerId * @access public - * @return void + * @return string|bool */ public function getBrokerDetail($brokerId) { @@ -155,7 +156,7 @@ * * @param string $topicName * @access public - * @return void + * @return string|bool */ public function getTopicDetail($topicName) { @@ -181,7 +182,7 @@ * @param string $topicName * @param integer $partitionId * @access public - * @return void + * @return string|bool */ public function getPartitionState($topicName, $partitionId = 0) { @@ -204,15 +205,15 @@ /** * register consumer * - * @param string $topicName - * @param integer $partitionId + * @param $groupId + * @param integer $consumerId + * @param array $topics * @access public - * @return void */ public function registerConsumer($groupId, $consumerId, $topics = array()) { if (empty($topics)) { - return true; + return; } $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId); @@ -241,7 +242,7 @@ * * @param string $groupId * @access public - * @return void + * @return array */ public function listConsumer($groupId) { -- To view, visit https://gerrit.wikimedia.org/r/260156 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ibb9e988b9e81445641c96075a064090d5ed146ca Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/vendor Gerrit-Branch: master Gerrit-Owner: Reedy <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
