http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQClientAPIImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.cpp b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp deleted file mode 100755 index fa5a2b9..0000000 --- a/rocketmq-client4cpp/src/MQClientAPIImpl.cpp +++ /dev/null @@ -1,1323 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#include <assert.h> - -#include "MQClientAPIImpl.h" -#include "MQClientException.h" -#include "SocketUtil.h" -#include "UtilAll.h" -#include "TcpRemotingClient.h" -#include "MQProtos.h" -#include "PullResultExt.h" -#include "ConsumerInvokeCallback.h" -#include "NamesrvUtil.h" -#include "VirtualEnvUtil.h" -#include "ClientRemotingProcessor.h" -#include "CommandCustomHeader.h" -#include "TopicList.h" -#include "ProducerInvokeCallback.h" -#include "MessageDecoder.h" -#include "MessageSysFlag.h" -#include "GetConsumerListByGroupResponseBody.h" - - -namespace rmq -{ - - -MQClientAPIImpl::MQClientAPIImpl(ClientConfig& clientConfig, - const RemoteClientConfig& remoteClientConfig, - ClientRemotingProcessor* pClientRemotingProcessor) - : m_pClientRemotingProcessor(pClientRemotingProcessor) -{ - m_pRemotingClient = new TcpRemotingClient(remoteClientConfig); - - m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE_VALUE, m_pClientRemotingProcessor); - m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED_VALUE, m_pClientRemotingProcessor); - m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET_VALUE, m_pClientRemotingProcessor); - m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT_VALUE, m_pClientRemotingProcessor); - m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO_VALUE, m_pClientRemotingProcessor); - m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY_VALUE, m_pClientRemotingProcessor); -} - -MQClientAPIImpl::~MQClientAPIImpl() -{ -} - -std::string MQClientAPIImpl::getProjectGroupPrefix() -{ - return m_projectGroupPrefix; -} - -std::vector<std::string> MQClientAPIImpl::getNameServerAddressList() -{ - return m_pRemotingClient->getNameServerAddressList(); -} - -TcpRemotingClient* MQClientAPIImpl::getRemotingClient() -{ - return m_pRemotingClient; -} - -std::string MQClientAPIImpl::fetchNameServerAddr() -{ - try - { - std::string addrs = m_topAddressing.fetchNSAddr(); - if (!addrs.empty()) - { - if (addrs != m_nameSrvAddr) - { - RMQ_INFO("name server address changed, %s -> %s", - m_nameSrvAddr.c_str(), addrs.c_str()); - updateNameServerAddressList(addrs); - m_nameSrvAddr = addrs; - return m_nameSrvAddr; - } - } - } - catch (...) - { - RMQ_ERROR("fetchNameServerAddr Exception"); - } - - return m_nameSrvAddr; -} - -void MQClientAPIImpl::updateNameServerAddressList(const std::string& addrs) -{ - m_nameSrvAddr = addrs; - std::vector<std::string> av; - UtilAll::Split(av, addrs, ";"); - if (av.size() > 0) - { - m_pRemotingClient->updateNameServerAddressList(av); - } -} - -void MQClientAPIImpl::start() -{ - m_pRemotingClient->start(); - - try - { - std::string localAddress = getLocalAddress(); - m_projectGroupPrefix = getProjectGroupByIp(localAddress, 3000); - } - catch (std::exception e) - { - } -} - -void MQClientAPIImpl::shutdown() -{ - m_pRemotingClient->shutdown(); -} - -void MQClientAPIImpl::createSubscriptionGroup(const std::string& addr, - SubscriptionGroupConfig config, - int timeoutMillis) -{ - //TODO -} - - -void MQClientAPIImpl::createTopic(const std::string& addr, - const std::string& defaultTopic, - TopicConfig topicConfig, - int timeoutMillis) -{ - std::string topicWithProjectGroup = topicConfig.getTopicName(); - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = - VirtualEnvUtil::buildWithProjectGroup(topicConfig.getTopicName(), m_projectGroupPrefix); - } - - CreateTopicRequestHeader* requestHeader = new CreateTopicRequestHeader(); - requestHeader->topic = (topicWithProjectGroup); - requestHeader->defaultTopic = (defaultTopic); - requestHeader->readQueueNums = (topicConfig.getReadQueueNums()); - requestHeader->writeQueueNums = (topicConfig.getWriteQueueNums()); - requestHeader->perm = (topicConfig.getPerm()); - requestHeader->topicFilterType = (topicConfig.getTopicFilterType()); - requestHeader->topicSysFlag = (topicConfig.getTopicSysFlag()); - requestHeader->order = (topicConfig.isOrder()); - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(UPDATE_AND_CREATE_TOPIC_VALUE, requestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - return; - } - default: - break; - } - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "createTopic failed", -1); -} - -SendResult MQClientAPIImpl::sendMessage(const std::string& addr, - const std::string& brokerName, - Message& msg, - SendMessageRequestHeader* pRequestHeader, - int timeoutMillis, - CommunicationMode communicationMode, - SendCallback* pSendCallback) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix)); - pRequestHeader->producerGroup = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->producerGroup, - m_projectGroupPrefix)); - pRequestHeader->topic = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, - m_projectGroupPrefix)); - } - - bool sendSmartMsg = true; - RemotingCommandPtr request = NULL; - if (sendSmartMsg) - { - SendMessageRequestHeaderV2* pRequestHeaderV2 = SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(pRequestHeader); - request = RemotingCommand::createRequestCommand(SEND_MESSAGE_V2_VALUE, pRequestHeaderV2); - delete pRequestHeader; - } - else - { - request = RemotingCommand::createRequestCommand(SEND_MESSAGE_VALUE, pRequestHeader); - } - - if (msg.getCompressBody() != NULL) - { - request->setBody((char*)msg.getCompressBody(), msg.getCompressBodyLen(), false); - } - else - { - request->setBody((char*)msg.getBody(), msg.getBodyLen(), false); - } - - SendResult result; - switch (communicationMode) - { - case ONEWAY: - m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); - return result; - case ASYNC: - sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, pSendCallback); - return result; - case SYNC: - { - SendResult* r = sendMessageSync(addr, brokerName, msg, timeoutMillis, request); - if (r) - { - result = *r; - delete r; - } - return result; - } - default: - break; - } - return result; -} - -PullResult* MQClientAPIImpl::pullMessage(const std::string& addr, - PullMessageRequestHeader* pRequestHeader, - int timeoutMillis, - CommunicationMode communicationMode, - PullCallback* pPullCallback) -{ - - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestHeader->consumerGroup = (VirtualEnvUtil::buildWithProjectGroup( - pRequestHeader->consumerGroup, m_projectGroupPrefix)); - pRequestHeader->topic = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, - m_projectGroupPrefix)); - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(PULL_MESSAGE_VALUE, pRequestHeader); - - PullResult* result = NULL; - switch (communicationMode) - { - case ONEWAY: - break; - case ASYNC: - pullMessageAsync(addr, request, timeoutMillis, pPullCallback); - break; - case SYNC: - result = pullMessageSync(addr, request, timeoutMillis); - break; - default: - assert(false); - break; - } - - return result; -} - -MessageExt* MQClientAPIImpl::viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis) -{ - ViewMessageRequestHeader* requestHeader = new ViewMessageRequestHeader(); - requestHeader->offset = phyoffset; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(VIEW_MESSAGE_BY_ID_VALUE, requestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - if (response->getBody() != NULL) - { - int len = 0; - MessageExt* messageExt = MessageDecoder::decode((char*)response->getBody(), - response->getBodyLen(), len); - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - messageExt->setTopic(VirtualEnvUtil::clearProjectGroup(messageExt->getTopic(), - m_projectGroupPrefix)); - } - return messageExt; - } - } - default: - break; - } - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "viewMessage failed", -1); -} - -long long MQClientAPIImpl::searchOffset(const std::string& addr, - const std::string& topic, - int queueId, - long long timestamp, - int timeoutMillis) -{ - std::string topicWithProjectGroup = topic; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); - } - - SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader(); - pRequestHeader->topic = topicWithProjectGroup; - pRequestHeader->queueId = queueId; - pRequestHeader->timestamp = timestamp; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(SEARCH_OFFSET_BY_TIMESTAMP_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - SearchOffsetResponseHeader* ret = (SearchOffsetResponseHeader*)response->getCommandCustomHeader(); - return ret->offset; - } - default: - break; - } - //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - //THROW_MQEXCEPTION(MQClientException, "searchOffset failed", -1); - return -1; -} - -long long MQClientAPIImpl::getMaxOffset(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis) -{ - std::string topicWithProjectGroup = topic; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); - } - - GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader(); - pRequestHeader->topic = topicWithProjectGroup; - pRequestHeader->queueId = queueId; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(GET_MAX_OFFSET_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - GetMaxOffsetResponseHeader* ret = (GetMaxOffsetResponseHeader*)response->getCommandCustomHeader(); - return ret->offset; - } - default: - break; - } - //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - //THROW_MQEXCEPTION(MQClientException, "getMaxOffset failed", -1); - return -1; -} - - -std::list<std::string> MQClientAPIImpl::getConsumerIdListByGroup(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis) -{ - std::string consumerGroupWithProjectGroup = consumerGroup; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - consumerGroupWithProjectGroup = - VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); - } - - GetConsumerListByGroupRequestHeader* requestHeader = new GetConsumerListByGroupRequestHeader(); - requestHeader->consumerGroup = consumerGroupWithProjectGroup; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(GET_CONSUMER_LIST_BY_GROUP_VALUE, requestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - if (response->getBody() != NULL) - { - GetConsumerListByGroupResponseBody* body = - GetConsumerListByGroupResponseBody::decode((char*)response->getBody(), response->getBodyLen()); - std::list<std::string> ret = body->getConsumerIdList(); - delete body; - return ret; - } - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "getConsumerIdListByGroup failed", -1); -} - -long long MQClientAPIImpl::getMinOffset(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis) -{ - std::string topicWithProjectGroup = topic; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); - } - - GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader(); - pRequestHeader->topic = topicWithProjectGroup; - pRequestHeader->queueId = queueId; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(GET_MIN_OFFSET_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - GetMinOffsetResponseHeader* ret = (GetMinOffsetResponseHeader*)response->getCommandCustomHeader(); - return ret->offset; - } - default: - break; - } - //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - //THROW_MQEXCEPTION(MQClientException, "getMinOffset failed", -1); - return -1; -} - -long long MQClientAPIImpl::getEarliestMsgStoretime(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis) -{ - std::string topicWithProjectGroup = topic; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); - } - - GetEarliestMsgStoretimeRequestHeader* pRequestHeader = new GetEarliestMsgStoretimeRequestHeader(); - pRequestHeader->topic = topicWithProjectGroup; - pRequestHeader->queueId = queueId; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(GET_EARLIEST_MSG_STORETIME_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - GetEarliestMsgStoretimeResponseHeader* ret = (GetEarliestMsgStoretimeResponseHeader*)response->getCommandCustomHeader(); - return ret->timestamp; - } - default: - break; - } - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "getEarliestMsgStoretime failed", -1); -} - -long long MQClientAPIImpl::queryConsumerOffset(const std::string& addr, - QueryConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( - pRequestHeader->consumerGroup, m_projectGroupPrefix); - pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, - m_projectGroupPrefix); - } - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(QUERY_CONSUMER_OFFSET_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - QueryConsumerOffsetResponseHeader* ret = (QueryConsumerOffsetResponseHeader*)response->getCommandCustomHeader(); - long long offset = ret->offset; - return offset; - } - default: - break; - } - THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "queryConsumerOffset failed", -1); - return -1; -} - -void MQClientAPIImpl::updateConsumerOffset(const std::string& addr, - UpdateConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( - pRequestHeader->consumerGroup, m_projectGroupPrefix); - pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup( - pRequestHeader->topic, m_projectGroupPrefix); - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - return; - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "updateConsumerOffset failed", -1); -} - -void MQClientAPIImpl::updateConsumerOffsetOneway(const std::string& addr, - UpdateConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( - pRequestHeader->consumerGroup, m_projectGroupPrefix); - pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, - m_projectGroupPrefix); - } - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader); - - m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); -} - -void MQClientAPIImpl::sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - std::set<ConsumerData>& consumerDatas = pHeartbeatData->getConsumerDataSet(); - std::set<ConsumerData>::iterator it = consumerDatas.begin(); - for (; it != consumerDatas.end(); it++) - { - ConsumerData& consumerData = (ConsumerData&)(*it); - consumerData.groupName = VirtualEnvUtil::buildWithProjectGroup(consumerData.groupName, - m_projectGroupPrefix); - - std::set<SubscriptionData>& subscriptionDatas = consumerData.subscriptionDataSet; - std::set<SubscriptionData>::iterator itsub = subscriptionDatas.begin(); - for (; itsub != subscriptionDatas.end(); itsub++) - { - SubscriptionData& subscriptionData = (SubscriptionData&)(*itsub); - subscriptionData.setTopic(VirtualEnvUtil::buildWithProjectGroup( - subscriptionData.getTopic(), m_projectGroupPrefix)); - } - } - - std::set<ProducerData>& producerDatas = pHeartbeatData->getProducerDataSet(); - std::set<ProducerData>::iterator itp = producerDatas.begin(); - for (; itp != producerDatas.end(); itp++) - { - ProducerData& producerData = (ProducerData&)(*itp); - producerData.groupName = VirtualEnvUtil::buildWithProjectGroup(producerData.groupName, - m_projectGroupPrefix); - } - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL); - - std::string body; - pHeartbeatData->encode(body); - request->setBody((char*)body.data(), body.length(), true); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - return; - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "sendHearbeat failed", -1); -} - -void MQClientAPIImpl::unregisterClient(const std::string& addr, - const std::string& clientID, - const std::string& producerGroup, - const std::string& consumerGroup, - int timeoutMillis) -{ - std::string producerGroupWithProjectGroup = producerGroup; - std::string consumerGroupWithProjectGroup = consumerGroup; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - producerGroupWithProjectGroup = - VirtualEnvUtil::buildWithProjectGroup(producerGroup, m_projectGroupPrefix); - consumerGroupWithProjectGroup = - VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); - } - - UnregisterClientRequestHeader* requestHeader = new UnregisterClientRequestHeader(); - requestHeader->clientID = (clientID); - requestHeader->producerGroup = (producerGroupWithProjectGroup); - requestHeader->consumerGroup = (consumerGroupWithProjectGroup); - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(UNREGISTER_CLIENT_VALUE, requestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - return; - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "unregisterClient failed", -1); -} - -void MQClientAPIImpl::endTransactionOneway(const std::string& addr, - EndTransactionRequestHeader* pRequestHeader, - const std::string& remark, - int timeoutMillis) -{ - //TODO -} - -void MQClientAPIImpl::queryMessage(const std::string& addr, - QueryMessageRequestHeader* pRequestHeader, - int timeoutMillis, - InvokeCallback* pInvokeCallback) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, - m_projectGroupPrefix); - } - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(QUERY_MESSAGE_VALUE, pRequestHeader); - - m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, pInvokeCallback); - return; -} - -bool MQClientAPIImpl::registerClient(const std::string& addr, HeartbeatData& heartbeat, int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - std::set<ConsumerData>& consumerDatas = heartbeat.getConsumerDataSet(); - std::set<ConsumerData>::iterator it = consumerDatas.begin(); - - for (; it != consumerDatas.end(); it++) - { - ConsumerData& consumerData = (ConsumerData&)(*it); - - consumerData.groupName = VirtualEnvUtil::buildWithProjectGroup(consumerData.groupName, - m_projectGroupPrefix); - std::set<SubscriptionData>& subscriptionDatas = consumerData.subscriptionDataSet; - std::set<SubscriptionData>::iterator itsub = subscriptionDatas.begin(); - - for (; itsub != subscriptionDatas.end(); itsub++) - { - SubscriptionData& subscriptionData = (SubscriptionData&)(*itsub); - subscriptionData.setTopic(VirtualEnvUtil::buildWithProjectGroup( - subscriptionData.getTopic(), m_projectGroupPrefix)); - } - } - - std::set<ProducerData>& producerDatas = heartbeat.getProducerDataSet(); - std::set<ProducerData>::iterator itp = producerDatas.begin(); - for (; itp != producerDatas.end(); itp++) - { - ProducerData& producerData = (ProducerData&)(*itp); - producerData.groupName = VirtualEnvUtil::buildWithProjectGroup(producerData.groupName, - m_projectGroupPrefix); - } - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL); - - std::string body; - heartbeat.encode(body); - - request->setBody((char*)body.data(), body.length(), true); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - return (response && response->getCode() == SUCCESS_VALUE); -} - -void MQClientAPIImpl::consumerSendMessageBack( - const std::string& addr, - MessageExt& msg, - const std::string& consumerGroup, - int delayLevel, - int timeoutMillis) -{ - std::string consumerGroupWithProjectGroup = consumerGroup; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - consumerGroupWithProjectGroup = - VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); - msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix)); - } - - ConsumerSendMsgBackRequestHeader* requestHeader = new ConsumerSendMsgBackRequestHeader(); - requestHeader->group = consumerGroupWithProjectGroup; - requestHeader->offset = msg.getCommitLogOffset(); - requestHeader->delayLevel = delayLevel; - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(CONSUMER_SEND_MSG_BACK_VALUE, requestHeader); - - std::string brokerAddr = addr.empty() ? socketAddress2IPPort(msg.getStoreHost()) : addr; - RemotingCommandPtr response = m_pRemotingClient->invokeSync(brokerAddr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - return; - break; - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "consumerSendMessageBack failed", -1); -} - -std::set<MessageQueue> MQClientAPIImpl::lockBatchMQ(const std::string& addr, - LockBatchRequestBody* pRequestBody, - int timeoutMillis) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestBody->setConsumerGroup((VirtualEnvUtil::buildWithProjectGroup( - pRequestBody->getConsumerGroup(), m_projectGroupPrefix))); - std::set<MessageQueue>& messageQueues = pRequestBody->getMqSet(); - std::set<MessageQueue>::iterator it = messageQueues.begin(); - - for (; it != messageQueues.end(); it++) - { - MessageQueue& messageQueue = (MessageQueue&)(*it); - messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(), - m_projectGroupPrefix)); - } - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(LOCK_BATCH_MQ_VALUE, NULL); - - std::string body; - pRequestBody->encode(body); - request->setBody((char*)body.data(), body.length(), true); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - LockBatchResponseBody* responseBody = - LockBatchResponseBody::decode(response->getBody(), response->getBodyLen()); - std::set<MessageQueue> messageQueues = responseBody->getLockOKMQSet(); - - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - std::set<MessageQueue>::iterator it = messageQueues.begin(); - - for (; it != messageQueues.end(); it++) - { - MessageQueue& messageQueue = (MessageQueue&)(*it); - messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(messageQueue.getTopic(), - m_projectGroupPrefix)); - } - } - return messageQueues; - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "lockBatchMQ failed", -1); -} - -void MQClientAPIImpl::unlockBatchMQ(const std::string& addr, - UnlockBatchRequestBody* pRequestBody, - int timeoutMillis, - bool oneway) -{ - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - pRequestBody->setConsumerGroup((VirtualEnvUtil::buildWithProjectGroup( - pRequestBody->getConsumerGroup(), m_projectGroupPrefix))); - std::set<MessageQueue>& messageQueues = pRequestBody->getMqSet(); - std::set<MessageQueue>::iterator it = messageQueues.begin(); - - for (; it != messageQueues.end(); it++) - { - MessageQueue& messageQueue = (MessageQueue&)(*it); - messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(), - m_projectGroupPrefix)); - } - } - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(UNLOCK_BATCH_MQ_VALUE, NULL); - - std::string body; - pRequestBody->encode(body); - request->setBody((char*)body.data(), body.length(), true); - - if (oneway) - { - m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); - } - else - { - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - return; - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "unlockBatchMQ failed", -1); - } -} - -TopicStatsTable MQClientAPIImpl::getTopicStatsInfo(const std::string& addr, - const std::string& topic, - int timeoutMillis) -{ - //TODO - TopicStatsTable t; - return t; -} - -ConsumeStats MQClientAPIImpl::getConsumeStats(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis) -{ - //TODO - ConsumeStats cs; - return cs; -} - -ProducerConnection* MQClientAPIImpl::getProducerConnectionList(const std::string& addr, - const std::string& producerGroup, - int timeoutMillis) -{ - //TODO - return NULL; -} - -ConsumerConnection* MQClientAPIImpl::getConsumerConnectionList(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis) -{ - //TODO - return NULL; -} - -KVTable MQClientAPIImpl::getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis) -{ - //TODO - KVTable kv; - return kv; -} - -void MQClientAPIImpl::updateBrokerConfig(const std::string& addr, - const std::map<std::string, std::string>& properties, - int timeoutMillis) -{ - //TODO -} - -ClusterInfo* MQClientAPIImpl::getBrokerClusterInfo(int timeoutMillis) -{ - //TODO - return NULL; -} - -TopicRouteData* MQClientAPIImpl::getDefaultTopicRouteInfoFromNameServer(const std::string& topic, - int timeoutMillis) -{ - GetRouteInfoRequestHeader* requestHeader = new GetRouteInfoRequestHeader(); - requestHeader->topic = topic; - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, requestHeader); - RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case TOPIC_NOT_EXIST_VALUE: - { - // TODO LOG - break; - } - case SUCCESS_VALUE: - { - int bodyLen = response->getBodyLen(); - const char* body = response->getBody(); - if (body) - { - TopicRouteData* ret = TopicRouteData::encode(body, bodyLen); - return ret; - } - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - return NULL; -} - -TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis) -{ - std::string topicWithProjectGroup = topic; - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); - } - - GetRouteInfoRequestHeader* requestHeader = new GetRouteInfoRequestHeader(); - requestHeader->topic = topicWithProjectGroup; - - RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, requestHeader); - RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case TOPIC_NOT_EXIST_VALUE: - { - if (topic != MixAll::DEFAULT_TOPIC) - { - RMQ_WARN("get Topic [{%s}] RouteInfoFromNameServer is not exist value", topic.c_str()); - } - break; - } - case SUCCESS_VALUE: - { - int bodyLen = response->getBodyLen(); - const char* body = response->getBody(); - if (body) - { - TopicRouteData* ret = TopicRouteData::encode(body, bodyLen); - return ret; - } - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - return NULL; -} - -TopicList* MQClientAPIImpl::getTopicListFromNameServer(int timeoutMillis) -{ - RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE, NULL); - RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - char* body = (char*)response->getBody(); - if (body != NULL) - { - TopicList* topicList = TopicList::decode(body, response->getBodyLen()); - - if (!UtilAll::isBlank(m_projectGroupPrefix)) - { - std::set<std::string> newTopicSet; - - const std::set<std::string>& topics = topicList->getTopicList(); - std::set<std::string>::const_iterator it = topics.begin(); - for (; it != topics.end(); it++) - { - std::string topic = *it; - newTopicSet.insert(VirtualEnvUtil::clearProjectGroup(topic, m_projectGroupPrefix)); - } - - topicList->setTopicList(newTopicSet); - } - - return topicList; - } - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - return NULL; -} - -int MQClientAPIImpl::wipeWritePermOfBroker(const std::string& namesrvAddr, - const std::string& brokerName, - int timeoutMillis) -{ - //TODO - return 0; -} - -void MQClientAPIImpl::deleteTopicInBroker(const std::string& addr, - const std::string& topic, - int timeoutMillis) -{ - //TODO -} - -void MQClientAPIImpl::deleteTopicInNameServer(const std::string& addr, - const std::string& topic, - int timeoutMillis) -{ - //TODO -} - -void MQClientAPIImpl::deleteSubscriptionGroup(const std::string& addr, - const std::string& groupName, - int timeoutMillis) -{ - //TODO -} - -std::string MQClientAPIImpl::getKVConfigValue(const std::string& projectNamespace, - const std::string& key, - int timeoutMillis) -{ - GetKVConfigRequestHeader* pRequestHeader = new GetKVConfigRequestHeader(); - pRequestHeader->namespace_ = projectNamespace; - pRequestHeader->key = key; - - RemotingCommandPtr request = - RemotingCommand::createRequestCommand(GET_KV_CONFIG_VALUE, pRequestHeader); - - RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); - if (response) - { - switch (response->getCode()) - { - case SUCCESS_VALUE: - { - GetKVConfigResponseHeader* ret = (GetKVConfigResponseHeader*)response->getCommandCustomHeader(); - return ret->value; - } - default: - break; - } - THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); - } - - THROW_MQEXCEPTION(MQClientException, "getKVConfigValue failed", -1); -} - -void MQClientAPIImpl::putKVConfigValue(const std::string& projectNamespace, - const std::string& key, - const std::string& value, - int timeoutMillis) -{ - //TODO -} - -void MQClientAPIImpl::deleteKVConfigValue(const std::string& projectNamespace, - const std::string& key, - int timeoutMillis) -{ - //TODO -} - -std::string MQClientAPIImpl::getProjectGroupByIp(const std::string& ip, int timeoutMillis) -{ - return getKVConfigValue(NamesrvUtil::NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis); -} - -std::string MQClientAPIImpl::getKVConfigByValue(const std::string& projectNamespace, - const std::string& projectGroup, - int timeoutMillis) -{ - //TODO - return ""; -} - -KVTable MQClientAPIImpl::getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis) -{ - //TODO - return KVTable(); -} - -void MQClientAPIImpl::deleteKVConfigByValue(const std::string& projectNamespace, - const std::string& projectGroup, - int timeoutMillis) -{ - //TODO -} - -SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr, - const std::string& brokerName, - Message& msg, - int timeoutMillis, - RemotingCommand* request) -{ - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); - return processSendResponse(brokerName, msg.getTopic(), response); -} - -void MQClientAPIImpl::sendMessageAsync(const std::string& addr, - const std::string& brokerName, - Message& msg, - int timeoutMillis, - RemotingCommand* request, - SendCallback* pSendCallback) -{ - ProducerInvokeCallback* callback = new ProducerInvokeCallback(pSendCallback, this, msg.getTopic(), brokerName); - m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, callback); -} - -SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName, - const std::string& topic, - RemotingCommand* pResponse) -{ - if (pResponse == NULL) - { - return NULL; - } - - switch (pResponse->getCode()) - { - case FLUSH_DISK_TIMEOUT_VALUE: - case FLUSH_SLAVE_TIMEOUT_VALUE: - case SLAVE_NOT_AVAILABLE_VALUE: - { - // TODO LOG - } - case SUCCESS_VALUE: - { - SendStatus sendStatus = SEND_OK; - switch (pResponse->getCode()) - { - case FLUSH_DISK_TIMEOUT_VALUE: - sendStatus = FLUSH_DISK_TIMEOUT; - break; - case FLUSH_SLAVE_TIMEOUT_VALUE: - sendStatus = FLUSH_SLAVE_TIMEOUT; - break; - case SLAVE_NOT_AVAILABLE_VALUE: - sendStatus = SLAVE_NOT_AVAILABLE; - break; - case SUCCESS_VALUE: - sendStatus = SEND_OK; - break; - default: - //assert false; - break; - } - - SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandCustomHeader(); - MessageQueue messageQueue(topic, brokerName, responseHeader->queueId); - SendResult* ret = new SendResult(sendStatus, responseHeader->msgId, messageQueue, - responseHeader->queueOffset, m_projectGroupPrefix); - - return ret; - } - default: - break; - } - - THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode()); -} - -void MQClientAPIImpl::pullMessageAsync(const std::string& addr, - RemotingCommand* pRequest, - int timeoutMillis, - PullCallback* pPullCallback) -{ - ConsumerInvokeCallback* callback = new ConsumerInvokeCallback(pPullCallback, this); - m_pRemotingClient->invokeAsync(addr, pRequest, timeoutMillis, callback); -} - -PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse) -{ - PullStatus pullStatus = NO_NEW_MSG; - switch (pResponse->getCode()) - { - case SUCCESS_VALUE: - pullStatus = FOUND; - break; - case PULL_NOT_FOUND_VALUE: - pullStatus = NO_NEW_MSG; - break; - case PULL_RETRY_IMMEDIATELY_VALUE: - pullStatus = NO_MATCHED_MSG; - break; - case PULL_OFFSET_MOVED_VALUE: - pullStatus = OFFSET_ILLEGAL; - break; - default: - THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode()); - break; - } - - PullMessageResponseHeader* responseHeader = (PullMessageResponseHeader*) pResponse->getCommandCustomHeader(); - std::list<MessageExt*> msgFoundList; - return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, - responseHeader->minOffset, responseHeader->maxOffset, msgFoundList, - responseHeader->suggestWhichBrokerId, pResponse->getBody(), pResponse->getBodyLen()); -} - -PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, - RemotingCommand* pRequest, - int timeoutMillis) -{ - RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, pRequest, timeoutMillis); - PullResult* result = processPullResponse(response); - - response->setBody(NULL, 0, false); - return result; -} - -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQClientAPIImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.h b/rocketmq-client4cpp/src/MQClientAPIImpl.h deleted file mode 100755 index 88defb5..0000000 --- a/rocketmq-client4cpp/src/MQClientAPIImpl.h +++ /dev/null @@ -1,280 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __MQCLIENTAPIIMPL_H__ -#define __MQCLIENTAPIIMPL_H__ - -#include <string> -#include <map> -#include <list> -#include <set> - -#include "ClientConfig.h" -#include "RemoteClientConfig.h" -#include "SubscriptionGroupConfig.h" -#include "TopicConfig.h" -#include "ConsumeStats.h" -#include "TopicStatsTable.h" -#include "KVTable.h" -#include "TopicRouteData.h" -#include "SendResult.h" -#include "PullResult.h" -#include "MessageExt.h" -#include "CommunicationMode.h" -#include "TopAddressing.h" -#include "HeartbeatData.h" -#include "LockBatchBody.h" - -namespace rmq -{ -class ClientConfig; -class TcpRemotingClient; -class QueryConsumerOffsetRequestHeader; -class UpdateConsumerOffsetRequestHeader; -class EndTransactionRequestHeader; -class SendMessageRequestHeader; -class PullMessageRequestHeader; -class QueryMessageRequestHeader; -class ProducerConnection; -class ConsumerConnection; -class ClusterInfo; -class TopicList; -class InvokeCallback; -class RemotingCommand; -class PullCallback; -class SendCallback; -class ClientRemotingProcessor; - -class MQClientAPIImpl -{ - public: - MQClientAPIImpl(ClientConfig& clientConfig, - const RemoteClientConfig& remoteClientConfig, - ClientRemotingProcessor* pClientRemotingProcessor); - ~MQClientAPIImpl(); - - void start(); - void shutdown(); - - std::string getProjectGroupPrefix(); - std::vector<std::string> getNameServerAddressList(); - void updateNameServerAddressList(const std::string& addrs); - std::string fetchNameServerAddr(); - - void createSubscriptionGroup(const std::string& addr, - SubscriptionGroupConfig config, - int timeoutMillis); - - void createTopic(const std::string& addr, - const std::string& defaultTopic, - TopicConfig topicConfig, - int timeoutMillis); - - SendResult sendMessage(const std::string& addr, - const std::string& brokerName, - Message& msg, - SendMessageRequestHeader* pRequestHeader, - int timeoutMillis, - CommunicationMode communicationMode, - SendCallback* pSendCallback); - - PullResult* pullMessage(const std::string& addr, - PullMessageRequestHeader* pRequestHeader, - int timeoutMillis, - CommunicationMode communicationMode, - PullCallback* pPullCallback); - - MessageExt* viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis); - - - long long searchOffset(const std::string& addr, - const std::string& topic, - int queueId, - long long timestamp, - int timeoutMillis); - - long long getMaxOffset(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis); - - std::list<std::string> getConsumerIdListByGroup(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis); - - long long getMinOffset(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis); - - long long getEarliestMsgStoretime(const std::string& addr, - const std::string& topic, - int queueId, - int timeoutMillis); - - long long queryConsumerOffset(const std::string& addr, - QueryConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis); - - void updateConsumerOffset(const std::string& addr, - UpdateConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis); - - void updateConsumerOffsetOneway(const std::string& addr, - UpdateConsumerOffsetRequestHeader* pRequestHeader, - int timeoutMillis); - - void sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis); - - void unregisterClient(const std::string& addr, - const std::string& clientID, - const std::string& producerGroup, - const std::string& consumerGroup, - int timeoutMillis); - - void endTransactionOneway(const std::string& addr, - EndTransactionRequestHeader* pRequestHeader, - const std::string& remark, - int timeoutMillis); - - void queryMessage(const std::string& addr, - QueryMessageRequestHeader* pRequestHeader, - int timeoutMillis, - InvokeCallback* pInvokeCallback); - - bool registerClient(const std::string& addr, - HeartbeatData& heartbeat, - int timeoutMillis); - - void consumerSendMessageBack(const std::string& addr, - MessageExt& msg, - const std::string& consumerGroup, - int delayLevel, - int timeoutMillis); - - std::set<MessageQueue> lockBatchMQ(const std::string& addr, - LockBatchRequestBody* pRequestBody, - int timeoutMillis); - - void unlockBatchMQ(const std::string& addr, - UnlockBatchRequestBody* pRequestBody, - int timeoutMillis, - bool oneway); - - TopicStatsTable getTopicStatsInfo(const std::string& addr, - const std::string& topic, - int timeoutMillis); - - ConsumeStats getConsumeStats(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis); - - ProducerConnection* getProducerConnectionList(const std::string& addr, - const std::string& producerGroup, - int timeoutMillis); - - ConsumerConnection* getConsumerConnectionList(const std::string& addr, - const std::string& consumerGroup, - int timeoutMillis); - - KVTable getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis); - - void updateBrokerConfig(const std::string& addr, - const std::map<std::string, std::string>& properties, - int timeoutMillis); - - ClusterInfo* getBrokerClusterInfo(int timeoutMillis); - - TopicRouteData* getDefaultTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis); - - TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis); - - TopicList* getTopicListFromNameServer(int timeoutMillis); - - int wipeWritePermOfBroker(const std::string& namesrvAddr, - const std::string& brokerName, - int timeoutMillis); - - void deleteTopicInBroker(const std::string& addr, const std::string& topic, int timeoutMillis); - void deleteTopicInNameServer(const std::string& addr, const std::string& topic, int timeoutMillis); - void deleteSubscriptionGroup(const std::string& addr, - const std::string& groupName, - int timeoutMillis); - - std::string getKVConfigValue(const std::string& projectNamespace, - const std::string& key, - int timeoutMillis); - - void putKVConfigValue(const std::string& projectNamespace, - const std::string& key, - const std::string& value, - int timeoutMillis); - - void deleteKVConfigValue(const std::string& projectNamespace, const std::string& key, int timeoutMillis); - - std::string getProjectGroupByIp(const std::string& ip, int timeoutMillis); - - std::string getKVConfigByValue(const std::string& projectNamespace, - const std::string& projectGroup, - int timeoutMillis); - - KVTable getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis); - - void deleteKVConfigByValue(const std::string& projectNamespace, - const std::string& projectGroup, - int timeoutMillis); - - TcpRemotingClient* getRemotingClient(); - - SendResult* processSendResponse(const std::string& brokerName, - const std::string& topic, - RemotingCommand* pResponse); - - PullResult* processPullResponse(RemotingCommand* pResponse); - - private: - SendResult* sendMessageSync(const std::string& addr, - const std::string& brokerName, - Message& msg, - int timeoutMillis, - RemotingCommand* request); - - void sendMessageAsync(const std::string& addr, - const std::string& brokerName, - Message& msg, - int timeoutMillis, - RemotingCommand* request, - SendCallback* pSendCallback); - - void pullMessageAsync(const std::string& addr, - RemotingCommand* pRequest, - int timeoutMillis, - PullCallback* pPullCallback); - - PullResult* pullMessageSync(const std::string& addr, - RemotingCommand* pRequest, - int timeoutMillis); - - private: - TcpRemotingClient* m_pRemotingClient; - TopAddressing m_topAddressing; - ClientRemotingProcessor* m_pClientRemotingProcessor; - std::string m_nameSrvAddr; - std::string m_projectGroupPrefix; - }; -} - -#endif