http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp deleted file mode 100755 index fb2d2a6..0000000 --- a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp +++ /dev/null @@ -1,672 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#include "CommandCustomHeader.h" - -#include <stdlib.h> -#include <unistd.h> -#include <arpa/inet.h> -#include <sstream> -#include <string> -#include <cstdlib> -#include "RemotingCommand.h" -#include "MQProtos.h" -#include "KPRUtil.h" -#include "UtilAll.h" - -#include "json/json.h" - -namespace rmq -{ - - -CommandCustomHeader* CommandCustomHeader::decode(int code, Json::Value& data, bool isResponseType) -{ - CommandCustomHeader* pCustomHeader = NULL; - - try - { - if (isResponseType) - { - switch (code) - { - case SEND_MESSAGE_VALUE: - case SEND_MESSAGE_V2_VALUE: - pCustomHeader = SendMessageResponseHeader::decode(data); - break; - case PULL_MESSAGE_VALUE: - pCustomHeader = PullMessageResponseHeader::decode(data); - break; - case QUERY_CONSUMER_OFFSET_VALUE: - pCustomHeader = QueryConsumerOffsetResponseHeader::decode(data); - break; - case SEARCH_OFFSET_BY_TIMESTAMP_VALUE: - pCustomHeader = SearchOffsetResponseHeader::decode(data); - break; - case GET_MAX_OFFSET_VALUE: - pCustomHeader = GetMaxOffsetResponseHeader::decode(data); - break; - case GET_MIN_OFFSET_VALUE: - pCustomHeader = GetMinOffsetResponseHeader::decode(data); - break; - case GET_EARLIEST_MSG_STORETIME_VALUE: - pCustomHeader = GetEarliestMsgStoretimeResponseHeader::decode(data); - break; - case QUERY_MESSAGE_VALUE: - pCustomHeader = QueryMessageResponseHeader::decode(data); - break; - case GET_KV_CONFIG_VALUE: - pCustomHeader = GetKVConfigResponseHeader::decode(data); - break; - - default: - break; - } - } - else - { - switch (code) - { - case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: - pCustomHeader = NotifyConsumerIdsChangedRequestHeader::decode(data); - break; - case GET_CONSUMER_RUNNING_INFO_VALUE: - pCustomHeader = GetConsumerRunningInfoRequestHeader::decode(data); - break; - default: - break; - } - } - } - catch(std::exception& e) - { - if (pCustomHeader != NULL) - { - delete pCustomHeader; - pCustomHeader = NULL; - } - RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s, %s", - code, 
isResponseType, UtilAll::toString(data).c_str(), e.what()); - } - catch(...) - { - if (pCustomHeader != NULL) - { - delete pCustomHeader; - pCustomHeader = NULL; - } - RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s", - code, isResponseType, UtilAll::toString(data).c_str()); - } - - return pCustomHeader; -} - - -//////////////////////////////////////////////////////////////////////////////// -//GET_ROUTEINTO_BY_TOPIC_VALUE -//////////////////////////////////////////////////////////////////////////////// -void GetRouteInfoRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"" - << "}"; - - outData = ss.str(); -} - - -//////////////////////////////////////////////////////////////////////////////// -// UPDATE_AND_CREATE_TOPIC_VALUE -//////////////////////////////////////////////////////////////////////////////// -void CreateTopicRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"defaultTopic\":\"" << defaultTopic << "\"," - << "\"readQueueNums\":\"" << readQueueNums << "\"," - << "\"writeQueueNums\":\"" << writeQueueNums << "\"," - << "\"perm\":\"" << perm << "\"," - << "\"topicFilterType\":\"" << topicFilterType << "\"," - << "\"topicSysFlag\":\"" << topicFilterType << "\"," - << "\"order\":\"" << topicFilterType << "\"" - << "}"; - - outData = ss.str(); -} - - -//////////////////////////////////////////////////////////////////////////////// -// SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE -//////////////////////////////////////////////////////////////////////////////// -void SendMessageRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"producerGroup\":\"" << producerGroup << "\"," - << "\"topic\":\"" << topic << "\"," - << "\"defaultTopic\":\"" << defaultTopic << "\"," - << "\"defaultTopicQueueNums\":" << defaultTopicQueueNums << "," - << "\"queueId\":" << queueId << "," - 
<< "\"sysFlag\":" << sysFlag << "," - << "\"bornTimestamp\":" << bornTimestamp << "," - << "\"flag\":" << flag << "," - << "\"properties\":\"" << properties << "\"," - << "\"reconsumeTimes\":" << reconsumeTimes - << "}"; - - outData = ss.str(); -} - -void SendMessageRequestHeaderV2::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"a\":\"" << a << "\"," - << "\"b\":\"" << b << "\"," - << "\"c\":\"" << c << "\"," - << "\"d\":\"" << d << "\"," - << "\"e\":\"" << e << "\"," - << "\"f\":\"" << f << "\"," - << "\"g\":\"" << g << "\"," - << "\"h\":\"" << h << "\"," - << "\"i\":\"" << i << "\"," - << "\"j\":\"" << j << "\"" - << "}"; - - outData = ss.str(); -} - -SendMessageRequestHeader* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV1( - const SendMessageRequestHeaderV2* v2) -{ - SendMessageRequestHeader* v1 = new SendMessageRequestHeader(); - v1->producerGroup = v2->a; - v1->topic = v2->b; - v1->defaultTopic = v2->c; - v1->defaultTopicQueueNums = v2->d; - v1->queueId = v2->e; - v1->sysFlag = v2->f; - v1->bornTimestamp = v2->g; - v1->flag = v2->h; - v1->properties = v2->i; - v1->reconsumeTimes = v2->j; - - return v1; -} - -SendMessageRequestHeaderV2* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2( - const SendMessageRequestHeader* v1) -{ - SendMessageRequestHeaderV2* v2 = new SendMessageRequestHeaderV2(); - v2->a = v1->producerGroup; - v2->b = v1->topic; - v2->c = v1->defaultTopic; - v2->d = v1->defaultTopicQueueNums; - v2->e = v1->queueId; - v2->f = v1->sysFlag; - v2->g = v1->bornTimestamp; - v2->h = v1->flag; - v2->i = v1->properties; - v2->j = v1->reconsumeTimes; - - return v2; -} - -void SendMessageResponseHeader::encode(std::string& outData) -{ -} - -CommandCustomHeader* SendMessageResponseHeader::decode(Json::Value& data) -{ - std::string msgId = data["msgId"].asString(); - int queueId = atoi(data["queueId"].asCString()); - long long queueOffset = KPRUtil::str2ll(data["queueOffset"].asCString()); - - 
SendMessageResponseHeader* h = new SendMessageResponseHeader(); - - h->msgId = msgId; - h->queueId = queueId; - h->queueOffset = queueOffset; - - return h; -} - - -//////////////////////////////////////////////////////////////////////////////// -// PULL_MESSAGE_VALUE -//////////////////////////////////////////////////////////////////////////////// -void PullMessageRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"," - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"," - << "\"queueOffset\":\"" << queueOffset << "\"," - << "\"maxMsgNums\":\"" << maxMsgNums << "\"," - << "\"sysFlag\":\"" << sysFlag << "\"," - << "\"commitOffset\":\"" << commitOffset << "\"," - << "\"suspendTimeoutMillis\":\"" << suspendTimeoutMillis << "\"," - << "\"subscription\":\"" << subscription << "\"," - << "\"subVersion\":\"" << subVersion << "\"" - << "}"; - - outData = ss.str(); -} - -void PullMessageResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"suggestWhichBrokerId\":\"" << suggestWhichBrokerId << "\"," - << "\"nextBeginOffset\":\"" << nextBeginOffset << "\"," - << "\"minOffset\":\"" << minOffset << "\"," - << "\"maxOffset\":\"" << maxOffset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* PullMessageResponseHeader::decode(Json::Value& data) -{ - long long suggestWhichBrokerId = KPRUtil::str2ll(data["suggestWhichBrokerId"].asCString()); - long long nextBeginOffset = KPRUtil::str2ll(data["nextBeginOffset"].asCString()); - long long minOffset = KPRUtil::str2ll(data["minOffset"].asCString()); - long long maxOffset = KPRUtil::str2ll(data["maxOffset"].asCString()); - - PullMessageResponseHeader* h = new PullMessageResponseHeader(); - h->suggestWhichBrokerId = suggestWhichBrokerId; - h->nextBeginOffset = nextBeginOffset; - h->minOffset = minOffset; - h->maxOffset = maxOffset; - - return h; -} - - - 
-//////////////////////////////////////////////////////////////////////////////// -// GET_CONSUMER_LIST_BY_GROUP_VALUE -//////////////////////////////////////////////////////////////////////////////// -void GetConsumerListByGroupRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"" - << "}"; - - outData = ss.str(); -} - - -//////////////////////////////////////////////////////////////////////////////// -// CONSUMER_SEND_MSG_BACK_VALUE -//////////////////////////////////////////////////////////////////////////////// -void ConsumerSendMsgBackRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - - ss << "{" - << "\"offset\":\"" << offset << "\"," - << "\"group\":\"" << group << "\"," - << "\"delayLevel\":\"" << delayLevel << "\"" - << "}"; - - outData = ss.str(); -} - - -//////////////////////////////////////////////////////////////////////////////// -// QUERY_CONSUMER_OFFSET_VALUE -//////////////////////////////////////////////////////////////////////////////// -void QueryConsumerOffsetRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"," - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"" - << "}"; - outData = ss.str(); -} - -void QueryConsumerOffsetResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"offset\":\"" << offset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* QueryConsumerOffsetResponseHeader::decode(Json::Value& data) -{ - long long offset = -1; - - if (data.isMember("offset")) - { - offset = KPRUtil::str2ll(data["offset"].asCString()); - } - - QueryConsumerOffsetResponseHeader* h = new QueryConsumerOffsetResponseHeader(); - h->offset = offset; - - return h; -} - - -//////////////////////////////////////////////////////////////////////////////// -// UPDATE_CONSUMER_OFFSET_VALUE 
-//////////////////////////////////////////////////////////////////////////////// -void UpdateConsumerOffsetRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"," - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"," - << "\"commitOffset\":\"" << commitOffset << "\"" - << "}"; - outData = ss.str(); -} - - -//////////////////////////////////////////////////////////////////////////////// -// UNREGISTER_CLIENT_VALUE -//////////////////////////////////////////////////////////////////////////////// -void UnregisterClientRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"producerGroup\":\"" << producerGroup << "\"," - << "\"consumerGroup\":\"" << consumerGroup << "\"," - << "\"clientID\":\"" << clientID << "\"" - << "}"; - outData = ss.str(); -} - - -/////////////////////////////////////////////////////////////////////// -// VIEW_MESSAGE_BY_ID_VALUE -/////////////////////////////////////////////////////////////////////// -void ViewMessageRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"offset\":" << offset - << "}"; - outData = ss.str(); -} - - -/////////////////////////////////////////////////////////////////////// -// SEARCH_OFFSET_BY_TIMESTAMP_VALUE -/////////////////////////////////////////////////////////////////////// -void SearchOffsetRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"," - << "\"timestamp\":\"" << timestamp << "\"" - << "}"; - outData = ss.str(); -} - -void SearchOffsetResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"offset\":\"" << offset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* SearchOffsetResponseHeader::decode(Json::Value& data) -{ - long long offset = 
KPRUtil::str2ll(data["offset"].asCString()); - - SearchOffsetResponseHeader* h = new SearchOffsetResponseHeader(); - h->offset = offset; - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// GET_MAX_OFFSET_VALUE -/////////////////////////////////////////////////////////////////////// -void GetMaxOffsetRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"" - << "}"; - outData = ss.str(); -} - -void GetMaxOffsetResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"offset\":\"" << offset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* GetMaxOffsetResponseHeader::decode(Json::Value& data) -{ - long long offset = KPRUtil::str2ll(data["offset"].asCString()); - - GetMaxOffsetResponseHeader* h = new GetMaxOffsetResponseHeader(); - h->offset = offset; - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// GET_MIN_OFFSET_VALUE -/////////////////////////////////////////////////////////////////////// -void GetMinOffsetRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"" - << "}"; - outData = ss.str(); -} - -void GetMinOffsetResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"offset\":\"" << offset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* GetMinOffsetResponseHeader::decode(Json::Value& data) -{ - long long offset = KPRUtil::str2ll(data["offset"].asCString()); - - GetMinOffsetResponseHeader* h = new GetMinOffsetResponseHeader(); - h->offset = offset; - - return h; -} - - - -/////////////////////////////////////////////////////////////////////// -// GET_EARLIEST_MSG_STORETIME_VALUE -/////////////////////////////////////////////////////////////////////// -void 
GetEarliestMsgStoretimeRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"queueId\":\"" << queueId << "\"" - << "}"; - outData = ss.str(); -} - -void GetEarliestMsgStoretimeResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"timestamp\":\"" << timestamp << "\"" - << "}"; - outData = ss.str(); -} - - -CommandCustomHeader* GetEarliestMsgStoretimeResponseHeader::decode(Json::Value& data) -{ - long long timestamp = KPRUtil::str2ll(data["timestamp"].asCString()); - - GetEarliestMsgStoretimeResponseHeader* h = new GetEarliestMsgStoretimeResponseHeader(); - h->timestamp = timestamp; - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// QUERY_MESSAGE_VALUE -/////////////////////////////////////////////////////////////////////// -void QueryMessageRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"topic\":\"" << topic << "\"," - << "\"key\":\"" << key << "\"," - << "\"maxNum\":\"" << maxNum << "\"," - << "\"beginTimestamp\":\"" << beginTimestamp << "\"," - << "\"endTimestamp\":\"" << endTimestamp << "\"" - << "}"; - outData = ss.str(); -} - -void QueryMessageResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"indexLastUpdateTimestamp\":\"" << indexLastUpdateTimestamp << "\"," - << "\"indexLastUpdatePhyoffset\":\"" << indexLastUpdatePhyoffset << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* QueryMessageResponseHeader::decode(Json::Value& data) -{ - long long indexLastUpdateTimestamp = KPRUtil::str2ll(data["indexLastUpdateTimestamp"].asCString()); - long long indexLastUpdatePhyoffset = KPRUtil::str2ll(data["indexLastUpdatePhyoffset"].asCString()); - - QueryMessageResponseHeader* h = new QueryMessageResponseHeader(); - h->indexLastUpdateTimestamp = indexLastUpdateTimestamp; - h->indexLastUpdatePhyoffset = 
indexLastUpdatePhyoffset; - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// GET_KV_CONFIG_VALUE -/////////////////////////////////////////////////////////////////////// -void GetKVConfigRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"namespace\":\"" << namespace_ << "\"," - << "\"key\":\"" << key << "\"" - << "}"; - outData = ss.str(); -} - -void GetKVConfigResponseHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"value\":\"" << value << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* GetKVConfigResponseHeader::decode(Json::Value& data) -{ - GetKVConfigResponseHeader* h = new GetKVConfigResponseHeader(); - h->value = data["value"].asString(); - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// NOTIFY_CONSUMER_IDS_CHANGED_VALUE -/////////////////////////////////////////////////////////////////////// -void NotifyConsumerIdsChangedRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"" - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* NotifyConsumerIdsChangedRequestHeader::decode(Json::Value& data) -{ - NotifyConsumerIdsChangedRequestHeader* h = new NotifyConsumerIdsChangedRequestHeader(); - h->consumerGroup = data["consumerGroup"].asString(); - - return h; -} - - -/////////////////////////////////////////////////////////////////////// -// GET_CONSUMER_RUNNING_INFO_VALUE -/////////////////////////////////////////////////////////////////////// -void GetConsumerRunningInfoRequestHeader::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "\"consumerGroup\":\"" << consumerGroup << "\"," - << "\"clientId\":\"" << clientId << "\"," - << "\"jstackEnable\":\"" << jstackEnable << "\"," - << "}"; - outData = ss.str(); -} - -CommandCustomHeader* 
GetConsumerRunningInfoRequestHeader::decode(Json::Value& data) -{ - GetConsumerRunningInfoRequestHeader* h = new GetConsumerRunningInfoRequestHeader(); - h->consumerGroup = data["consumerGroup"].asString(); - h->clientId = data["clientId"].asString(); - h->jstackEnable = false;//not support - - return h; -} - - -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h deleted file mode 100755 index 93f811a..0000000 --- a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h +++ /dev/null @@ -1,604 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __COMMANDCUSTOMHEADER_H__ -#define __COMMANDCUSTOMHEADER_H__ - -#include <string> -#include <json/json.h> - -namespace rmq -{ - /** - * RemotingCommand custom header - * - */ - class CommandCustomHeader - { - public : - virtual ~CommandCustomHeader() {} - virtual void encode(std::string& outData) = 0; - static CommandCustomHeader* decode(int code, Json::Value& data, bool isResponseType); - }; - - /////////////////////////////////////////////////////////////////////// - // GET_ROUTEINTO_BY_TOPIC_VALUE - /////////////////////////////////////////////////////////////////////// - class GetRouteInfoRequestHeader : public CommandCustomHeader - { - public: - GetRouteInfoRequestHeader() - { - }; - ~GetRouteInfoRequestHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - }; - - /////////////////////////////////////////////////////////////////////// - // UPDATE_AND_CREATE_TOPIC_VALUE - /////////////////////////////////////////////////////////////////////// - class CreateTopicRequestHeader : public CommandCustomHeader - { - public: - CreateTopicRequestHeader() - { - readQueueNums = 0; - writeQueueNums = 0; - perm = 0; - topicSysFlag = 0; - order = false; - }; - ~CreateTopicRequestHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - std::string defaultTopic; - int readQueueNums; - int writeQueueNums; - int perm; - std::string topicFilterType; - int topicSysFlag; - bool order; - }; - - /////////////////////////////////////////////////////////////////////// - // SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE - /////////////////////////////////////////////////////////////////////// - class SendMessageRequestHeader: public CommandCustomHeader - { - public: - SendMessageRequestHeader() - : defaultTopicQueueNums(0),queueId(0),sysFlag(0), - bornTimestamp(0),flag(0),reconsumeTimes(0) - { 
- }; - ~SendMessageRequestHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string producerGroup; - std::string topic; - std::string defaultTopic; - int defaultTopicQueueNums; - int queueId; - int sysFlag; - long long bornTimestamp; - int flag; - std::string properties; - int reconsumeTimes; - }; - - class SendMessageRequestHeaderV2: public CommandCustomHeader - { - public: - SendMessageRequestHeaderV2() - : d(0),e(0),f(0), - g(0),h(0),j(0) - { - }; - ~SendMessageRequestHeaderV2() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - static SendMessageRequestHeader* createSendMessageRequestHeaderV1(const SendMessageRequestHeaderV2* v2); - static SendMessageRequestHeaderV2* createSendMessageRequestHeaderV2(const SendMessageRequestHeader* v1); - public: - std::string a; //producerGroup - std::string b; //topic - std::string c; //defaultTopic - int d; //defaultTopicQueueNums - int e; //queueId - int f; //sysFlag - long long g; //bornTimestamp - int h; //flag - std::string i; //properties - int j; //reconsumeTimes - }; - - class SendMessageResponseHeader: public CommandCustomHeader - { - public: - SendMessageResponseHeader() - { - queueId = 0; - queueOffset = 0; - }; - ~SendMessageResponseHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string msgId; - int queueId; - long long queueOffset; - }; - - - /////////////////////////////////////////////////////////////////////// - // PULL_MESSAGE_VALUE - /////////////////////////////////////////////////////////////////////// - class PullMessageRequestHeader: public CommandCustomHeader - { - public: - PullMessageRequestHeader() - { - queueId = 0; - queueOffset = 0; - maxMsgNums = 0; - sysFlag = 0; - commitOffset = 0; - suspendTimeoutMillis = 0; - subVersion = 0; - }; - ~PullMessageRequestHeader() {}; - 
virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - std::string topic; - int queueId; - long long queueOffset; - int maxMsgNums; - int sysFlag; - long long commitOffset; - long long suspendTimeoutMillis; - std::string subscription; - long long subVersion; - }; - - class PullMessageResponseHeader: public CommandCustomHeader - { - public: - PullMessageResponseHeader() - { - suggestWhichBrokerId = 0; - nextBeginOffset = 0; - minOffset = 0; - maxOffset = 0; - }; - ~PullMessageResponseHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long suggestWhichBrokerId; - long long nextBeginOffset; - long long minOffset; - long long maxOffset; - }; - - /////////////////////////////////////////////////////////////////////// - // GET_CONSUMER_LIST_BY_GROUP_VALUE - /////////////////////////////////////////////////////////////////////// - class GetConsumerListByGroupRequestHeader : public CommandCustomHeader - { - public: - GetConsumerListByGroupRequestHeader() {}; - ~GetConsumerListByGroupRequestHeader() {}; - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - }; - - - /////////////////////////////////////////////////////////////////////// - // CONSUMER_SEND_MSG_BACK_VALUE - /////////////////////////////////////////////////////////////////////// - class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader - { - public: - ConsumerSendMsgBackRequestHeader() - { - offset = 0; - delayLevel = 0; - }; - ~ConsumerSendMsgBackRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - std::string group; - int delayLevel; - }; - - - /////////////////////////////////////////////////////////////////////// - // 
QUERY_CONSUMER_OFFSET_VALUE - /////////////////////////////////////////////////////////////////////// - class QueryConsumerOffsetRequestHeader : public CommandCustomHeader - { - public: - QueryConsumerOffsetRequestHeader() - { - queueId = 0; - }; - ~QueryConsumerOffsetRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - std::string topic; - int queueId; - }; - - class QueryConsumerOffsetResponseHeader : public CommandCustomHeader - { - public: - QueryConsumerOffsetResponseHeader() - { - offset = 0; - }; - ~QueryConsumerOffsetResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - }; - - /////////////////////////////////////////////////////////////////////// - // UPDATE_CONSUMER_OFFSET_VALUE - /////////////////////////////////////////////////////////////////////// - class UpdateConsumerOffsetRequestHeader : public CommandCustomHeader - { - public: - UpdateConsumerOffsetRequestHeader() - { - queueId = 0; - commitOffset = 0; - }; - ~UpdateConsumerOffsetRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - std::string topic; - int queueId; - long long commitOffset; - }; - - /////////////////////////////////////////////////////////////////////// - // UNREGISTER_CLIENT_VALUE - /////////////////////////////////////////////////////////////////////// - class UnregisterClientRequestHeader : public CommandCustomHeader - { - public: - UnregisterClientRequestHeader() {}; - ~UnregisterClientRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string clientID; - std::string producerGroup; - std::string consumerGroup; - }; - - - 
/////////////////////////////////////////////////////////////////////// - // VIEW_MESSAGE_BY_ID_VALUE - /////////////////////////////////////////////////////////////////////// - class ViewMessageRequestHeader : public CommandCustomHeader - { - public: - ViewMessageRequestHeader() - { - offset = 0; - }; - ~ViewMessageRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - }; - - /////////////////////////////////////////////////////////////////////// - // SEARCH_OFFSET_BY_TIMESTAMP_VALUE - /////////////////////////////////////////////////////////////////////// - class SearchOffsetRequestHeader : public CommandCustomHeader - { - public: - SearchOffsetRequestHeader() - { - queueId = 0; - timestamp = 0; - }; - ~SearchOffsetRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - int queueId; - long long timestamp; - }; - - class SearchOffsetResponseHeader : public CommandCustomHeader - { - public: - SearchOffsetResponseHeader() - { - offset = 0; - }; - ~SearchOffsetResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - }; - - /////////////////////////////////////////////////////////////////////// - // GET_MAX_OFFSET_VALUE - /////////////////////////////////////////////////////////////////////// - class GetMaxOffsetRequestHeader : public CommandCustomHeader - { - public: - GetMaxOffsetRequestHeader() - { - queueId = 0; - }; - ~GetMaxOffsetRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - int queueId; - }; - - class GetMaxOffsetResponseHeader : public CommandCustomHeader - { - public: - GetMaxOffsetResponseHeader() - { - offset = 0; - }; - 
~GetMaxOffsetResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - }; - - /////////////////////////////////////////////////////////////////////// - // GET_MIN_OFFSET_VALUE - /////////////////////////////////////////////////////////////////////// - class GetMinOffsetRequestHeader : public CommandCustomHeader - { - public: - GetMinOffsetRequestHeader() - { - queueId = 0; - }; - ~GetMinOffsetRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - int queueId; - }; - - class GetMinOffsetResponseHeader : public CommandCustomHeader - { - public: - GetMinOffsetResponseHeader() - { - offset = 0; - }; - ~GetMinOffsetResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long offset; - }; - - - /////////////////////////////////////////////////////////////////////// - // GET_EARLIEST_MSG_STORETIME_VALUE - /////////////////////////////////////////////////////////////////////// - class GetEarliestMsgStoretimeRequestHeader : public CommandCustomHeader - { - public: - GetEarliestMsgStoretimeRequestHeader() - { - queueId = 0; - }; - ~GetEarliestMsgStoretimeRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - int queueId; - }; - - class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader - { - public: - GetEarliestMsgStoretimeResponseHeader() - { - timestamp = 0; - }; - ~GetEarliestMsgStoretimeResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long timestamp; - }; - - /////////////////////////////////////////////////////////////////////// - // QUERY_MESSAGE_VALUE - 
/////////////////////////////////////////////////////////////////////// - class QueryMessageRequestHeader : public CommandCustomHeader - { - public: - QueryMessageRequestHeader() - { - maxNum = 0; - beginTimestamp = 0; - endTimestamp = 0; - }; - ~QueryMessageRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string topic; - std::string key; - int maxNum; - long long beginTimestamp; - long long endTimestamp; - }; - - class QueryMessageResponseHeader : public CommandCustomHeader - { - public: - QueryMessageResponseHeader() - { - indexLastUpdateTimestamp = 0; - indexLastUpdatePhyoffset = 0; - }; - ~QueryMessageResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - long long indexLastUpdateTimestamp; - long long indexLastUpdatePhyoffset; - }; - - /////////////////////////////////////////////////////////////////////// - // GET_KV_CONFIG_VALUE - /////////////////////////////////////////////////////////////////////// - class GetKVConfigRequestHeader : public CommandCustomHeader - { - public: - GetKVConfigRequestHeader() {}; - ~GetKVConfigRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string namespace_; - std::string key; - }; - - class GetKVConfigResponseHeader : public CommandCustomHeader - { - public: - GetKVConfigResponseHeader() {}; - ~GetKVConfigResponseHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string value; - }; - - /////////////////////////////////////////////////////////////////////// - // NOTIFY_CONSUMER_IDS_CHANGED_VALUE - /////////////////////////////////////////////////////////////////////// - class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader - { - public: - 
NotifyConsumerIdsChangedRequestHeader() {}; - ~NotifyConsumerIdsChangedRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - }; - - - /////////////////////////////////////////////////////////////////////// - // GET_CONSUMER_RUNNING_INFO_VALUE - /////////////////////////////////////////////////////////////////////// - class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader - { - public: - GetConsumerRunningInfoRequestHeader() {}; - ~GetConsumerRunningInfoRequestHeader() {}; - - virtual void encode(std::string& outData); - static CommandCustomHeader* decode(Json::Value& data); - - public: - std::string consumerGroup; - std::string clientId; - bool jstackEnable; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp deleted file mode 100755 index 58cecde..0000000 --- a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#include "ConsumerRunningInfo.h" - -namespace rmq -{ - -const std::string ConsumerRunningInfo::PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR"; -const std::string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE"; -const std::string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY"; -const std::string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE"; -const std::string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION"; -const std::string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP"; - - -ConsumerRunningInfo::ConsumerRunningInfo() -{ -} - -ConsumerRunningInfo::~ConsumerRunningInfo() -{ -} - -void ConsumerRunningInfo::encode(std::string& outData) -{ - std::stringstream ss; - ss << "{" - << "}"; - outData = ss.str(); -} - - -std::string ConsumerRunningInfo::formatString() -{ - std::string sb = "rocketmq-client4cpp not suppport this feature"; - - /* - // 1 - { - sb.append("#Consumer Properties#\n"); - Iterator<Entry<Object, Object>> it = m_properties.entrySet().iterator(); - while (it.hasNext()) { - Entry<Object, Object> next = it.next(); - String item = - String.format("%-40s: %s\n", next.getKey().toString(), next.getValue().toString()); - sb.append(item); - } - } - - // 2 - { - sb.append("\n\n#Consumer Subscription#\n"); - - Iterator<SubscriptionData> it = m_subscriptionSet.iterator(); - int i = 0; - while (it.hasNext()) { - SubscriptionData next = it.next(); - String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s\n", // - ++i,// - next.getTopic(),// - next.isClassFilterMode(),// - next.getSubString()); - - sb.append(item); - } - } - - // 3 - { - sb.append("\n\n#Consumer Offset#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s\n",// - "#Topic",// - "#Broker Name",// - "#QID",// - "#Consumer Offset"// - )); - - Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator(); - while (it.hasNext()) { - 
Entry<MessageQueue, ProcessQueueInfo> next = it.next(); - String item = String.format("%-32s %-32s %-4d %-20d\n",// - next.getKey().getTopic(),// - next.getKey().getBrokerName(),// - next.getKey().getQueueId(),// - next.getValue().getCommitOffset()); - - sb.append(item); - } - } - - // 4 - { - sb.append("\n\n#Consumer MQ Detail#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s\n",// - "#Topic",// - "#Broker Name",// - "#QID",// - "#ProcessQueueInfo"// - )); - - Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, ProcessQueueInfo> next = it.next(); - String item = String.format("%-32s %-32s %-4d %s\n",// - next.getKey().getTopic(),// - next.getKey().getBrokerName(),// - next.getKey().getQueueId(),// - next.getValue().toString()); - - sb.append(item); - } - } - - // 5 - { - sb.append("\n\n#Consumer RT&TPS#\n"); - sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s\n",// - "#Topic",// - "#Pull RT",// - "#Pull TPS",// - "#Consume RT",// - "#ConsumeOK TPS",// - "#ConsumeFailed TPS",// - "#ConsumeFailedMsgsInHour"// - )); - - Iterator<Entry<String, ConsumeStatus>> it = m_statusTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, ConsumeStatus> next = it.next(); - String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d\n",// - next.getKey(),// - next.getValue().getPullRT(),// - next.getValue().getPullTPS(),// - next.getValue().getConsumeRT(),// - next.getValue().getConsumeOKTPS(),// - next.getValue().getConsumeFailedTPS(),// - next.getValue().getConsumeFailedMsgs()// - ); - - sb.append(item); - } - } - - // 6 - if (m_jstack != null) { - sb.append("\n\n#Consumer jstack#\n"); - sb.append(m_jstack); - } - */ - - return sb; -} - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h ---------------------------------------------------------------------- diff 
--git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h deleted file mode 100755 index 588bf07..0000000 --- a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h +++ /dev/null @@ -1,97 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __ConsumerRunningInfo_H__ -#define __ConsumerRunningInfo_H__ - -#include <string> -#include <set> -#include <map> - -#include "RemotingSerializable.h" -#include "MessageQueue.h" -#include "SubscriptionData.h" -#include "ConsumerStatManage.h" - -namespace rmq -{ - class ConsumerRunningInfo : public RemotingSerializable - { - public: - ConsumerRunningInfo(); - ~ConsumerRunningInfo(); - - /* - std::map<std::string, std::string>& getProperties() - { - return m_properties; - } - void setProperties(const std::map<std::string, std::string>& properties) - { - m_properties = properties; - } - - std::map<MessageQueue, ProcessQueueInfo>& getMqTable() - { - return m_mqTable; - } - void setMqTable(const std::map<MessageQueue, ProcessQueueInfo>& mqTable) - { - m_mqTable = mqTable; - } - - std::map<std::string, ConsumeStatus>& getStatusTable() - { - return m_statusTable; - } - void setStatusTable(const std::map<std::string, ConsumeStatus>& statusTable) - { - m_statusTable = statusTable; - } - - std::set<SubscriptionData>& getSubscriptionSet() - { - return m_subscriptionSet; - } - void setSubscriptionSet(const 
std::set<SubscriptionData>& subscriptionSet) - { - m_subscriptionSet = subscriptionSet; - } - */ - - void encode(std::string& outData); - std::string formatString(); - - public: - static const std::string PROP_NAMESERVER_ADDR; - static const std::string PROP_THREADPOOL_CORE_SIZE; - static const std::string PROP_CONSUME_ORDERLY; - static const std::string PROP_CONSUME_TYPE; - static const std::string PROP_CLIENT_VERSION; - static const std::string PROP_CONSUMER_START_TIMESTAMP; - - private: - /* - std::map<std::string, std::string> m_properties; - std::set<SubscriptionData> m_subscriptionSet; - std::map<MessageQueue, ProcessQueueInfo> m_mqTable; - std::map<string, ConsumerStat> m_statusTable; - std::string m_jstack; - */ - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h deleted file mode 100755 index 0ea19da..0000000 --- a/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h +++ /dev/null @@ -1,97 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__ -#define __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__ - -#include <string> -#include <sstream> -#include <list> -#include "UtilAll.h" -#include "RemotingSerializable.h" - -namespace rmq -{ - class GetConsumerListByGroupResponseBody : public RemotingSerializable - { - public: - GetConsumerListByGroupResponseBody() - { - - } - - ~GetConsumerListByGroupResponseBody() - { - - } - - void encode(std::string& outData) - { - - } - - static GetConsumerListByGroupResponseBody* decode(const char* pData, int len) - { - /* - {"consumerIdList":["10.12.22.213@DEFAULT", "10.12.22.213@xxx"]} - */ - //RMQ_DEBUG("GET_CONSUMER_LIST_BY_GROUP_VALUE:%s", pData); - - Json::Reader reader; - Json::Value object; - if (!reader.parse(pData, pData + len, object)) - { - RMQ_ERROR("parse fail: %s", reader.getFormattedErrorMessages().c_str()); - return NULL; - } - - GetConsumerListByGroupResponseBody* rsp = new GetConsumerListByGroupResponseBody(); - Json::Value cidList = object["consumerIdList"]; - for (size_t i = 0; i < cidList.size(); i++) - { - Json::Value cid = cidList[i]; - if (cid != Json::Value::null) - { - rsp->m_consumerIdList.push_back(cid.asString()); - } - } - - return rsp; - } - - std::list<std::string>& getConsumerIdList() - { - return m_consumerIdList; - } - - void setConsumerIdList(const std::list<std::string>& consumerIdList) - { - m_consumerIdList = consumerIdList; - } - - std::string toString() const - { - std::stringstream ss; - ss << "{consumerIdList=" << UtilAll::toString(m_consumerIdList) << "}"; - return ss.str(); - } - - private: - std::list<std::string> m_consumerIdList; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp deleted file 
mode 100755 index 73f197a..0000000 --- a/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp +++ /dev/null @@ -1,52 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#include "HeartbeatData.h" - -namespace rmq -{ - -void HeartbeatData::encode(std::string& outData) -{ - //{"clientID":"10.6.223.90@16164","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}],"producerDataSet":[{"groupName":"CLIENT_INNER_PRODUCER"}]} - Json::Value obj; - obj["clientID"] = m_clientID; - - Json::Value consumerDataSet(Json::arrayValue); - for (typeof(m_consumerDataSet.begin()) it = m_consumerDataSet.begin(); it != m_consumerDataSet.end(); it++) - { - Json::Value o; - (*it).toJson(o); - consumerDataSet.append(o); - } - obj["consumerDataSet"] = consumerDataSet; - - Json::Value producerDataSet(Json::arrayValue); - for (typeof(m_producerDataSet.begin()) it = m_producerDataSet.begin(); it != m_producerDataSet.end(); it++) - { - Json::Value o; - it->toJson(o); - producerDataSet.append(o); - } - obj["producerDataSet"] = producerDataSet; - - Json::FastWriter outer; - outData = outer.write(obj); -} - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/HeartbeatData.h 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.h b/rocketmq-client4cpp/src/protocol/HeartbeatData.h deleted file mode 100755 index cb0f720..0000000 --- a/rocketmq-client4cpp/src/protocol/HeartbeatData.h +++ /dev/null @@ -1,157 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __HEARTBEATDATA_H__ -#define __HEARTBEATDATA_H__ - -#include <string> -#include <set> -#include <sstream> - -#include "RocketMQClient.h" -#include "ConsumeType.h" -#include "SubscriptionData.h" -#include "RemotingSerializable.h" -#include "UtilAll.h" - -namespace rmq -{ - struct ConsumerData - { - std::string groupName; - ConsumeType consumeType; - MessageModel messageModel; - ConsumeFromWhere consumeFromWhere; - std::set<SubscriptionData> subscriptionDataSet; - bool operator < (const ConsumerData& cd)const - { - return groupName < cd.groupName; - } - - void toJson(Json::Value& obj) const - { - //{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false} - obj["groupName"] = groupName; - obj["messageModel"] = getMessageModelString(messageModel); - obj["consumeFromWhere"] = getConsumeFromWhereString(consumeFromWhere); - obj["consumeType"] = getConsumeTypeString(consumeType); - obj["unitMode"] = false; - - 
Json::Value objSub(Json::arrayValue); - RMQ_FOR_EACH(subscriptionDataSet, it) - { - Json::Value o; - (*it).toJson(o); - objSub.append(o); - } - obj["subscriptionDataSet"] = objSub; - } - - std::string toString() const - { - std::stringstream ss; - ss << "{groupName=" << groupName - << ",messageModel=" << getMessageModelString(messageModel) - << ",consumeFromWhere=" << getConsumeFromWhereString(consumeFromWhere) - << ",consumeType=" << getConsumeTypeString(consumeType) - << ",subscriptionDataSet=" << UtilAll::toString(subscriptionDataSet) - << "}"; - return ss.str(); - } - }; - inline std::ostream& operator<<(std::ostream& os, const ConsumerData& obj) - { - os << obj.toString(); - return os; - } - - struct ProducerData - { - std::string groupName; - bool operator < (const ProducerData& pd)const - { - return groupName < pd.groupName; - } - void toJson(Json::Value& obj) const - { - obj["groupName"] = groupName; - } - - std::string toString() const - { - std::stringstream ss; - ss << "{groupName=" << groupName << "}"; - return ss.str(); - } - }; - inline std::ostream& operator<<(std::ostream& os, const ProducerData& obj) - { - os << obj.toString(); - return os; - } - - - class HeartbeatData : public RemotingSerializable - { - public: - void encode(std::string& outData); - - std::string getClientID() - { - return m_clientID; - } - - void setClientID(const std::string& clientID) - { - m_clientID = clientID; - } - - std::set<ProducerData>& getProducerDataSet() - { - return m_producerDataSet; - } - - void setProducerDataSet(const std::set<ProducerData>& producerDataSet) - { - m_producerDataSet = producerDataSet; - } - - std::set<ConsumerData>& getConsumerDataSet() - { - return m_consumerDataSet; - } - - void setConsumerDataSet(const std::set<ConsumerData>& consumerDataSet) - { - m_consumerDataSet = consumerDataSet; - } - - std::string toString() const - { - std::stringstream ss; - ss << "{clientID=" << m_clientID - << ",producerDataSet=" << 
UtilAll::toString(m_producerDataSet) - << ",consumerDataSet=" << UtilAll::toString(m_consumerDataSet) << "}"; - return ss.str(); - } - - private: - std::string m_clientID; - std::set<ProducerData> m_producerDataSet; - std::set<ConsumerData> m_consumerDataSet; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/KVTable.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/KVTable.h b/rocketmq-client4cpp/src/protocol/KVTable.h deleted file mode 100755 index 726b872..0000000 --- a/rocketmq-client4cpp/src/protocol/KVTable.h +++ /dev/null @@ -1,58 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __KVTABLE_H__ -#define __KVTABLE_H__ - -#include <map> -#include <string> -#include "RemotingSerializable.h" -#include "UtilAll.h" - -namespace rmq -{ - class KVTable : public RemotingSerializable - { - public: - void encode(std::string& outData) - { - - } - - std::string toString() const - { - std::stringstream ss; - ss << "{table=" << UtilAll::toString(m_table) - << "}"; - return ss.str(); - } - - const std::map<std::string, std::string>& getTable() - { - return m_table; - } - - void setTable(const std::map<std::string, std::string>& table) - { - m_table = table; - } - - private: - std::map<std::string, std::string> m_table ; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp deleted file mode 100755 index 947abe2..0000000 --- a/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#include "LockBatchBody.h" -#include "UtilAll.h" - -namespace rmq -{ - -LockBatchRequestBody::LockBatchRequestBody() -{ -} - -LockBatchRequestBody::~LockBatchRequestBody() -{ -} - -void LockBatchRequestBody::encode(std::string& outData) -{ - -} - -std::string LockBatchRequestBody::toString() const -{ - std::stringstream ss; - ss << "{consumerGroup=" << m_consumerGroup - << ",clientId=" << m_clientId - << ",mqSet=" << UtilAll::toString(m_mqSet) - << "}"; - return ss.str(); -} - - -std::string LockBatchRequestBody::getConsumerGroup() -{ - return m_consumerGroup; -} - -void LockBatchRequestBody::setConsumerGroup(const std::string& consumerGroup) -{ - m_consumerGroup = consumerGroup; -} - -std::string LockBatchRequestBody::getClientId() -{ - return m_clientId; -} - -void LockBatchRequestBody::setClientId(const std::string& clientId) -{ - m_clientId = clientId; -} - -std::set<MessageQueue>& LockBatchRequestBody::getMqSet() -{ - return m_mqSet; -} - -void LockBatchRequestBody::setMqSet(const std::set<MessageQueue>& mqSet) -{ - m_mqSet = mqSet; -} - -LockBatchResponseBody::LockBatchResponseBody() -{ -} - -LockBatchResponseBody::~LockBatchResponseBody() -{ -} - -void LockBatchResponseBody::encode(std::string& outData) -{ -} - -std::string LockBatchResponseBody::toString() const -{ - std::stringstream ss; - ss << "{consumerGroup=" << UtilAll::toString(m_lockOKMQSet) - << "}"; - return ss.str(); -} - - -LockBatchResponseBody* LockBatchResponseBody::decode(const char* pData, int len) -{ - return new LockBatchResponseBody(); -} - -std::set<MessageQueue> LockBatchResponseBody::getLockOKMQSet() -{ - return m_lockOKMQSet; -} - -void LockBatchResponseBody::setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet) -{ - m_lockOKMQSet = lockOKMQSet; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/LockBatchBody.h ---------------------------------------------------------------------- diff --git 
a/rocketmq-client4cpp/src/protocol/LockBatchBody.h b/rocketmq-client4cpp/src/protocol/LockBatchBody.h deleted file mode 100755 index ab9ee02..0000000 --- a/rocketmq-client4cpp/src/protocol/LockBatchBody.h +++ /dev/null @@ -1,73 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __LOCKBATCHBODY_H__ -#define __LOCKBATCHBODY_H__ - -#include <string> -#include <set> - -#include "RemotingSerializable.h" -#include "MessageQueue.h" - -namespace rmq -{ - class LockBatchRequestBody : public RemotingSerializable - { - public: - LockBatchRequestBody(); - ~LockBatchRequestBody(); - - void encode(std::string& outData); - std::string toString() const; - - std::string getConsumerGroup(); - void setConsumerGroup(const std::string& consumerGroup); - - std::string getClientId(); - void setClientId(const std::string& clientId); - - std::set<MessageQueue>& getMqSet(); - void setMqSet(const std::set<MessageQueue>& mqSet); - - private: - std::string m_consumerGroup; - std::string m_clientId; - std::set<MessageQueue> m_mqSet; - }; - - class LockBatchResponseBody : public RemotingSerializable - { - public: - LockBatchResponseBody(); - ~LockBatchResponseBody(); - - void encode(std::string& outData); - std::string toString() const; - - static LockBatchResponseBody* decode(const char* pData, int len); - - std::set<MessageQueue> getLockOKMQSet(); - void setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet); - - 
private: - std::set<MessageQueue> m_lockOKMQSet; - }; - - typedef LockBatchRequestBody UnlockBatchRequestBody; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/MQProtos.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.cpp b/rocketmq-client4cpp/src/protocol/MQProtos.cpp deleted file mode 100755 index 052c104..0000000 --- a/rocketmq-client4cpp/src/protocol/MQProtos.cpp +++ /dev/null @@ -1,248 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#include "MQProtos.h" - -namespace rmq -{ - -const char* getMQRequestCodeString(int code) -{ - switch (code) - { - case SEND_MESSAGE_VALUE: - return "SEND_MESSAGE_VALUE"; - case PULL_MESSAGE_VALUE: - return "PULL_MESSAGE_VALUE"; - case QUERY_MESSAGE_VALUE: - return "QUERY_MESSAGE_VALUE"; - case QUERY_BROKER_OFFSET_VALUE: - return "QUERY_BROKER_OFFSET_VALUE"; - case QUERY_CONSUMER_OFFSET_VALUE: - return "QUERY_CONSUMER_OFFSET_VALUE"; - case UPDATE_CONSUMER_OFFSET_VALUE: - return "UPDATE_CONSUMER_OFFSET_VALUE"; - case UPDATE_AND_CREATE_TOPIC_VALUE: - return "UPDATE_AND_CREATE_TOPIC_VALUE"; - case GET_ALL_TOPIC_CONFIG_VALUE: - return "GET_ALL_TOPIC_CONFIG_VALUE"; - case GET_TOPIC_CONFIG_LIST_VALUE: - return "GET_TOPIC_CONFIG_LIST_VALUE"; - case GET_TOPIC_NAME_LIST_VALUE: - return "GET_TOPIC_NAME_LIST_VALUE"; - case UPDATE_BROKER_CONFIG_VALUE: - return "UPDATE_BROKER_CONFIG_VALUE"; - case GET_BROKER_CONFIG_VALUE: - return "GET_BROKER_CONFIG_VALUE"; - case TRIGGER_DELETE_FILES_VALUE: - return "TRIGGER_DELETE_FILES_VALUE"; - case GET_BROKER_RUNTIME_INFO_VALUE: - return "GET_BROKER_RUNTIME_INFO_VALUE"; - case SEARCH_OFFSET_BY_TIMESTAMP_VALUE: - return "SEARCH_OFFSET_BY_TIMESTAMP_VALUE"; - case GET_MAX_OFFSET_VALUE: - return "GET_MAX_OFFSET_VALUE"; - case GET_MIN_OFFSET_VALUE: - return "GET_MIN_OFFSET_VALUE"; - case GET_EARLIEST_MSG_STORETIME_VALUE: - return "GET_EARLIEST_MSG_STORETIME_VALUE"; - case VIEW_MESSAGE_BY_ID_VALUE: - return "VIEW_MESSAGE_BY_ID_VALUE"; - case HEART_BEAT_VALUE: - return "HEART_BEAT_VALUE"; - case UNREGISTER_CLIENT_VALUE: - return "UNREGISTER_CLIENT_VALUE"; - case CONSUMER_SEND_MSG_BACK_VALUE: - return "CONSUMER_SEND_MSG_BACK_VALUE"; - case END_TRANSACTION_VALUE: - return "END_TRANSACTION_VALUE"; - case GET_CONSUMER_LIST_BY_GROUP_VALUE: - return "GET_CONSUMER_LIST_BY_GROUP_VALUE"; - case CHECK_TRANSACTION_STATE_VALUE: - return "CHECK_TRANSACTION_STATE_VALUE"; - case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: - return 
"NOTIFY_CONSUMER_IDS_CHANGED_VALUE"; - case LOCK_BATCH_MQ_VALUE: - return "LOCK_BATCH_MQ_VALUE"; - case UNLOCK_BATCH_MQ_VALUE: - return "UNLOCK_BATCH_MQ_VALUE"; - case GET_ALL_CONSUMER_OFFSET_VALUE: - return "GET_ALL_CONSUMER_OFFSET_VALUE"; - case GET_ALL_DELAY_OFFSET_VALUE: - return "GET_ALL_DELAY_OFFSET_VALUE"; - case PUT_KV_CONFIG_VALUE: - return "PUT_KV_CONFIG_VALUE"; - case GET_KV_CONFIG_VALUE: - return "GET_KV_CONFIG_VALUE"; - case DELETE_KV_CONFIG_VALUE: - return "DELETE_KV_CONFIG_VALUE"; - case REGISTER_BROKER_VALUE: - return "REGISTER_BROKER_VALUE"; - case UNREGISTER_BROKER_VALUE: - return "UNREGISTER_BROKER_VALUE"; - case GET_ROUTEINTO_BY_TOPIC_VALUE: - return "GET_ROUTEINTO_BY_TOPIC_VALUE"; - case GET_BROKER_CLUSTER_INFO_VALUE: - return "GET_BROKER_CLUSTER_INFO_VALUE"; - case UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE: - return "UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE"; - case GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE: - return "GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE"; - case GET_TOPIC_STATS_INFO_VALUE: - return "GET_TOPIC_STATS_INFO_VALUE"; - case GET_CONSUMER_CONNECTION_LIST_VALUE: - return "GET_CONSUMER_CONNECTION_LIST_VALUE"; - case GET_PRODUCER_CONNECTION_LIST_VALUE: - return "GET_PRODUCER_CONNECTION_LIST_VALUE"; - case WIPE_WRITE_PERM_OF_BROKER_VALUE: - return "WIPE_WRITE_PERM_OF_BROKER_VALUE"; - case GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE: - return "GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE"; - case DELETE_SUBSCRIPTIONGROUP_VALUE: - return "DELETE_SUBSCRIPTIONGROUP_VALUE"; - case GET_CONSUME_STATS_VALUE: - return "GET_CONSUME_STATS_VALUE"; - case SUSPEND_CONSUMER_VALUE: - return "SUSPEND_CONSUMER_VALUE"; - case RESUME_CONSUMER_VALUE: - return "RESUME_CONSUMER_VALUE"; - case RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE: - return "RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE"; - case RESET_CONSUMER_OFFSET_IN_BROKER_VALUE: - return "RESET_CONSUMER_OFFSET_IN_BROKER_VALUE"; - case ADJUST_CONSUMER_THREAD_POOL_VALUE: - return "ADJUST_CONSUMER_THREAD_POOL_VALUE"; - 
case WHO_CONSUME_THE_MESSAGE_VALUE: - return "WHO_CONSUME_THE_MESSAGE_VALUE"; - case DELETE_TOPIC_IN_BROKER_VALUE: - return "DELETE_TOPIC_IN_BROKER_VALUE"; - case DELETE_TOPIC_IN_NAMESRV_VALUE: - return "DELETE_TOPIC_IN_NAMESRV_VALUE"; - case GET_KV_CONFIG_BY_VALUE_VALUE: - return "GET_KV_CONFIG_BY_VALUE_VALUE"; - case DELETE_KV_CONFIG_BY_VALUE_VALUE: - return "DELETE_KV_CONFIG_BY_VALUE_VALUE"; - case GET_KVLIST_BY_NAMESPACE_VALUE: - return "GET_KVLIST_BY_NAMESPACE_VALUE"; - case RESET_CONSUMER_CLIENT_OFFSET_VALUE: - return "RESET_CONSUMER_CLIENT_OFFSET_VALUE"; - case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE: - return "GET_CONSUMER_STATUS_FROM_CLIENT_VALUE"; - case INVOKE_BROKER_TO_RESET_OFFSET_VALUE: - return "INVOKE_BROKER_TO_RESET_OFFSET_VALUE"; - case INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE: - return "INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE"; - case QUERY_TOPIC_CONSUME_BY_WHO_VALUE: - return "QUERY_TOPIC_CONSUME_BY_WHO_VALUE"; - case GET_TOPICS_BY_CLUSTER_VALUE: - return "GET_TOPICS_BY_CLUSTER_VALUE"; - case REGISTER_FILTER_SERVER_VALUE: - return "REGISTER_FILTER_SERVER_VALUE"; - case REGISTER_MESSAGE_FILTER_CLASS_VALUE: - return "REGISTER_MESSAGE_FILTER_CLASS_VALUE"; - case QUERY_CONSUME_TIME_SPAN_VALUE: - return "QUERY_CONSUME_TIME_SPAN_VALUE"; - case GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE: - return "GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE"; - case GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE: - return "GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE"; - case CLEAN_EXPIRED_CONSUMEQUEUE_VALUE: - return "CLEAN_EXPIRED_CONSUMEQUEUE_VALUE"; - case GET_CONSUMER_RUNNING_INFO_VALUE: - return "GET_CONSUMER_RUNNING_INFO_VALUE"; - case QUERY_CORRECTION_OFFSET_VALUE: - return "QUERY_CORRECTION_OFFSET_VALUE"; - case CONSUME_MESSAGE_DIRECTLY_VALUE: - return "CONSUME_MESSAGE_DIRECTLY_VALUE"; - case SEND_MESSAGE_V2_VALUE: - return "SEND_MESSAGE_V2_VALUE"; - case GET_UNIT_TOPIC_LIST_VALUE: - return "GET_UNIT_TOPIC_LIST_VALUE"; - case GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE: - return 
"GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE"; - case GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE: - return "GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE"; - case CLONE_GROUP_OFFSET_VALUE: - return "CLONE_GROUP_OFFSET_VALUE"; - case VIEW_BROKER_STATS_DATA_VALUE: - return "VIEW_BROKER_STATS_DATA_VALUE"; - } - - return "UnknowMQRequestCode"; -} - -const char* getMQResponseCodeString(int code) -{ - switch (code) - { - case 0: - return "OK"; - case FLUSH_DISK_TIMEOUT_VALUE: - return "FLUSH_DISK_TIMEOUT_VALUE"; - case SLAVE_NOT_AVAILABLE_VALUE: - return "SLAVE_NOT_AVAILABLE_VALUE"; - case FLUSH_SLAVE_TIMEOUT_VALUE: - return "FLUSH_SLAVE_TIMEOUT_VALUE"; - case MESSAGE_ILLEGAL_VALUE: - return "MESSAGE_ILLEGAL_VALUE"; - case SERVICE_NOT_AVAILABLE_VALUE: - return "SERVICE_NOT_AVAILABLE_VALUE"; - case VERSION_NOT_SUPPORTED_VALUE: - return "VERSION_NOT_SUPPORTED_VALUE"; - case NO_PERMISSION_VALUE: - return "NO_PERMISSION_VALUE"; - case TOPIC_NOT_EXIST_VALUE: - return "TOPIC_NOT_EXIST_VALUE"; - case TOPIC_EXIST_ALREADY_VALUE: - return "TOPIC_EXIST_ALREADY_VALUE"; - case PULL_NOT_FOUND_VALUE: - return "PULL_NOT_FOUND_VALUE"; - case PULL_RETRY_IMMEDIATELY_VALUE: - return "PULL_RETRY_IMMEDIATELY_VALUE"; - case PULL_OFFSET_MOVED_VALUE: - return "PULL_OFFSET_MOVED_VALUE"; - case QUERY_NOT_FOUND_VALUE: - return "QUERY_NOT_FOUND_VALUE"; - case SUBSCRIPTION_PARSE_FAILED_VALUE: - return "SUBSCRIPTION_PARSE_FAILED_VALUE"; - case SUBSCRIPTION_NOT_EXIST_VALUE: - return "SUBSCRIPTION_NOT_EXIST_VALUE"; - case SUBSCRIPTION_NOT_LATEST_VALUE: - return "SUBSCRIPTION_NOT_LATEST_VALUE"; - case SUBSCRIPTION_GROUP_NOT_EXIST_VALUE: - return "SUBSCRIPTION_GROUP_NOT_EXIST_VALUE"; - case TRANSACTION_SHOULD_COMMIT_VALUE: - return "TRANSACTION_SHOULD_COMMIT_VALUE"; - case TRANSACTION_SHOULD_ROLLBACK_VALUE: - return "TRANSACTION_SHOULD_ROLLBACK_VALUE"; - case TRANSACTION_STATE_UNKNOW_VALUE: - return "TRANSACTION_STATE_UNKNOW_VALUE"; - case TRANSACTION_STATE_GROUP_WRONG_VALUE: - return 
"TRANSACTION_STATE_GROUP_WRONG_VALUE"; - case NO_BUYER_ID_VALUE: - return "NO_BUYER_ID_VALUE"; - case NOT_IN_CURRENT_UNIT_VALUE: - return "NOT_IN_CURRENT_UNIT_VALUE"; - case CONSUMER_NOT_ONLINE_VALUE: - return "CONSUMER_NOT_ONLINE_VALUE"; - case CONSUME_MSG_TIMEOUT_VALUE: - return "CONSUME_MSG_TIMEOUT_VALUE"; - } - - return "UnknowMQResponseCode"; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/MQProtos.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.h b/rocketmq-client4cpp/src/protocol/MQProtos.h deleted file mode 100755 index 94167ea..0000000 --- a/rocketmq-client4cpp/src/protocol/MQProtos.h +++ /dev/null @@ -1,150 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#ifndef __MQPROTOS_H__ -#define __MQPROTOS_H__ - -namespace rmq -{ - /* Request command codes. Every enumerator is given an explicit number; the in-line markers group them as broker, Namesrv, and broker && namesrv codes. */ enum MQRequestCode - { - // broker - SEND_MESSAGE_VALUE = 10, - PULL_MESSAGE_VALUE = 11, - QUERY_MESSAGE_VALUE = 12, - QUERY_BROKER_OFFSET_VALUE = 13, - QUERY_CONSUMER_OFFSET_VALUE = 14, - UPDATE_CONSUMER_OFFSET_VALUE = 15, - UPDATE_AND_CREATE_TOPIC_VALUE = 17, - - GET_ALL_TOPIC_CONFIG_VALUE = 21, - GET_TOPIC_CONFIG_LIST_VALUE = 22, - GET_TOPIC_NAME_LIST_VALUE = 23, - UPDATE_BROKER_CONFIG_VALUE = 25, - GET_BROKER_CONFIG_VALUE = 26, - TRIGGER_DELETE_FILES_VALUE = 27, - GET_BROKER_RUNTIME_INFO_VALUE = 28, - SEARCH_OFFSET_BY_TIMESTAMP_VALUE = 29, - - GET_MAX_OFFSET_VALUE = 30, - GET_MIN_OFFSET_VALUE = 31, - GET_EARLIEST_MSG_STORETIME_VALUE = 32, - VIEW_MESSAGE_BY_ID_VALUE = 33, - HEART_BEAT_VALUE = 34, - UNREGISTER_CLIENT_VALUE = 35, - CONSUMER_SEND_MSG_BACK_VALUE = 36, - END_TRANSACTION_VALUE = 37, - GET_CONSUMER_LIST_BY_GROUP_VALUE = 38, - CHECK_TRANSACTION_STATE_VALUE = 39, - - NOTIFY_CONSUMER_IDS_CHANGED_VALUE = 40, - LOCK_BATCH_MQ_VALUE = 41, - UNLOCK_BATCH_MQ_VALUE = 42, - GET_ALL_CONSUMER_OFFSET_VALUE = 43, - GET_ALL_DELAY_OFFSET_VALUE = 45, - - // Namesrv - PUT_KV_CONFIG_VALUE = 100, - GET_KV_CONFIG_VALUE = 101, - DELETE_KV_CONFIG_VALUE = 102, - REGISTER_BROKER_VALUE = 103, - UNREGISTER_BROKER_VALUE = 104, - GET_ROUTEINTO_BY_TOPIC_VALUE = 105, - GET_BROKER_CLUSTER_INFO_VALUE = 106, - - // broker && namesrv - UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE = 200, - GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE = 201, - GET_TOPIC_STATS_INFO_VALUE = 202, - GET_CONSUMER_CONNECTION_LIST_VALUE = 203, - GET_PRODUCER_CONNECTION_LIST_VALUE = 204, - WIPE_WRITE_PERM_OF_BROKER_VALUE = 205, - GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE = 206, - DELETE_SUBSCRIPTIONGROUP_VALUE = 207, - GET_CONSUME_STATS_VALUE = 208, - SUSPEND_CONSUMER_VALUE = 209, - - RESUME_CONSUMER_VALUE = 210, - RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE = 211, - RESET_CONSUMER_OFFSET_IN_BROKER_VALUE = 212, - ADJUST_CONSUMER_THREAD_POOL_VALUE = 213, - 
WHO_CONSUME_THE_MESSAGE_VALUE = 214, - DELETE_TOPIC_IN_BROKER_VALUE = 215, - DELETE_TOPIC_IN_NAMESRV_VALUE = 216, - GET_KV_CONFIG_BY_VALUE_VALUE = 217, - DELETE_KV_CONFIG_BY_VALUE_VALUE = 218, - GET_KVLIST_BY_NAMESPACE_VALUE = 219, - - RESET_CONSUMER_CLIENT_OFFSET_VALUE = 220, - GET_CONSUMER_STATUS_FROM_CLIENT_VALUE = 221, - INVOKE_BROKER_TO_RESET_OFFSET_VALUE = 222, - INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE = 223, - GET_TOPICS_BY_CLUSTER_VALUE = 224, - - QUERY_TOPIC_CONSUME_BY_WHO_VALUE = 300, - REGISTER_FILTER_SERVER_VALUE = 301, - REGISTER_MESSAGE_FILTER_CLASS_VALUE = 302, - QUERY_CONSUME_TIME_SPAN_VALUE = 303, - GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE = 304, - GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE = 305, - CLEAN_EXPIRED_CONSUMEQUEUE_VALUE = 306, - GET_CONSUMER_RUNNING_INFO_VALUE = 307, - QUERY_CORRECTION_OFFSET_VALUE = 308, - CONSUME_MESSAGE_DIRECTLY_VALUE = 309, - - SEND_MESSAGE_V2_VALUE = 310, - GET_UNIT_TOPIC_LIST_VALUE = 311, - GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE = 312, - GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE = 313, - CLONE_GROUP_OFFSET_VALUE = 314, - VIEW_BROKER_STATS_DATA_VALUE = 315, - }; - - /* Response codes, also explicitly numbered. Their numbers overlap MQRequestCode (10 is both SEND_MESSAGE_VALUE and FLUSH_DISK_TIMEOUT_VALUE), so an int code is only meaningful together with the enum it belongs to. */ enum MQResponseCode - { - FLUSH_DISK_TIMEOUT_VALUE = 10, - SLAVE_NOT_AVAILABLE_VALUE = 11, - FLUSH_SLAVE_TIMEOUT_VALUE = 12, - MESSAGE_ILLEGAL_VALUE = 13, - SERVICE_NOT_AVAILABLE_VALUE = 14, - VERSION_NOT_SUPPORTED_VALUE = 15, - NO_PERMISSION_VALUE = 16, - TOPIC_NOT_EXIST_VALUE = 17, - TOPIC_EXIST_ALREADY_VALUE = 18, - PULL_NOT_FOUND_VALUE = 19, - - PULL_RETRY_IMMEDIATELY_VALUE = 20, - PULL_OFFSET_MOVED_VALUE = 21, - QUERY_NOT_FOUND_VALUE = 22, - SUBSCRIPTION_PARSE_FAILED_VALUE = 23, - SUBSCRIPTION_NOT_EXIST_VALUE = 24, - SUBSCRIPTION_NOT_LATEST_VALUE = 25, - SUBSCRIPTION_GROUP_NOT_EXIST_VALUE = 26, - - TRANSACTION_SHOULD_COMMIT_VALUE = 200, - TRANSACTION_SHOULD_ROLLBACK_VALUE = 201, - TRANSACTION_STATE_UNKNOW_VALUE = 202, - TRANSACTION_STATE_GROUP_WRONG_VALUE = 203, - NO_BUYER_ID_VALUE = 204, - NOT_IN_CURRENT_UNIT_VALUE = 205, - CONSUMER_NOT_ONLINE_VALUE = 206, - 
CONSUME_MSG_TIMEOUT_VALUE = 207, - }; - - /* Each function takes a code as a plain int and returns a C string; only the declarations live in this header. */ const char* getMQRequestCodeString(int code); - const char* getMQResponseCodeString(int code); -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h deleted file mode 100755 index 56ee4e4..0000000 --- a/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h +++ /dev/null @@ -1,135 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __OFFSETSERIALIZEWRAPPER_H__ -#define __OFFSETSERIALIZEWRAPPER_H__ - -#include <map> -#include <string> -#include "RemotingSerializable.h" -#include "MessageQueue.h" -#include "AtomicValue.h" -#include "UtilAll.h" -#include "json/json.h" - - -namespace rmq -{ - class OffsetSerializeWrapper : public RemotingSerializable - { - public: - void encode(std::string& outData) - { - Json::Value offsetTable; - RMQ_FOR_EACH(m_offsetTable, it) - { - MessageQueue mq = it->first; - kpr::AtomicLong& offset = it->second; - - std::string mqStr = mq.toJsonString(); - offsetTable[mqStr] = offset.get(); - } - - Json::Value obj; - obj["offsetTable"] = offsetTable; - - Json::FastWriter writer; - outData = writer.write(obj); - } - static OffsetSerializeWrapper* decode(const char* pData, int len) - { - /* - { - "offsetTable":{ - '{"brokerName":"broker-a","queueId":3,"topic":"TopicTest"}':0, - '{"brokerName":"broker-a","queueId":2,"topic":"TopicTest"}':0 - } - - } - */ - - RMQ_DEBUG("decode, data:%s", pData); - - Json::Reader reader; - Json::Value obj; - if (!reader.parse(pData, pData + len, obj)) - { - return NULL; - } - - RMQ_DEBUG("decode ok"); - - if (obj.isObject()) - { - Json::Value objOffsetTable = obj["offsetTable"]; - if (objOffsetTable.isObject()) - { - std::map<MessageQueue, kpr::AtomicLong> offsetTable; - OffsetSerializeWrapper* offsetWrapper = new OffsetSerializeWrapper(); - - Json::Value::Members members = objOffsetTable.getMemberNames(); - for (typeof(members.begin()) it = members.begin(); it != members.end(); it++) - { - std::string key = *it; - Json::Value objMq; - RMQ_DEBUG("decode, key:%s", key.c_str()); - if (!reader.parse(key, objMq)) - { - continue; - } - RMQ_DEBUG("decode, key ok"); - - MessageQueue mq(objMq["topic"].asString(), objMq["brokerName"].asString(), - objMq["queueId"].asInt()); - long long offset = objOffsetTable[key].asInt64(); - - offsetTable[mq] = kpr::AtomicLong(offset); - } - offsetWrapper->setOffsetTable(offsetTable); - - return 
offsetWrapper; - } - } - - return NULL; - } - - std::string toString() const - { - std::stringstream ss; - ss << "{offsetTable=" << UtilAll::toString(m_offsetTable) - << "}"; - return ss.str(); - } - - std::map<MessageQueue, kpr::AtomicLong>& getOffsetTable() - { - return m_offsetTable; - } - - void setOffsetTable(const std::map<MessageQueue, kpr::AtomicLong>& table) - { - m_offsetTable = table; - } - - private: - std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; - }; - - typedef kpr::RefHandleT<OffsetSerializeWrapper> OffsetSerializeWrapperPtr; -} - -#endif