http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/DefaultMQProducer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp new file mode 100755 index 0000000..9c53930 --- /dev/null +++ b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "DefaultMQProducer.h" +#include <assert.h> +#include "CommandHeader.h" +#include "CommunicationMode.h" +#include "Logging.h" +#include "MQClientAPIImpl.h" +#include "MQClientException.h" +#include "MQClientFactory.h" +#include "MQClientManager.h" +#include "MQDecoder.h" +#include "MQProtos.h" +#include "MessageSysFlag.h" +#include "TopicPublishInfo.h" +#include "Validators.h" + +namespace rocketmq { + +//<!************************************************************************ +DefaultMQProducer::DefaultMQProducer(const string& groupname) + : m_sendMsgTimeout(3000), + m_compressMsgBodyOverHowmuch(4 * 1024), + m_maxMessageSize(1024 * 128), + m_retryAnotherBrokerWhenNotStoreOK(false), + m_compressLevel(5), + m_retryTimes(5) { + //<!set default group name; + string gname = groupname.empty() ? DEFAULT_PRODUCER_GROUP : groupname; + setGroupName(gname); +} + +DefaultMQProducer::~DefaultMQProducer() {} + +void DefaultMQProducer::start() { + /* Ignore the SIGPIPE */ + struct sigaction sa; + sa.sa_handler = SIG_IGN; + sa.sa_flags = 0; + sigaction(SIGPIPE, &sa, 0); + + switch (m_serviceState) { + case CREATE_JUST: { + m_serviceState = START_FAILED; + MQClient::start(); + LOG_INFO("DefaultMQProducer:%s start", m_GroupName.c_str()); + + bool registerOK = getFactory()->registerProducer(this); + if (!registerOK) { + m_serviceState = CREATE_JUST; + THROW_MQEXCEPTION( + MQClientException, + "The producer group[" + getGroupName() + + "] has been created before, specify another name please.", + -1); + } + + getFactory()->start(); + getFactory()->sendHeartbeatToAllBroker(); + m_serviceState = RUNNING; + break; + } + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + break; + default: + break; + } +} + +void DefaultMQProducer::shutdown() { + switch (m_serviceState) { + case RUNNING: { + LOG_INFO("DefaultMQProducer shutdown"); + getFactory()->unregisterProducer(this); + getFactory()->shutdown(); + m_serviceState = SHUTDOWN_ALREADY; + break; + } + case SHUTDOWN_ALREADY: + case CREATE_JUST: + break; + default: + break; + } +} + +SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) { + Validators::checkMessage(msg, getMaxMessageSize()); + try { + return sendDefaultImpl(msg, ComMode_SYNC, NULL, bSelectActiveBroker); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } + return SendResult(); +} + +void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback, + bool bSelectActiveBroker) { + Validators::checkMessage(msg, getMaxMessageSize()); + try { + sendDefaultImpl(msg, ComMode_ASYNC, pSendCallback, bSelectActiveBroker); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) { + Validators::checkMessage(msg, getMaxMessageSize()); + if (msg.getTopic() != mq.getTopic()) { + LOG_WARN("message's topic not equal mq's topic"); + } + try { + return sendKernelImpl(msg, mq, ComMode_SYNC, NULL); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } + return SendResult(); +} + +void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq, + SendCallback* pSendCallback) { + Validators::checkMessage(msg, getMaxMessageSize()); + if (msg.getTopic() != mq.getTopic()) { + LOG_WARN("message's topic not equal mq's topic"); + } + try { + sendKernelImpl(msg, mq, ComMode_ASYNC, pSendCallback); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker) { + Validators::checkMessage(msg, getMaxMessageSize()); + try { + sendDefaultImpl(msg, ComMode_ONEWAY, NULL, bSelectActiveBroker); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue& mq) { + Validators::checkMessage(msg, getMaxMessageSize()); + if (msg.getTopic() != mq.getTopic()) { + LOG_WARN("message's topic not equal mq's topic"); + } + try { + sendKernelImpl(msg, mq, ComMode_ONEWAY, NULL); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +SendResult DefaultMQProducer::send(MQMessage& msg, + MessageQueueSelector* pSelector, void* arg) { + try { + return sendSelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } + return SendResult(); +} + +SendResult DefaultMQProducer::send(MQMessage& msg, + MessageQueueSelector* pSelector, void* arg, + int autoRetryTimes, bool bActiveBroker) { + try { + return sendAutoRetrySelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL, + autoRetryTimes, bActiveBroker); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } + return SendResult(); +} + +void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector, + void* arg, SendCallback* pSendCallback) { + try { + sendSelectImpl(msg, pSelector, arg, ComMode_ASYNC, pSendCallback); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +void DefaultMQProducer::sendOneway(MQMessage& msg, + MessageQueueSelector* pSelector, void* arg) { + try { + sendSelectImpl(msg, pSelector, arg, ComMode_ONEWAY, NULL); + } catch (MQException& e) { + LOG_ERROR(e.what()); + throw e; + } +} + +int DefaultMQProducer::getSendMsgTimeout() const { return m_sendMsgTimeout; } + +void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout) { + m_sendMsgTimeout = sendMsgTimeout; +} + +int DefaultMQProducer::getCompressMsgBodyOverHowmuch() const { + return m_compressMsgBodyOverHowmuch; +} + +void DefaultMQProducer::setCompressMsgBodyOverHowmuch( + int compressMsgBodyOverHowmuch) { + m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; +} + +int DefaultMQProducer::getMaxMessageSize() const { return m_maxMessageSize; } + +void DefaultMQProducer::setMaxMessageSize(int maxMessageSize) { + m_maxMessageSize = maxMessageSize; +} + +int DefaultMQProducer::getCompressLevel() const { return m_compressLevel; } + +void DefaultMQProducer::setCompressLevel(int compressLevel) { + assert(compressLevel >= 0 && compressLevel <= 9 || compressLevel == -1); + + m_compressLevel = compressLevel; +} + +//<!************************************************************************ +SendResult DefaultMQProducer::sendDefaultImpl(MQMessage& msg, + int communicationMode, + SendCallback* pSendCallback, + bool bActiveMQ) { + MQMessageQueue lastmq; + int mq_index = 0; + for (int times = 1; times <= m_retryTimes; times++) { + boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo( + getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), + getSessionCredentials())); + boost::shared_ptr<TopicPublishInfo> topicPublishInfo( + weak_topicPublishInfo.lock()); + if (topicPublishInfo) { + if (times == 1) { + mq_index = topicPublishInfo->getWhichQueue(); + } else { + mq_index++; + } + + SendResult sendResult; + MQMessageQueue mq; + if (bActiveMQ) + mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index); + else + mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index); + + lastmq = mq; + if (mq.getQueueId() == -1) { + // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is + // invalide", -1); + continue; + } + + try { + LOG_DEBUG("send to brokerName:%s", mq.getBrokerName().c_str()); + sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback); + switch (communicationMode) { + case ComMode_ASYNC: + return sendResult; + case ComMode_ONEWAY: + return sendResult; + case ComMode_SYNC: + if (sendResult.getSendStatus() != SEND_OK) { + if (bActiveMQ) { + topicPublishInfo->updateNonServiceMessageQueue( + mq, getSendMsgTimeout()); + } + continue; + } + return sendResult; + default: + break; + } + } catch (...) { + LOG_ERROR("send failed of times:%d,brokerName:%s", times, + mq.getBrokerName().c_str()); + if (bActiveMQ) { + topicPublishInfo->updateNonServiceMessageQueue(mq, + getSendMsgTimeout()); + } + continue; + } + } // end of for + LOG_WARN("Retry many times, still failed"); + } + THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1); +} + +SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg, + const MQMessageQueue& mq, + int communicationMode, + SendCallback* sendCallback) { + string brokerAddr = + getFactory()->findBrokerAddressInPublish(mq.getBrokerName()); + + if (brokerAddr.empty()) { + getFactory()->tryToFindTopicPublishInfo(mq.getTopic(), + getSessionCredentials()); + brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName()); + } + + if (!brokerAddr.empty()) { + try { + LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), + mq.toString().c_str()); + int sysFlag = 0; + if (tryToCompressMessage(msg)) { + sysFlag |= MessageSysFlag::CompressedFlag; + } + + string tranMsg = + msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED); + if (!tranMsg.empty() && tranMsg == "true") { + sysFlag |= MessageSysFlag::TransactionPreparedType; + } + + SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader(); + requestHeader->producerGroup = getGroupName(); + requestHeader->topic = (msg.getTopic()); + requestHeader->defaultTopic = DEFAULT_TOPIC; + requestHeader->defaultTopicQueueNums = 4; + requestHeader->queueId = (mq.getQueueId()); + requestHeader->sysFlag = (sysFlag); + requestHeader->bornTimestamp = UtilAll::currentTimeMillis(); + requestHeader->flag = (msg.getFlag()); + requestHeader->properties = + (MQDecoder::messageProperties2String(msg.getProperties())); + + return getFactory()->getMQClientAPIImpl()->sendMessage( + brokerAddr, mq.getBrokerName(), msg, requestHeader, + getSendMsgTimeout(), communicationMode, sendCallback, + getSessionCredentials()); + } catch (MQException& e) { + throw e; + } + } + THROW_MQEXCEPTION(MQClientException, + "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +SendResult DefaultMQProducer::sendSelectImpl(MQMessage& msg, + MessageQueueSelector* pSelector, + void* pArg, int communicationMode, + SendCallback* sendCallback) { + Validators::checkMessage(msg, getMaxMessageSize()); + + boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo( + getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), + getSessionCredentials())); + boost::shared_ptr<TopicPublishInfo> topicPublishInfo( + weak_topicPublishInfo.lock()); + if (topicPublishInfo) //&& topicPublishInfo->ok()) + { + MQMessageQueue mq = + pSelector->select(topicPublishInfo->getMessageQueueList(), msg, pArg); + return sendKernelImpl(msg, mq, communicationMode, sendCallback); + } + THROW_MQEXCEPTION(MQClientException, "No route info for this topic", -1); +} + +SendResult DefaultMQProducer::sendAutoRetrySelectImpl( + MQMessage& msg, MessageQueueSelector* pSelector, void* pArg, + int communicationMode, SendCallback* pSendCallback, int autoRetryTimes, + bool bActiveMQ) { + Validators::checkMessage(msg, getMaxMessageSize()); + + MQMessageQueue lastmq; + MQMessageQueue mq; + int mq_index = 0; + for (int times = 1; times <= autoRetryTimes + 1; times++) { + boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo( + getFactory()->tryToFindTopicPublishInfo(msg.getTopic(), + getSessionCredentials())); + boost::shared_ptr<TopicPublishInfo> topicPublishInfo( + weak_topicPublishInfo.lock()); + if (topicPublishInfo) { + SendResult sendResult; + if (times == 1) { // always send to selected MQ firstly, evenif bActiveMQ + // was setted to true + mq = pSelector->select(topicPublishInfo->getMessageQueueList(), msg, + pArg); + lastmq = mq; + } else { + LOG_INFO("sendAutoRetrySelectImpl with times:%d", times); + vector<MQMessageQueue> mqs(topicPublishInfo->getMessageQueueList()); + for (size_t i = 0; i < mqs.size(); i++) { + if (mqs[i] == lastmq) mq_index = i; + } + if (bActiveMQ) + mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index); + else + mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index); + lastmq = mq; + if (mq.getQueueId() == -1) { + // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is + // invalide", -1); + continue; + } + } + + try { + LOG_DEBUG("send to broker:%s", mq.toString().c_str()); + sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback); + switch (communicationMode) { + case ComMode_ASYNC: + return sendResult; + case ComMode_ONEWAY: + return sendResult; + case ComMode_SYNC: + if (sendResult.getSendStatus() != SEND_OK) { + if (bActiveMQ) { + topicPublishInfo->updateNonServiceMessageQueue( + mq, getSendMsgTimeout()); + } + continue; + } + return sendResult; + default: + break; + } + } catch (...) { + LOG_ERROR("send failed of times:%d,mq:%s", times, + mq.toString().c_str()); + if (bActiveMQ) { + topicPublishInfo->updateNonServiceMessageQueue(mq, + getSendMsgTimeout()); + } + continue; + } + } // end of for + LOG_WARN("Retry many times, still failed"); + } + THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1); +} + +bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) { + string body = msg.getBody(); + if ((int)body.length() >= getCompressMsgBodyOverHowmuch()) { + string outBody; + if (UtilAll::deflate(body, outBody, getCompressLevel())) { + msg.setBody(outBody); + return true; + } + } + + return false; +} +int DefaultMQProducer::getRetryTimes() const { return m_retryTimes; } +void DefaultMQProducer::setRetryTimes(int times) { + if (times <= 0) { + LOG_WARN("set retry times illegal, use default value:5"); + return; + } + + if (times > 15) { + LOG_WARN("set retry times illegal, use max value:15"); + m_retryTimes = 15; + return; + } + LOG_WARN("set retry times to:%d", times); + m_retryTimes = times; +} +//<!*************************************************************************** +} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/SendResult.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/producer/SendResult.cpp b/rocketmq-cpp/src/producer/SendResult.cpp new file mode 100755 index 0000000..7fd844e --- /dev/null +++ b/rocketmq-cpp/src/producer/SendResult.cpp @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "SendResult.h" +#include "UtilAll.h" +#include "VirtualEnvUtil.h" + +namespace rocketmq { +//<!*************************************************************************** +SendResult::SendResult() : m_sendStatus(SEND_OK), m_queueOffset(0) {} + +SendResult::SendResult(const SendStatus& sendStatus, const string& msgId, + const MQMessageQueue& messageQueue, int64 queueOffset) + : m_sendStatus(sendStatus), + m_msgId(msgId), + m_messageQueue(messageQueue), + m_queueOffset(queueOffset) {} + +SendResult::SendResult(const SendResult& other) { + m_sendStatus = other.m_sendStatus; + m_msgId = other.m_msgId; + m_messageQueue = other.m_messageQueue; + m_queueOffset = other.m_queueOffset; +} + +SendResult& SendResult::operator=(const SendResult& other) { + if (this != &other) { + m_sendStatus = other.m_sendStatus; + m_msgId = other.m_msgId; + m_messageQueue = other.m_messageQueue; + m_queueOffset = other.m_queueOffset; + } + return *this; +} + +SendResult::~SendResult() {} + +const string& SendResult::getMsgId() const { return m_msgId; } + +SendStatus SendResult::getSendStatus() const { return m_sendStatus; } + +MQMessageQueue SendResult::getMessageQueue() const { return m_messageQueue; } + +int64 SendResult::getQueueOffset() const { return m_queueOffset; } + +//<!************************************************************************ +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/TopicPublishInfo.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/producer/TopicPublishInfo.h b/rocketmq-cpp/src/producer/TopicPublishInfo.h new file mode 100755 index 0000000..726b231 --- /dev/null +++ b/rocketmq-cpp/src/producer/TopicPublishInfo.h @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TOPICPUBLISHINFO_H__ +#define __TOPICPUBLISHINFO_H__ + +#include <boost/asio.hpp> +#include <boost/asio/io_service.hpp> +#include <boost/atomic.hpp> +#include <boost/bind.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/thread/thread.hpp> +#include "Logging.h" +#include "MQMessageQueue.h" + +namespace rocketmq { +//<!************************************************************************/ +class TopicPublishInfo { + public: + TopicPublishInfo() : m_sendWhichQueue(0) { + m_async_service_thread.reset(new boost::thread( + boost::bind(&TopicPublishInfo::boost_asio_work, this))); + } + + void boost_asio_work() { + boost::asio::io_service::work work(m_async_ioService); // avoid async io + // service stops + // after first timer + // timeout callback + boost::system::error_code e; + boost::asio::deadline_timer t(m_async_ioService, + boost::posix_time::seconds(60)); + t.async_wait(boost::bind( + &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, &t)); + boost::system::error_code ec; + m_async_ioService.run(ec); + } + + virtual ~TopicPublishInfo() { + m_async_ioService.stop(); + m_async_service_thread->interrupt(); + m_async_service_thread->join(); + + m_nonSerivceQueues.clear(); + m_onSerivceQueues.clear(); + m_brokerTimerMap.clear(); + m_queues.clear(); + } + + bool ok() { + boost::lock_guard<boost::mutex> lock(m_queuelock); + return !m_queues.empty(); + } + + void updateMessageQueueList(const MQMessageQueue& mq) { + boost::lock_guard<boost::mutex> lock(m_queuelock); + m_queues.push_back(mq); + string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId()); + m_onSerivceQueues[key] = mq; + if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) { + m_nonSerivceQueues.erase(key); // if topicPublishInfo changed, erase this + // mq from m_nonSerivceQueues to avoid 2 + // copies both in m_onSerivceQueues and + // m_nonSerivceQueues + } + } + + void op_resumeNonServiceMessageQueueList(boost::system::error_code& ec, + boost::asio::deadline_timer* t) { + resumeNonServiceMessageQueueList(); + boost::system::error_code e; + t->expires_at(t->expires_at() + boost::posix_time::seconds(60), e); + t->async_wait(boost::bind( + &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, t)); + } + + void resumeNonServiceMessageQueueList() { + boost::lock_guard<boost::mutex> lock(m_queuelock); + for (map<MQMessageQueue, int64>::iterator it = m_brokerTimerMap.begin(); + it != m_brokerTimerMap.end(); ++it) { + if (UtilAll::currentTimeMillis() - it->second >= 1000 * 60 * 5) { + string key = it->first.getBrokerName() + + UtilAll::to_string(it->first.getQueueId()); + if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) { + m_nonSerivceQueues.erase(key); + } + m_onSerivceQueues[key] = it->first; + } + } + } + + void updateNonServiceMessageQueue(const MQMessageQueue& mq, + int timeoutMilliseconds) { + boost::lock_guard<boost::mutex> lock(m_queuelock); + + string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId()); + if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) { + return; + } + LOG_INFO("updateNonServiceMessageQueue of mq:%s", mq.toString().c_str()); + m_brokerTimerMap[mq] = UtilAll::currentTimeMillis(); + m_nonSerivceQueues[key] = mq; + if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) { + m_onSerivceQueues.erase(key); + } + } + + vector<MQMessageQueue>& getMessageQueueList() { + boost::lock_guard<boost::mutex> lock(m_queuelock); + return m_queues; + } + + int getWhichQueue() { + return m_sendWhichQueue.load(boost::memory_order_acquire); + } + + MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq, + int& mq_index) { + boost::lock_guard<boost::mutex> lock(m_queuelock); + + if (m_queues.size() > 0) { + LOG_DEBUG("selectOneMessageQueue Enter, queue size:%zu", m_queues.size()); + unsigned int pos = 0; + if (mq_index >= 0) { + pos = mq_index % m_queues.size(); + } else { + LOG_ERROR("mq_index is negative"); + return MQMessageQueue(); + } + if (!lastmq.getBrokerName().empty()) { + for (size_t i = 0; i < m_queues.size(); i++) { + if (m_sendWhichQueue.load(boost::memory_order_acquire) == + numeric_limits<int>::max()) { + m_sendWhichQueue.store(0, boost::memory_order_release); + } + + if (pos >= m_queues.size()) pos = pos % m_queues.size(); + + ++m_sendWhichQueue; + MQMessageQueue mq = m_queues.at(pos); + LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d", + m_sendWhichQueue.load(boost::memory_order_acquire), pos); + if (mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) { + mq_index = pos; + return mq; + } + ++pos; + } + LOG_ERROR("could not find property mq"); + return MQMessageQueue(); + } else { + if (m_sendWhichQueue.load(boost::memory_order_acquire) == + numeric_limits<int>::max()) { + m_sendWhichQueue.store(0, boost::memory_order_release); + } + + ++m_sendWhichQueue; + LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d", + m_sendWhichQueue.load(boost::memory_order_acquire), pos); + mq_index = pos; + return m_queues.at(pos); + } + } else { + LOG_ERROR("m_queues empty"); + return MQMessageQueue(); + } + } + + MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq, + int& mq_index) { + boost::lock_guard<boost::mutex> lock(m_queuelock); + + if (m_queues.size() > 0) { + unsigned int pos = 0; + if (mq_index >= 0) { + pos = mq_index % m_queues.size(); + } else { + LOG_ERROR("mq_index is negative"); + return MQMessageQueue(); + } + if (!lastmq.getBrokerName().empty()) { + for (size_t i = 0; i < m_queues.size(); i++) { + if (m_sendWhichQueue.load(boost::memory_order_acquire) == + numeric_limits<int>::max()) { + m_sendWhichQueue.store(0, boost::memory_order_release); + } + + if (pos >= m_queues.size()) pos = pos % m_queues.size(); + + ++m_sendWhichQueue; + MQMessageQueue mq = m_queues.at(pos); + string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId()); + if ((mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) && + (m_onSerivceQueues.find(key) != m_onSerivceQueues.end())) { + mq_index = pos; + return mq; + } + ++pos; + } + + for (MQMAP::iterator it = m_nonSerivceQueues.begin(); + it != m_nonSerivceQueues.end(); + ++it) { // if no MQMessageQueue(except lastmq) in + // m_onSerivceQueues, search m_nonSerivceQueues + if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0) + return it->second; + } + LOG_ERROR("can not find property mq"); + return MQMessageQueue(); + } else { + for (size_t i = 0; i < m_queues.size(); i++) { + if (m_sendWhichQueue.load(boost::memory_order_acquire) == + numeric_limits<int>::max()) { + m_sendWhichQueue.store(0, boost::memory_order_release); + } + if (pos >= m_queues.size()) pos = pos % m_queues.size(); + + ++m_sendWhichQueue; + LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d", + m_sendWhichQueue.load(boost::memory_order_acquire), pos); + mq_index = pos; + MQMessageQueue mq = m_queues.at(pos); + string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId()); + if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) { + return mq; + } else { + ++pos; + } + } + + for (MQMAP::iterator it = m_nonSerivceQueues.begin(); + it != m_nonSerivceQueues.end(); + ++it) { // if no MQMessageQueue(except lastmq) in + // m_onSerivceQueues, search m_nonSerivceQueues + if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0) + return it->second; + } + LOG_ERROR("can not find property mq"); + return MQMessageQueue(); + } + } else { + LOG_ERROR("m_queues empty"); + return MQMessageQueue(); + } + } + + private: + boost::mutex m_queuelock; + typedef vector<MQMessageQueue> QueuesVec; + QueuesVec m_queues; + typedef map<string, MQMessageQueue> MQMAP; + MQMAP m_onSerivceQueues; + MQMAP m_nonSerivceQueues; + boost::atomic<int> m_sendWhichQueue; + map<MQMessageQueue, int64> m_brokerTimerMap; + boost::asio::io_service m_async_ioService; + boost::scoped_ptr<boost::thread> m_async_service_thread; +}; + +//<!*************************************************************************** +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/CommandHeader.cpp b/rocketmq-cpp/src/protocol/CommandHeader.cpp new file mode 100644 index 0000000..366ac2e --- /dev/null +++ b/rocketmq-cpp/src/protocol/CommandHeader.cpp @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CommandHeader.h" +#include <cstdlib> +#include <sstream> +#include "Logging.h" +#include "UtilAll.h" + +namespace rocketmq { +//<!************************************************************************ +void GetRouteInfoRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; +} + +void GetRouteInfoRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); +} +//<!*************************************************************************** +void UnregisterClientRequestHeader::Encode(Json::Value& outData) { + outData["clientID"] = clientID; + outData["producerGroup"] = producerGroup; + outData["consumerGroup"] = consumerGroup; +} + +void UnregisterClientRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("clientID", clientID)); + requestMap.insert(pair<string, string>("producerGroup", producerGroup)); + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); +} +//<!************************************************************************ +void CreateTopicRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; + outData["defaultTopic"] = defaultTopic; + outData["readQueueNums"] = readQueueNums; + outData["writeQueueNums"] = writeQueueNums; + outData["perm"] = perm; + outData["topicFilterType"] = topicFilterType; +} +void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert(pair<string, string>("defaultTopic", defaultTopic)); + requestMap.insert( + pair<string, string>("readQueueNums", UtilAll::to_string(readQueueNums))); + requestMap.insert(pair<string, string>("writeQueueNums", + UtilAll::to_string(writeQueueNums))); + requestMap.insert(pair<string, string>("perm", UtilAll::to_string(perm))); + requestMap.insert(pair<string, string>("topicFilterType", topicFilterType)); +} + +//<!************************************************************************ +void SendMessageRequestHeader::Encode(Json::Value& outData) { + outData["producerGroup"] = producerGroup; + outData["topic"] = topic; + outData["defaultTopic"] = defaultTopic; + outData["defaultTopicQueueNums"] = defaultTopicQueueNums; + outData["queueId"] = queueId; + outData["sysFlag"] = sysFlag; + outData["bornTimestamp"] = UtilAll::to_string(bornTimestamp); + outData["flag"] = flag; + outData["properties"] = properties; +#ifdef ONS + outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes); + outData["unitMode"] = UtilAll::to_string(unitMode); +#endif +} + +int SendMessageRequestHeader::getReconsumeTimes() { return reconsumeTimes; } + +void SendMessageRequestHeader::setReconsumeTimes(int input_reconsumeTimes) { + reconsumeTimes = input_reconsumeTimes; +} + +void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + LOG_DEBUG( + "SendMessageRequestHeader producerGroup is:%s,topic is:%s, defaulttopic " + "is:%s, properties is:%s,UtilAll::to_string( defaultTopicQueueNums) " + "is:%s,UtilAll::to_string( queueId):%s, UtilAll::to_string( sysFlag) " + "is:%s, UtilAll::to_string( bornTimestamp) is:%s,UtilAll::to_string( " + "flag) is:%s", + producerGroup.c_str(), topic.c_str(), defaultTopic.c_str(), + properties.c_str(), UtilAll::to_string(defaultTopicQueueNums).c_str(), + UtilAll::to_string(queueId).c_str(), UtilAll::to_string(sysFlag).c_str(), + UtilAll::to_string(bornTimestamp).c_str(), + UtilAll::to_string(flag).c_str()); + + requestMap.insert(pair<string, string>("producerGroup", producerGroup)); + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert(pair<string, string>("defaultTopic", defaultTopic)); + requestMap.insert(pair<string, string>( + "defaultTopicQueueNums", UtilAll::to_string(defaultTopicQueueNums))); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); + requestMap.insert( + pair<string, string>("sysFlag", UtilAll::to_string(sysFlag))); + requestMap.insert( + pair<string, string>("bornTimestamp", UtilAll::to_string(bornTimestamp))); + requestMap.insert(pair<string, string>("flag", UtilAll::to_string(flag))); + requestMap.insert(pair<string, string>("properties", properties)); +#ifdef ONS + requestMap.insert(pair<string, string>("reconsumeTimes", + UtilAll::to_string(reconsumeTimes))); + requestMap.insert( + pair<string, string>("unitMode", UtilAll::to_string(unitMode))); +#endif +} + +//<!************************************************************************ +CommandHeader* SendMessageResponseHeader::Decode(Json::Value& ext) { + SendMessageResponseHeader* h = new SendMessageResponseHeader(); + + Json::Value& tempValue = ext["msgId"]; + if (tempValue.isString()) { + h->msgId = tempValue.asString(); + } + + tempValue = ext["queueId"]; + if (tempValue.isString()) { + h->queueId = atoi(tempValue.asCString()); + } + + tempValue = ext["queueOffset"]; + if (tempValue.isString()) { + h->queueOffset = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("msgId", msgId)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); + requestMap.insert( + pair<string, string>("queueOffset", UtilAll::to_string(queueOffset))); +} +//<!************************************************************************ +void PullMessageRequestHeader::Encode(Json::Value& outData) { + outData["consumerGroup"] = consumerGroup; + outData["topic"] = topic; + outData["queueId"] = queueId; + outData["queueOffset"] = UtilAll::to_string(queueOffset); + ; + outData["maxMsgNums"] = maxMsgNums; + outData["sysFlag"] = sysFlag; + outData["commitOffset"] = UtilAll::to_string(commitOffset); + ; + outData["subVersion"] = UtilAll::to_string(subVersion); + ; + outData["suspendTimeoutMillis"] = UtilAll::to_string(suspendTimeoutMillis); + ; + outData["subscription"] = subscription; +} + +void PullMessageRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); + requestMap.insert( + pair<string, string>("queueOffset", UtilAll::to_string(queueOffset))); + requestMap.insert( + pair<string, string>("maxMsgNums", UtilAll::to_string(maxMsgNums))); + requestMap.insert( + pair<string, string>("sysFlag", UtilAll::to_string(sysFlag))); + requestMap.insert( + pair<string, string>("commitOffset", UtilAll::to_string(commitOffset))); + requestMap.insert( + pair<string, string>("subVersion", UtilAll::to_string(subVersion))); + requestMap.insert(pair<string, string>( + "suspendTimeoutMillis", UtilAll::to_string(suspendTimeoutMillis))); + requestMap.insert(pair<string, string>("subscription", subscription)); +} +//<!************************************************************************ +CommandHeader* PullMessageResponseHeader::Decode(Json::Value& ext) { + PullMessageResponseHeader* h = new PullMessageResponseHeader(); + + Json::Value& tempValue = ext["suggestWhichBrokerId"]; + if (tempValue.isString()) { + h->suggestWhichBrokerId = UtilAll::str2ll(tempValue.asCString()); + } + + tempValue = ext["nextBeginOffset"]; + if (tempValue.isString()) { + h->nextBeginOffset = UtilAll::str2ll(tempValue.asCString()); + } + + tempValue = ext["minOffset"]; + if (tempValue.isString()) { + h->minOffset = UtilAll::str2ll(tempValue.asCString()); + } + + tempValue = ext["maxOffset"]; + if (tempValue.isString()) { + h->maxOffset = UtilAll::str2ll(tempValue.asCString()); + } + + return h; +} + +void PullMessageResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>( + "suggestWhichBrokerId", UtilAll::to_string(suggestWhichBrokerId))); + requestMap.insert(pair<string, string>("nextBeginOffset", + UtilAll::to_string(nextBeginOffset))); + requestMap.insert( + pair<string, string>("minOffset", UtilAll::to_string(minOffset))); + requestMap.insert( + pair<string, string>("maxOffset", UtilAll::to_string(maxOffset))); +} +//<!************************************************************************ +void GetConsumerListByGroupResponseHeader::Encode(Json::Value& outData) { + // outData = "{}"; +} + +void GetConsumerListByGroupResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) {} +//<!*************************************************************************** +void GetMinOffsetRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; + outData["queueId"] = queueId; +} + +void GetMinOffsetRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); +} +//<!*************************************************************************** +CommandHeader* GetMinOffsetResponseHeader::Decode(Json::Value& ext) { + GetMinOffsetResponseHeader* h = new GetMinOffsetResponseHeader(); + + Json::Value& tempValue = ext["offset"]; + if (tempValue.isString()) { + h->offset = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void GetMinOffsetResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void GetMaxOffsetRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; + outData["queueId"] = queueId; +} + +void GetMaxOffsetRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); +} +//<!*************************************************************************** +CommandHeader* GetMaxOffsetResponseHeader::Decode(Json::Value& ext) { + GetMaxOffsetResponseHeader* h = new GetMaxOffsetResponseHeader(); + + Json::Value& tempValue = ext["offset"]; + if (tempValue.isString()) { + h->offset = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void GetMaxOffsetResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void SearchOffsetRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; + outData["queueId"] = queueId; + outData["timestamp"] = UtilAll::to_string(timestamp); +} + +void SearchOffsetRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); + requestMap.insert( + pair<string, string>("timestamp", UtilAll::to_string(timestamp))); +} +//<!*************************************************************************** +CommandHeader* SearchOffsetResponseHeader::Decode(Json::Value& ext) { + SearchOffsetResponseHeader* h = new SearchOffsetResponseHeader(); + + Json::Value& tempValue = ext["offset"]; + if (tempValue.isString()) { + h->offset = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void SearchOffsetResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void ViewMessageRequestHeader::Encode(Json::Value& outData) { + outData["offset"] = UtilAll::to_string(offset); +} + +void ViewMessageRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void GetEarliestMsgStoretimeRequestHeader::Encode(Json::Value& outData) { + outData["topic"] = topic; + outData["queueId"] = queueId; +} + +void GetEarliestMsgStoretimeRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); +} +//<!*************************************************************************** +CommandHeader* GetEarliestMsgStoretimeResponseHeader::Decode( + Json::Value& ext) { + GetEarliestMsgStoretimeResponseHeader* h = + new GetEarliestMsgStoretimeResponseHeader(); + + Json::Value& tempValue = ext["timestamp"]; + if (tempValue.isString()) { + h->timestamp = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void GetEarliestMsgStoretimeResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert( + pair<string, string>("timestamp", UtilAll::to_string(timestamp))); +} +//<!*************************************************************************** +void GetConsumerListByGroupRequestHeader::Encode(Json::Value& outData) { + outData["consumerGroup"] = consumerGroup; +} + +void GetConsumerListByGroupRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); +} +//<!*************************************************************************** +void QueryConsumerOffsetRequestHeader::Encode(Json::Value& outData) { + outData["consumerGroup"] = consumerGroup; + outData["topic"] = topic; + outData["queueId"] = queueId; +} + +void QueryConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); +} +//<!*************************************************************************** +CommandHeader* QueryConsumerOffsetResponseHeader::Decode( + Json::Value& ext) { + QueryConsumerOffsetResponseHeader* h = + new QueryConsumerOffsetResponseHeader(); + Json::Value& tempValue = ext["offset"]; + if (tempValue.isString()) { + h->offset = UtilAll::str2ll(tempValue.asCString()); + } + return h; +} + +void QueryConsumerOffsetResponseHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void UpdateConsumerOffsetRequestHeader::Encode(Json::Value& outData) { + outData["consumerGroup"] = consumerGroup; + outData["topic"] = topic; + outData["queueId"] = queueId; + outData["commitOffset"] = UtilAll::to_string(commitOffset); +} + +void UpdateConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); + requestMap.insert(pair<string, string>("topic", topic)); + requestMap.insert( + pair<string, string>("queueId", UtilAll::to_string(queueId))); + requestMap.insert( + pair<string, string>("commitOffset", UtilAll::to_string(commitOffset))); +} +//<!*************************************************************************** +void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) { + outData["group"] = group; + outData["delayLevel"] = delayLevel; + outData["offset"] = UtilAll::to_string(offset); +#ifdef ONS + outData["originMsgId"] = originMsgId; + outData["originTopic"] = originTopic; +#endif +} + +void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("group", group)); + requestMap.insert( + pair<string, string>("delayLevel", UtilAll::to_string(delayLevel))); + requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset))); +} +//<!*************************************************************************** +void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem, + vector<string>& cids) { + cids.clear(); + //<! decode; + const char* const pData = static_cast<const char*>(mem->getData()); + + Json::Reader reader; + Json::Value root; + if (!reader.parse(pData, root)) { + LOG_ERROR("GetConsumerListByGroupResponse error"); + return; + } + + Json::Value ids = root["consumerIdList"]; + for (unsigned int i = 0; i < ids.size(); i++) { + if (ids[i].isString()) { + cids.push_back(ids[i].asString()); + } + } +} + +void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) {} + +void ResetOffsetRequestHeader::setTopic(const string& tmp) { topic = tmp; } + +void ResetOffsetRequestHeader::setGroup(const string& tmp) { group = tmp; } + +void ResetOffsetRequestHeader::setTimeStamp(const int64& tmp) { + timestamp = tmp; +} + +void ResetOffsetRequestHeader::setForceFlag(const bool& tmp) { isForce = tmp; } + +const string ResetOffsetRequestHeader::getTopic() const { return topic; } + +const string ResetOffsetRequestHeader::getGroup() const { return group; } + +const int64 ResetOffsetRequestHeader::getTimeStamp() const { return timestamp; } + +const bool ResetOffsetRequestHeader::getForceFlag() const { return isForce; } + +CommandHeader* ResetOffsetRequestHeader::Decode(Json::Value& ext) { + ResetOffsetRequestHeader* h = new ResetOffsetRequestHeader(); + + Json::Value& tempValue = ext["topic"]; + if (tempValue.isString()) { + h->topic = tempValue.asString(); + } + + tempValue = ext["group"]; + if (tempValue.isString()) { + h->group = tempValue.asString(); + } + + tempValue = ext["timestamp"]; + if (tempValue.isString()) { + h->timestamp = UtilAll::str2ll(tempValue.asCString()); + } + + tempValue = ext["isForce"]; + if (tempValue.isString()) { + h->isForce = UtilAll::to_bool(tempValue.asCString()); + } + LOG_INFO("topic:%s, group:%s, timestamp:%lld, isForce:%d,isForce:%s", + h->topic.c_str(), h->group.c_str(), h->timestamp, h->isForce, + tempValue.asCString()); + return h; +} + +CommandHeader* GetConsumerRunningInfoRequestHeader::Decode( + Json::Value& ext) { + GetConsumerRunningInfoRequestHeader* h = + new GetConsumerRunningInfoRequestHeader(); + + Json::Value& tempValue = ext["consumerGroup"]; + if (tempValue.isString()) { + h->consumerGroup = tempValue.asString(); + } + + tempValue = ext["clientId"]; + if (tempValue.isString()) { + h->clientId = tempValue.asString(); + } + + tempValue = ext["jstackEnable"]; + if (tempValue.isString()) { + h->jstackEnable = UtilAll::to_bool(tempValue.asCString()); + } + LOG_INFO("consumerGroup:%s, clientId:%s, jstackEnable:%d", + h->consumerGroup.c_str(), h->clientId.c_str(), h->jstackEnable); + return h; +} + +void GetConsumerRunningInfoRequestHeader::Encode(Json::Value& outData) { + outData["consumerGroup"] = consumerGroup; + outData["clientId"] = clientId; + outData["jstackEnable"] = jstackEnable; +} + +void GetConsumerRunningInfoRequestHeader::SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) { + requestMap.insert(pair<string, string>("consumerGroup", consumerGroup)); + requestMap.insert(pair<string, string>("clientId", clientId)); + requestMap.insert( + pair<string, string>("jstackEnable", UtilAll::to_string(jstackEnable))); +} + +const string GetConsumerRunningInfoRequestHeader::getConsumerGroup() const { + return consumerGroup; +} + +void GetConsumerRunningInfoRequestHeader::setConsumerGroup( + const string& Group) { + consumerGroup = Group; +} + +const string GetConsumerRunningInfoRequestHeader::getClientId() const { + return clientId; +} + +void GetConsumerRunningInfoRequestHeader::setClientId( + const string& input_clientId) { + clientId = input_clientId; +} + +const bool GetConsumerRunningInfoRequestHeader::isJstackEnable() const { + return jstackEnable; +} + +void GetConsumerRunningInfoRequestHeader::setJstackEnable( + const bool& input_jstackEnable) { + jstackEnable = input_jstackEnable; +} + +CommandHeader* NotifyConsumerIdsChangedRequestHeader::Decode( + Json::Value& ext) { + NotifyConsumerIdsChangedRequestHeader* h = + new NotifyConsumerIdsChangedRequestHeader(); + + Json::Value& tempValue = ext["consumerGroup"]; + if (tempValue.isString()) { + h->consumerGroup = tempValue.asString(); + } + + return h; +} + +void NotifyConsumerIdsChangedRequestHeader::setGroup(const string& tmp) { + consumerGroup = tmp; +} +const string NotifyConsumerIdsChangedRequestHeader::getGroup() const { + return consumerGroup; +} + +//<!************************************************************************ +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/CommandHeader.h b/rocketmq-cpp/src/protocol/CommandHeader.h new file mode 100644 index 0000000..5a55c55 --- /dev/null +++ b/rocketmq-cpp/src/protocol/CommandHeader.h @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __COMMANDCUSTOMHEADER_H__ +#define __COMMANDCUSTOMHEADER_H__ + +#include <string> +#include "MQClientException.h" +#include "MessageSysFlag.h" +#include "UtilAll.h" +#include "dataBlock.h" +#include "json/json.h" + +namespace rocketmq { +//<!*************************************************************************** + +class CommandHeader { + public: + virtual ~CommandHeader() {} + virtual void Encode(Json::Value& outData) {} + virtual void SetDeclaredFieldOfCommandHeader( + map<string, string>& requestMap) {} +}; + +//<!************************************************************************ +class GetRouteInfoRequestHeader : public CommandHeader { + public: + GetRouteInfoRequestHeader(const string& top) : topic(top) {} + virtual ~GetRouteInfoRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + private: + string topic; +}; + +//<!************************************************************************ +class UnregisterClientRequestHeader : public CommandHeader { + public: + UnregisterClientRequestHeader(string cID, string proGroup, string conGroup) + : clientID(cID), producerGroup(proGroup), consumerGroup(conGroup) {} + virtual ~UnregisterClientRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + private: + string clientID; + string producerGroup; + string consumerGroup; +}; + +//<!************************************************************************ +class CreateTopicRequestHeader : public CommandHeader { + public: + CreateTopicRequestHeader() : readQueueNums(0), writeQueueNums(0), perm(0) {} + virtual ~CreateTopicRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string topic; + string defaultTopic; + int readQueueNums; + int writeQueueNums; + int perm; + string topicFilterType; +}; + +//<!************************************************************************ +class SendMessageRequestHeader : public CommandHeader { + public: + SendMessageRequestHeader() + : defaultTopicQueueNums(0), + queueId(0), + sysFlag(0), + bornTimestamp(0), + flag(0), + reconsumeTimes(0), + unitMode(false) {} + virtual ~SendMessageRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + int getReconsumeTimes(); + void setReconsumeTimes(int input_reconsumeTimes); + + public: + string producerGroup; + string topic; + string defaultTopic; + int defaultTopicQueueNums; + int queueId; + int sysFlag; + int64 bornTimestamp; + int flag; + string properties; + int reconsumeTimes; + bool unitMode; +}; + +//<!************************************************************************ +class SendMessageResponseHeader : public CommandHeader { + public: + SendMessageResponseHeader() : queueId(0), queueOffset(0) { msgId.clear(); } + virtual ~SendMessageResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string msgId; + int queueId; + int64 queueOffset; +}; + +//<!************************************************************************ +class PullMessageRequestHeader : public CommandHeader { + public: + PullMessageRequestHeader() + : queueId(0), + maxMsgNums(0), + sysFlag(0), + queueOffset(0), + commitOffset(0), + suspendTimeoutMillis(0), + subVersion(0) {} + virtual ~PullMessageRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string consumerGroup; + string topic; + int queueId; + int maxMsgNums; + int sysFlag; + string subscription; + int64 queueOffset; + int64 commitOffset; + int64 suspendTimeoutMillis; + int64 subVersion; +}; + +//<!************************************************************************ +class PullMessageResponseHeader : public CommandHeader { + public: + PullMessageResponseHeader() + : suggestWhichBrokerId(0), + nextBeginOffset(0), + minOffset(0), + maxOffset(0) {} + virtual ~PullMessageResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 suggestWhichBrokerId; + int64 nextBeginOffset; + int64 minOffset; + int64 maxOffset; +}; + +//<!************************************************************************ +class GetConsumerListByGroupResponseHeader : public CommandHeader { + public: + GetConsumerListByGroupResponseHeader() {} + virtual ~GetConsumerListByGroupResponseHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); +}; + +//<!*************************************************************************** +class GetMinOffsetRequestHeader : public CommandHeader { + public: + GetMinOffsetRequestHeader() : queueId(0){}; + virtual ~GetMinOffsetRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string topic; + int queueId; +}; + +//<!*************************************************************************** +class GetMinOffsetResponseHeader : public CommandHeader { + public: + GetMinOffsetResponseHeader() : offset(0){}; + virtual ~GetMinOffsetResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 offset; +}; + +//<!*************************************************************************** +class GetMaxOffsetRequestHeader : public CommandHeader { + public: + GetMaxOffsetRequestHeader() : queueId(0){}; + virtual ~GetMaxOffsetRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string topic; + int queueId; +}; + +//<!*************************************************************************** +class GetMaxOffsetResponseHeader : public CommandHeader { + public: + GetMaxOffsetResponseHeader() : offset(0){}; + virtual ~GetMaxOffsetResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 offset; +}; + +//<!*************************************************************************** +class SearchOffsetRequestHeader : public CommandHeader { + public: + SearchOffsetRequestHeader() : queueId(0), timestamp(0){}; + virtual ~SearchOffsetRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string topic; + int queueId; + int64 timestamp; +}; + +//<!*************************************************************************** +class SearchOffsetResponseHeader : public CommandHeader { + public: + SearchOffsetResponseHeader() : offset(0){}; + virtual ~SearchOffsetResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 offset; +}; + +//<!*************************************************************************** +class ViewMessageRequestHeader : public CommandHeader { + public: + ViewMessageRequestHeader() : offset(0){}; + virtual ~ViewMessageRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 offset; +}; + +//<!*************************************************************************** +class GetEarliestMsgStoretimeRequestHeader : public CommandHeader { + public: + GetEarliestMsgStoretimeRequestHeader() : queueId(0){}; + virtual ~GetEarliestMsgStoretimeRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string topic; + int queueId; +}; + +//<!*************************************************************************** +class GetEarliestMsgStoretimeResponseHeader : public CommandHeader { + public: + GetEarliestMsgStoretimeResponseHeader() : timestamp(0){}; + virtual ~GetEarliestMsgStoretimeResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 timestamp; +}; + +//<!*************************************************************************** +class GetConsumerListByGroupRequestHeader : public CommandHeader { + public: + GetConsumerListByGroupRequestHeader(){}; + virtual ~GetConsumerListByGroupRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string consumerGroup; +}; + +//<!************************************************************************ +class QueryConsumerOffsetRequestHeader : public CommandHeader { + public: + QueryConsumerOffsetRequestHeader() : queueId(0){}; + virtual ~QueryConsumerOffsetRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string consumerGroup; + string topic; + int queueId; +}; + +//<!************************************************************************ +class QueryConsumerOffsetResponseHeader : public CommandHeader { + public: + QueryConsumerOffsetResponseHeader() : offset(0){}; + virtual ~QueryConsumerOffsetResponseHeader() {} + static CommandHeader* Decode(Json::Value& ext); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + int64 offset; +}; + +//<!************************************************************************ +class UpdateConsumerOffsetRequestHeader : public CommandHeader { + public: + UpdateConsumerOffsetRequestHeader() : queueId(0), commitOffset(0){}; + virtual ~UpdateConsumerOffsetRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string consumerGroup; + string topic; + int queueId; + int64 commitOffset; +}; + +//<!*************************************************************************** +class ConsumerSendMsgBackRequestHeader : public CommandHeader { + public: + ConsumerSendMsgBackRequestHeader() : delayLevel(0), offset(0){}; + virtual ~ConsumerSendMsgBackRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + string group; + int delayLevel; + int64 offset; +}; + +//<!*************************************************************************** +class GetConsumerListByGroupResponseBody { + public: + GetConsumerListByGroupResponseBody(){}; + virtual ~GetConsumerListByGroupResponseBody() {} + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + + public: + static void Decode(const MemoryBlock* mem, vector<string>& cids); +}; + +class ResetOffsetRequestHeader : public CommandHeader { + public: + ResetOffsetRequestHeader() {} + ~ResetOffsetRequestHeader() {} + static CommandHeader* Decode(Json::Value& ext); + void setTopic(const string& tmp); + void setGroup(const string& tmp); + void setTimeStamp(const int64& tmp); + void setForceFlag(const bool& tmp); + const string getTopic() const; + const string getGroup() const; + const int64 getTimeStamp() const; + const bool getForceFlag() const; + + private: + string topic; + string group; + int64 timestamp; + bool isForce; +}; + +class GetConsumerRunningInfoRequestHeader : public CommandHeader { + public: + GetConsumerRunningInfoRequestHeader() {} + virtual ~GetConsumerRunningInfoRequestHeader() {} + virtual void Encode(Json::Value& outData); + virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap); + static CommandHeader* Decode(Json::Value& ext); + const string getConsumerGroup() const; + void setConsumerGroup(const string& consumerGroup); + const string getClientId() const; + void setClientId(const string& clientId); + const bool isJstackEnable() const; + void setJstackEnable(const bool& jstackEnable); + + private: + string consumerGroup; + string clientId; + bool jstackEnable; +}; + +class NotifyConsumerIdsChangedRequestHeader : public CommandHeader { + public: + NotifyConsumerIdsChangedRequestHeader() {} + virtual ~NotifyConsumerIdsChangedRequestHeader() {} + static CommandHeader* Decode(Json::Value& ext); + void setGroup(const string& tmp); + const string getGroup() const; + + private: + string consumerGroup; +}; + +//<!*************************************************************************** +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp new file mode 100644 index 0000000..10ac0aa --- /dev/null +++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp @@ -0,0 +1,109 @@ +#include "ConsumerRunningInfo.h" +#include "UtilAll.h" + +namespace rocketmq { +const string ConsumerRunningInfo::PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR"; +const string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE = + "PROP_THREADPOOL_CORE_SIZE"; +const string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY"; +const string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE"; +const string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION"; +const string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = + "PROP_CONSUMER_START_TIMESTAMP"; + +const map<string, string> ConsumerRunningInfo::getProperties() const { + return properties; +} + +void ConsumerRunningInfo::setProperties( + const map<string, string>& input_properties) { + properties = input_properties; +} + +void ConsumerRunningInfo::setProperty(const string& key, const string& value) { + properties[key] = value; +} + +const map<MessageQueue, ProcessQueueInfo> ConsumerRunningInfo::getMqTable() + const { + return mqTable; +} + +void ConsumerRunningInfo::setMqTable(MessageQueue queue, + ProcessQueueInfo queueInfo) { + mqTable[queue] = queueInfo; +} + +/*const map<string, ConsumeStatus> ConsumerRunningInfo::getStatusTable() const +{ +return statusTable; +} + + +void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>& +input_statusTable) +{ +statusTable = input_statusTable; +} */ + +const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const { + return subscriptionSet; +} + +void ConsumerRunningInfo::setSubscriptionSet( + const vector<SubscriptionData>& input_subscriptionSet) { + subscriptionSet = input_subscriptionSet; +} + +const string ConsumerRunningInfo::getJstack() const { return jstack; } + +void ConsumerRunningInfo::setJstack(const string& input_jstack) { + jstack = input_jstack; +} + +string ConsumerRunningInfo::encode() { + Json::Value outData; + + outData[PROP_NAMESERVER_ADDR] = properties[PROP_NAMESERVER_ADDR]; + outData[PROP_CONSUME_TYPE] = properties[PROP_CONSUME_TYPE]; + outData[PROP_CLIENT_VERSION] = properties[PROP_CLIENT_VERSION]; + outData[PROP_CONSUMER_START_TIMESTAMP] = + properties[PROP_CONSUMER_START_TIMESTAMP]; + outData[PROP_CONSUME_ORDERLY] = properties[PROP_CONSUME_ORDERLY]; + outData[PROP_THREADPOOL_CORE_SIZE] = properties[PROP_THREADPOOL_CORE_SIZE]; + + Json::Value root; + root["jstack"] = jstack; + root["properties"] = outData; + + { + vector<SubscriptionData>::const_iterator it = subscriptionSet.begin(); + for (; it != subscriptionSet.end(); it++) { + root["subscriptionSet"].append(it->toJson()); + } + } + + Json::FastWriter fastwrite; + string finals = fastwrite.write(root); + + Json::Value mq; + string key = "\"mqTable\":"; + key.append("{"); + for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin(); + it != mqTable.end(); ++it) { + key.append((it->first).toJson().toStyledString()); + key.erase(key.end() - 1); + key.append(":"); + key.append((it->second).toJson().toStyledString()); + key.append(","); + } + key.erase(key.end() - 1); + key.append("}"); + + // insert mqTable to final string + key.append(","); + finals.insert(1, key); + + return finals; +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h new file mode 100644 index 0000000..6467ad5 --- /dev/null +++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h @@ -0,0 +1,50 @@ +#ifndef __CONSUMERRUNNINGINFO_H__ +#define __CONSUMERRUNNINGINFO_H__ + +#include "MessageQueue.h" +#include "ProcessQueueInfo.h" +#include "SubscriptionData.h" + +namespace rocketmq { + +class ConsumerRunningInfo { + public: + ConsumerRunningInfo() {} + virtual ~ConsumerRunningInfo() { + properties.clear(); + mqTable.clear(); + subscriptionSet.clear(); + } + + public: + static const string PROP_NAMESERVER_ADDR; + static const string PROP_THREADPOOL_CORE_SIZE; + static const string PROP_CONSUME_ORDERLY; + static const string PROP_CONSUME_TYPE; + static const string PROP_CLIENT_VERSION; + static const string PROP_CONSUMER_START_TIMESTAMP; + + public: + const map<string, string> getProperties() const; + void setProperties(const map<string, string>& input_properties); + void setProperty(const string& key, const string& value); + const map<MessageQueue, ProcessQueueInfo> getMqTable() const; + void setMqTable(MessageQueue queue, ProcessQueueInfo queueInfo); + // const map<string, ConsumeStatus> getStatusTable() const; + // void setStatusTable(const map<string, ConsumeStatus>& input_statusTable) ; + const vector<SubscriptionData> getSubscriptionSet() const; + void setSubscriptionSet( + const vector<SubscriptionData>& input_subscriptionSet); + const string getJstack() const; + void setJstack(const string& input_jstack); + string encode(); + + private: + map<string, string> properties; + vector<SubscriptionData> subscriptionSet; + map<MessageQueue, ProcessQueueInfo> mqTable; + // map<string, ConsumeStatus> statusTable; + string jstack; +}; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/HeartbeatData.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/HeartbeatData.h b/rocketmq-cpp/src/protocol/HeartbeatData.h new file mode 100755 index 0000000..9b74280 --- /dev/null +++ b/rocketmq-cpp/src/protocol/HeartbeatData.h @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __HEARTBEATDATA_H__ +#define __HEARTBEATDATA_H__ +#include <boost/thread/thread.hpp> +#include <cstdlib> +#include <string> +#include <vector> +#include "ConsumeType.h" +#include "SubscriptionData.h" + +namespace rocketmq { +//<!*************************************************************************** +class ProducerData { + public: + ProducerData(){}; + bool operator<(const ProducerData& pd) const { + return groupName < pd.groupName; + } + Json::Value toJson() const { + Json::Value outJson; + outJson["groupName"] = groupName; + return outJson; + } + + public: + string groupName; +}; + +//<!*************************************************************************** +class ConsumerData { + public: + ConsumerData(){}; + virtual ~ConsumerData() { subscriptionDataSet.clear(); } + bool operator<(const ConsumerData& cd) const { + return groupName < cd.groupName; + } + + Json::Value toJson() const { + Json::Value outJson; + outJson["groupName"] = groupName; + outJson["consumeFromWhere"] = consumeFromWhere; + outJson["consumeType"] = consumeType; + outJson["messageModel"] = messageModel; + + vector<SubscriptionData>::const_iterator it = subscriptionDataSet.begin(); + for (; it != subscriptionDataSet.end(); it++) { + outJson["subscriptionDataSet"].append((*it).toJson()); + } + + return outJson; + } + + public: + string groupName; + ConsumeType consumeType; + MessageModel messageModel; + ConsumeFromWhere consumeFromWhere; + vector<SubscriptionData> subscriptionDataSet; +}; + +//<!*************************************************************************** +class HeartbeatData { + public: + virtual ~HeartbeatData() { + m_producerDataSet.clear(); + m_consumerDataSet.clear(); + } + void Encode(string& outData) { + Json::Value root; + + //<!id; + root["clientID"] = m_clientID; + + //<!consumer; + { + boost::lock_guard<boost::mutex> lock(m_consumerDataMutex); + vector<ConsumerData>::iterator itc = m_consumerDataSet.begin(); + for (; itc != m_consumerDataSet.end(); itc++) { + root["consumerDataSet"].append((*itc).toJson()); + } + } + + //<!producer; + { + boost::lock_guard<boost::mutex> lock(m_producerDataMutex); + vector<ProducerData>::iterator itp = m_producerDataSet.begin(); + for (; itp != m_producerDataSet.end(); itp++) { + root["producerDataSet"].append((*itp).toJson()); + } + } + //<!output; + Json::FastWriter fastwrite; + outData = fastwrite.write(root); + } + + void setClientID(const string& clientID) { m_clientID = clientID; } + + bool isProducerDataSetEmpty() { + boost::lock_guard<boost::mutex> lock(m_producerDataMutex); + return m_producerDataSet.empty(); + } + + void insertDataToProducerDataSet(ProducerData& producerData) { + boost::lock_guard<boost::mutex> lock(m_producerDataMutex); + m_producerDataSet.push_back(producerData); + } + + bool isConsumerDataSetEmpty() { + boost::lock_guard<boost::mutex> lock(m_consumerDataMutex); + return m_consumerDataSet.empty(); + } + + void insertDataToConsumerDataSet(ConsumerData& consumerData) { + boost::lock_guard<boost::mutex> lock(m_consumerDataMutex); + m_consumerDataSet.push_back(consumerData); + } + + private: + string m_clientID; + vector<ProducerData> m_producerDataSet; + vector<ConsumerData> m_consumerDataSet; + boost::mutex m_producerDataMutex; + boost::mutex m_consumerDataMutex; +}; +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/KVTable.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/KVTable.h b/rocketmq-cpp/src/protocol/KVTable.h new file mode 100755 index 0000000..69191b7 --- /dev/null +++ b/rocketmq-cpp/src/protocol/KVTable.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __KVTABLE_H__ +#define __KVTABLE_H__ +#include <map> +#include <string> +#include "RemotingSerializable.h" + +namespace rocketmq { +//<!*************************************************************************** +class KVTable : public RemotingSerializable { + public: + virtual ~KVTable() { m_table.clear(); } + + void Encode(string& outData) {} + + const map<string, string>& getTable() { return m_table; } + + void setTable(const map<string, string>& table) { m_table = table; } + + private: + map<string, string> m_table; +}; +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.cpp b/rocketmq-cpp/src/protocol/LockBatchBody.cpp new file mode 100755 index 0000000..c56c17f --- /dev/null +++ b/rocketmq-cpp/src/protocol/LockBatchBody.cpp @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LockBatchBody.h" +#include "Logging.h" +namespace rocketmq { //<!end namespace; + +string LockBatchRequestBody::getConsumerGroup() { return consumerGroup; } +void LockBatchRequestBody::setConsumerGroup(string in_consumerGroup) { + consumerGroup = in_consumerGroup; +} +string LockBatchRequestBody::getClientId() { return clientId; } +void LockBatchRequestBody::setClientId(string in_clientId) { + clientId = in_clientId; +} +vector<MQMessageQueue> LockBatchRequestBody::getMqSet() { return mqSet; } +void LockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) { + mqSet.swap(in_mqSet); +} +void LockBatchRequestBody::Encode(string& outData) { + Json::Value root; + root["consumerGroup"] = consumerGroup; + root["clientId"] = clientId; + + vector<MQMessageQueue>::const_iterator it = mqSet.begin(); + for (; it != mqSet.end(); it++) { + root["mqSet"].append(toJson(*it)); + } + + Json::FastWriter fastwrite; + outData = fastwrite.write(root); +} + +Json::Value LockBatchRequestBody::toJson(const MQMessageQueue& mq) const { + Json::Value outJson; + outJson["topic"] = mq.getTopic(); + outJson["brokerName"] = mq.getBrokerName(); + outJson["queueId"] = mq.getQueueId(); + return outJson; +} + +vector<MQMessageQueue> LockBatchResponseBody::getLockOKMQSet() { + return lockOKMQSet; +} +void LockBatchResponseBody::setLockOKMQSet( + vector<MQMessageQueue> in_lockOKMQSet) { + lockOKMQSet.swap(in_lockOKMQSet); +} + +void LockBatchResponseBody::Decode(const MemoryBlock* mem, + vector<MQMessageQueue>& messageQueues) { + messageQueues.clear(); + //<! decode; + const char* const pData = static_cast<const char*>(mem->getData()); + + Json::Reader reader; + Json::Value root; + if (!reader.parse(pData, root)) { + LOG_WARN("decode LockBatchResponseBody error"); + return; + } + + Json::Value mqs = root["lockOKMQSet"]; + LOG_DEBUG("LockBatchResponseBody mqs size:%d", mqs.size()); + for (unsigned int i = 0; i < mqs.size(); i++) { + MQMessageQueue mq; + Json::Value qd = mqs[i]; + mq.setTopic(qd["topic"].asString()); + mq.setBrokerName(qd["brokerName"].asString()); + mq.setQueueId(qd["queueId"].asInt()); + LOG_INFO("LockBatchResponseBody MQ:%s", mq.toString().c_str()); + messageQueues.push_back(mq); + } +} + +string UnlockBatchRequestBody::getConsumerGroup() { return consumerGroup; } +void UnlockBatchRequestBody::setConsumerGroup(string in_consumerGroup) { + consumerGroup = in_consumerGroup; +} +string UnlockBatchRequestBody::getClientId() { return clientId; } +void UnlockBatchRequestBody::setClientId(string in_clientId) { + clientId = in_clientId; +} +vector<MQMessageQueue> UnlockBatchRequestBody::getMqSet() { return mqSet; } +void UnlockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) { + mqSet.swap(in_mqSet); +} +void UnlockBatchRequestBody::Encode(string& outData) { + Json::Value root; + root["consumerGroup"] = consumerGroup; + root["clientId"] = clientId; + + vector<MQMessageQueue>::const_iterator it = mqSet.begin(); + for (; it != mqSet.end(); it++) { + root["mqSet"].append(toJson(*it)); + } + + Json::FastWriter fastwrite; + outData = fastwrite.write(root); +} + +Json::Value UnlockBatchRequestBody::toJson( + const MQMessageQueue& mq) const { + Json::Value outJson; + outJson["topic"] = mq.getTopic(); + outJson["brokerName"] = mq.getBrokerName(); + outJson["queueId"] = mq.getQueueId(); + return outJson; +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.h b/rocketmq-cpp/src/protocol/LockBatchBody.h new file mode 100755 index 0000000..c1d7155 --- /dev/null +++ b/rocketmq-cpp/src/protocol/LockBatchBody.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __LOCKBATCHBODY_H__ +#define __LOCKBATCHBODY_H__ +#include <set> +#include <string> +#include "MQMessageQueue.h" +#include "RemotingSerializable.h" +#include "dataBlock.h" +#include "json/json.h" +#include "UtilAll.h" + +namespace rocketmq { +//<!*************************************************************************** + +class LockBatchRequestBody { + public: + virtual ~LockBatchRequestBody() { mqSet.clear(); } + string getConsumerGroup(); + void setConsumerGroup(string consumerGroup); + string getClientId(); + void setClientId(string clientId); + vector<MQMessageQueue> getMqSet(); + void setMqSet(vector<MQMessageQueue> mqSet); + void Encode(string& outData); + Json::Value toJson(const MQMessageQueue& mq) const; + + private: + string consumerGroup; + string clientId; + vector<MQMessageQueue> mqSet; +}; + +class LockBatchResponseBody { + public: + virtual ~LockBatchResponseBody() { lockOKMQSet.clear(); } + vector<MQMessageQueue> getLockOKMQSet(); + void setLockOKMQSet(vector<MQMessageQueue> lockOKMQSet); + static void Decode(const MemoryBlock* mem, + vector<MQMessageQueue>& messageQueues); + + private: + vector<MQMessageQueue> lockOKMQSet; +}; + +class UnlockBatchRequestBody { + public: + virtual ~UnlockBatchRequestBody() { mqSet.clear(); } + string getConsumerGroup(); + void setConsumerGroup(string consumerGroup); + string getClientId(); + void setClientId(string clientId); + vector<MQMessageQueue> getMqSet(); + void setMqSet(vector<MQMessageQueue> mqSet); + void Encode(string& outData); + Json::Value toJson(const MQMessageQueue& mq) const; + + private: + string consumerGroup; + string clientId; + vector<MQMessageQueue> mqSet; +}; + +} //<!end namespace; +#endif