http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp deleted file mode 100755 index 8d7f8a1..0000000 --- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp +++ /dev/null @@ -1,1018 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#include "DefaultMQPushConsumerImpl.h" - -#include <string> -#include <set> -#include "DefaultMQPushConsumer.h" -#include "ConsumerStatManage.h" -#include "DefaultMQPullConsumer.h" -#include "DefaultMQProducer.h" -#include "MQClientFactory.h" -#include "MQAdminImpl.h" -#include "RebalancePushImpl.h" -#include "MQClientAPIImpl.h" -#include "OffsetStore.h" -#include "MixAll.h" -#include "MQClientManager.h" -#include "LocalFileOffsetStore.h" -#include "RemoteBrokerOffsetStore.h" -#include "PullSysFlag.h" -#include "FilterAPI.h" -#include "PullAPIWrapper.h" -#include "MQClientException.h" -#include "Validators.h" -#include "MessageListener.h" -#include "ConsumeMessageHook.h" -#include "PullMessageService.h" -#include "ConsumeMessageOrderlyService.h" -#include "ConsumeMessageConcurrentlyService.h" -#include "KPRUtil.h" -#include "TimerThread.h" - -namespace rmq -{ - -/* RemoveProcessQueueLater */ -class RemoveProcessQueueLater : public kpr::TimerHandler -{ -public: - RemoveProcessQueueLater(DefaultMQPushConsumerImpl* pConsumerImp, PullRequest* pPullRequest) - : m_pConsumerImp(pConsumerImp), m_pPullRequest(pPullRequest) - { - } - - void OnTimeOut(unsigned int timerID) - { - try - { - m_pConsumerImp->getOffsetStore()->updateOffset(m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false); - m_pConsumerImp->getOffsetStore()->persist(m_pPullRequest->getMessageQueue()); - m_pConsumerImp->getRebalanceImpl()->removeProcessQueue(m_pPullRequest->getMessageQueue()); - - RMQ_WARN("fix the pull request offset, {%s}", m_pPullRequest->toString().c_str()); - } - catch(...) - { - RMQ_ERROR("RemoveProcessQueueLater OnTimeOut Exception"); - } - - delete this; - } - -private: - DefaultMQPushConsumerImpl* m_pConsumerImp; - PullRequest* m_pPullRequest; -}; - - -/* DefaultMQPushConsumerImplCallback */ -class DefaultMQPushConsumerImplCallback : public PullCallback -{ -public: - DefaultMQPushConsumerImplCallback(SubscriptionData& subscriptionData, - DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl, - PullRequest* pPullRequest) - : m_subscriptionData(subscriptionData), - m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl), - m_pPullRequest(pPullRequest) - { - m_beginTimestamp = KPRUtil::GetCurrentTimeMillis(); - } - - void onSuccess(PullResult& pullResult) - { - RMQ_DEBUG("onSuccess begin: %s", pullResult.toString().c_str()); - PullResult* pPullResult = &pullResult; - if (pPullResult != NULL) - { - pPullResult = - m_pDefaultMQPushConsumerImpl->m_pPullAPIWrapper->processPullResult( - m_pPullRequest->getMessageQueue(), *pPullResult, m_subscriptionData); - - switch (pPullResult->pullStatus) - { - case FOUND: - { - m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); - - long long pullRT = KPRUtil::GetCurrentTimeMillis() - m_beginTimestamp; - m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat() - .pullTimesTotal++; - m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat() - .pullRTTotal.fetchAndAdd(pullRT); - - ProcessQueue* processQueue = m_pPullRequest->getProcessQueue(); - bool dispatchToConsume = processQueue->putMessage(pPullResult->msgFoundList); - - m_pDefaultMQPushConsumerImpl->m_pConsumeMessageService->submitConsumeRequest(// - pPullResult->msgFoundList, // - processQueue, // - m_pPullRequest->getMessageQueue(), // - dispatchToConsume); - - if (m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval() > 0) - { - m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest, - m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval()); - } - else - { - m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); - } - } - break; - case NO_NEW_MSG: - m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); - m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest); - m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); - break; - case NO_MATCHED_MSG: - m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); - m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest); - m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); - break; - case OFFSET_ILLEGAL: - RMQ_WARN("the pull request offset illegal, %s, %s", - m_pPullRequest->toString().c_str(), pPullResult->toString().c_str()); - - /* - if (m_pPullRequest->getNextOffset() < pPullResult->minOffset) - { - m_pPullRequest->setNextOffset(pPullResult->minOffset); - } - else if (m_pPullRequest->getNextOffset() > pPullResult->maxOffset) - { - m_pPullRequest->setNextOffset(pPullResult->maxOffset); - } - m_pDefaultMQPushConsumerImpl->m_pOffsetStore->updateOffset( - m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false); - m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); - */ - - // todo - m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); - m_pPullRequest->getProcessQueue()->setDropped(true); - - m_pDefaultMQPushConsumerImpl->executeTaskLater(new RemoveProcessQueueLater( - m_pDefaultMQPushConsumerImpl, m_pPullRequest), 10000); - break; - default: - break; - } - } - else - { - RMQ_WARN("Warning: PullRequest is null!"); - } - RMQ_DEBUG("onSuccess end"); - } - - void onException(MQException& e) - { - std::string topic = m_pPullRequest->getMessageQueue().getTopic(); - if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != std::string::npos) - { - RMQ_WARN("execute the pull request exception:%s", e.what()); - } - - m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest, - DefaultMQPushConsumerImpl::s_PullTimeDelayMillsWhenException); - } - -private: - SubscriptionData m_subscriptionData; - DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl; - PullRequest* m_pPullRequest; - unsigned long long m_beginTimestamp; -}; - - -DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer) -{ - m_pDefaultMQPushConsumer = pDefaultMQPushConsumer; - m_serviceState = CREATE_JUST; - flowControlTimes1 = 0; - flowControlTimes2 = 0; - m_pause = false; - m_consumeOrderly = false; - - m_pMQClientFactory = NULL; - m_pPullAPIWrapper = NULL; - m_pMessageListenerInner = NULL; - m_pOffsetStore = NULL; - m_pRebalanceImpl = new RebalancePushImpl(this); - m_pConsumerStatManager = new ConsumerStatManager(); - m_pConsumeMessageService = NULL; -} - -DefaultMQPushConsumerImpl::~DefaultMQPushConsumerImpl() -{ - //delete m_pMessageListenerInner; - if (m_pPullAPIWrapper) - delete m_pPullAPIWrapper; - if (m_pRebalanceImpl) - delete m_pRebalanceImpl; - if (m_pConsumerStatManager) - delete m_pConsumerStatManager; - if (m_pConsumeMessageService) - delete m_pConsumeMessageService; - if (m_pOffsetStore) - delete m_pOffsetStore; - //delete m_pMQClientFactory; -} - -void DefaultMQPushConsumerImpl::start() -{ - RMQ_DEBUG("DefaultMQPushConsumerImpl::start()"); - switch (m_serviceState) - { - case CREATE_JUST: - { - RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}", - m_pDefaultMQPushConsumer->getConsumerGroup().c_str(), - getMessageModelString(m_pDefaultMQPushConsumer->getMessageModel())); - - m_serviceState = START_FAILED; - checkConfig(); - copySubscription(); - - if (m_pDefaultMQPushConsumer->getMessageModel() == CLUSTERING) - { - m_pDefaultMQPushConsumer->changeInstanceNameToPID(); - } - - m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPushConsumer); - - m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPushConsumer->getConsumerGroup()); - m_pRebalanceImpl->setMessageModel(m_pDefaultMQPushConsumer->getMessageModel()); - m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy()); - m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory); - - m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); - - if (m_pDefaultMQPushConsumer->getOffsetStore() != NULL) - { - m_pOffsetStore = m_pDefaultMQPushConsumer->getOffsetStore(); - } - else - { - switch (m_pDefaultMQPushConsumer->getMessageModel()) - { - case BROADCASTING: - m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); - break; - case CLUSTERING: - m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); - break; - default: - break; - } - } - - m_pOffsetStore->load(); - - if (dynamic_cast<MessageListenerOrderly*>(m_pMessageListenerInner) != NULL) - { - m_consumeOrderly = true; - m_pConsumeMessageService = - new ConsumeMessageOrderlyService(this, (MessageListenerOrderly*)m_pMessageListenerInner); - } - else if (dynamic_cast<MessageListenerConcurrently*>(m_pMessageListenerInner) != NULL) - { - m_consumeOrderly = false; - m_pConsumeMessageService = - new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently*)m_pMessageListenerInner); - } - m_pConsumeMessageService->start(); - - bool registerOK = m_pMQClientFactory->registerConsumer(m_pDefaultMQPushConsumer->getConsumerGroup(), this); - if (!registerOK) - { - m_serviceState = CREATE_JUST; - m_pConsumeMessageService->shutdown(); - std::string str = "The consumer group[" + m_pDefaultMQPushConsumer->getConsumerGroup(); - str += "] has been created before, specify another name please."; - THROW_MQEXCEPTION(MQClientException, str, -1); - } - m_pMQClientFactory->start(); - - RMQ_INFO("the consumer [%s] start OK.", m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); - m_serviceState = RUNNING; - } - break; - case RUNNING: - case START_FAILED: - case SHUTDOWN_ALREADY: - THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1); - default: - break; - } - - updateTopicSubscribeInfoWhenSubscriptionChanged(); - m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock(); - m_pMQClientFactory->rebalanceImmediately(); -} - - -void DefaultMQPushConsumerImpl::shutdown() -{ - RMQ_DEBUG("DefaultMQPushConsumerImpl::shutdown()"); - switch (m_serviceState) - { - case CREATE_JUST: - break; - case RUNNING: - m_pConsumeMessageService->shutdown(); - persistConsumerOffset(); - m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPushConsumer->getConsumerGroup()); - m_pMQClientFactory->shutdown(); - - m_serviceState = SHUTDOWN_ALREADY; - break; - case SHUTDOWN_ALREADY: - break; - default: - break; - } -} - - - - -bool DefaultMQPushConsumerImpl::hasHook() -{ - return !m_hookList.empty(); -} - -void DefaultMQPushConsumerImpl::registerHook(ConsumeMessageHook* pHook) -{ - m_hookList.push_back(pHook); -} - -void DefaultMQPushConsumerImpl::executeHookBefore(ConsumeMessageContext& context) -{ - std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin(); - for (; it != m_hookList.end(); it++) - { - try - { - (*it)->consumeMessageBefore(context); - } - catch (...) - { - RMQ_WARN("consumeMessageBefore exception"); - } - } -} - -void DefaultMQPushConsumerImpl::executeHookAfter(ConsumeMessageContext& context) -{ - std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin(); - for (; it != m_hookList.end(); it++) - { - try - { - (*it)->consumeMessageAfter(context); - } - catch (...) - { - RMQ_WARN("consumeMessageAfter exception"); - } - } -} - -void DefaultMQPushConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum) -{ - m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum); -} - -std::set<MessageQueue>* DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic) -{ - std::map<std::string, std::set<MessageQueue> >& mqs = m_pRebalanceImpl->getTopicSubscribeInfoTable(); - std::map<std::string, std::set<MessageQueue> >::iterator it = mqs.find(topic); - - if (it == mqs.end()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic); - mqs = m_pRebalanceImpl->getTopicSubscribeInfoTable(); - it = mqs.find(topic); - } - - if (it == mqs.end()) - { - THROW_MQEXCEPTION(MQClientException, "The topic[" + topic + "] not exist", -1); - } - - std::set<MessageQueue>* result = new std::set<MessageQueue>(it->second.begin(), it->second.end()); - return result; -} - -DefaultMQPushConsumer* DefaultMQPushConsumerImpl::getDefaultMQPushConsumer() -{ - return m_pDefaultMQPushConsumer; -} - -long long DefaultMQPushConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq) -{ - return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq); -} - -long long DefaultMQPushConsumerImpl::maxOffset(const MessageQueue& mq) -{ - return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); -} - -long long DefaultMQPushConsumerImpl::minOffset(const MessageQueue& mq) -{ - return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq); -} - -OffsetStore* DefaultMQPushConsumerImpl::getOffsetStore() -{ - return m_pOffsetStore; -} - -void DefaultMQPushConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore) -{ - m_pOffsetStore = pOffsetStore; -} - -//MQConsumerInner -std::string DefaultMQPushConsumerImpl::groupName() -{ - return m_pDefaultMQPushConsumer->getConsumerGroup(); -} - -MessageModel DefaultMQPushConsumerImpl::messageModel() -{ - return m_pDefaultMQPushConsumer->getMessageModel(); -} - -ConsumeType DefaultMQPushConsumerImpl::consumeType() -{ - return CONSUME_PASSIVELY; -} - -ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere() -{ - return m_pDefaultMQPushConsumer->getConsumeFromWhere(); -} - -std::set<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions() -{ - std::set<SubscriptionData> sds; - std::map<std::string, SubscriptionData>& subscription = m_pRebalanceImpl->getSubscriptionInner(); - std::map<std::string, SubscriptionData>::iterator it = subscription.begin(); - for (; it != subscription.end(); it++) - { - sds.insert(it->second); - } - - return sds; -} - -void DefaultMQPushConsumerImpl::doRebalance() -{ - if (m_pRebalanceImpl != NULL) - { - m_pRebalanceImpl->doRebalance(); - } -} - -void DefaultMQPushConsumerImpl::persistConsumerOffset() -{ - try - { - makeSureStateOK(); - - std::set<MessageQueue> mqs; - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock()); - std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable(); - RMQ_FOR_EACH(processQueueTable, it) - { - mqs.insert(it->first); - } - } - - m_pOffsetStore->persistAll(mqs); - } - catch (...) - { - RMQ_ERROR("persistConsumerOffset exception, group: %s", - m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); - } -} - -void DefaultMQPushConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) -{ - std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); - - if (subTable.find(topic) != subTable.end()) - { - m_pRebalanceImpl->getTopicSubscribeInfoTable().insert(std::pair<std::string, std::set<MessageQueue> >(topic, info)); - } -} - -std::map<std::string, SubscriptionData>& DefaultMQPushConsumerImpl::getSubscriptionInner() -{ - return m_pRebalanceImpl->getSubscriptionInner(); -} - -bool DefaultMQPushConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic) -{ - std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); - - if (subTable.find(topic) != subTable.end()) - { - std::map<std::string, std::set<MessageQueue> >& mqs = - m_pRebalanceImpl->getTopicSubscribeInfoTable(); - - return mqs.find(topic) == mqs.end(); - } - - return false; -} - -bool DefaultMQPushConsumerImpl::isPause() -{ - return m_pause; -} - -void DefaultMQPushConsumerImpl::setPause(bool pause) -{ - m_pause = pause; -} - - -void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequest& pullRequest) -{ - if (pullRequest.getProcessQueue()->getMsgCount().get() == 0) - { - m_pOffsetStore->updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); - } -} - -void DefaultMQPushConsumerImpl::pullMessage(PullRequest* pPullRequest) -{ - RMQ_DEBUG("pullMessage begin: %s", pPullRequest->toString().c_str()); - - ProcessQueue* processQueue = pPullRequest->getProcessQueue(); - if (processQueue->isDropped()) - { - RMQ_WARN("the pull request[%s] is dropped.", pPullRequest->toString().c_str()); - delete pPullRequest; - return; - } - - pPullRequest->getProcessQueue()->setLastPullTimestamp(KPRUtil::GetCurrentTimeMillis()); - - try - { - makeSureStateOK(); - } - catch (const MQException& e) - { - RMQ_WARN("pullMessage exception [%s], consumer state not ok", e.what()); - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); - return; - } - - if (isPause()) - { - RMQ_WARN("consumer was paused, execute pull request later. instanceName={%s}", - m_pDefaultMQPushConsumer->getInstanceName().c_str()); - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenSuspend); - return; - } - - long size = processQueue->getMsgCount().get(); - if (size > m_pDefaultMQPushConsumer->getPullThresholdForQueue()) - { - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl); - if ((flowControlTimes1++ % 3000) == 0) - { - RMQ_WARN("the consumer message buffer is full, so do flow control, {%ld} {%s} {%lld}", size, - pPullRequest->toString().c_str(), flowControlTimes1); - } - return; - } - - if (!m_consumeOrderly) - { - if (processQueue->getMaxSpan() > m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan()) - { - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl); - if ((flowControlTimes2++ % 3000) == 0) - { - RMQ_WARN("the queue's messages, span too long, so do flow control, size: {%ld}, pullRequest: {%s}, times: {%lld}, maxspan: {%lld}", - size, pPullRequest->toString().c_str(), flowControlTimes2, processQueue->getMaxSpan()); - } - return; - } - } - - std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); - std::string topic = pPullRequest->getMessageQueue().getTopic(); - std::map<std::string, SubscriptionData>::iterator it = subTable.find(topic); - if (it == subTable.end()) - { - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); - RMQ_WARN("find the consumer's subscription failed, {%s}", pPullRequest->toString().c_str()); - return; - } - - SubscriptionData subscriptionData = it->second; - PullCallback* pullCallback = new DefaultMQPushConsumerImplCallback(subTable[topic], this, pPullRequest); - - bool commitOffsetEnable = false; - long commitOffsetValue = 0L; - if (CLUSTERING == m_pDefaultMQPushConsumer->getMessageModel()) - { - commitOffsetValue = m_pOffsetStore->readOffset(pPullRequest->getMessageQueue(), - READ_FROM_MEMORY); - if (commitOffsetValue > 0) - { - commitOffsetEnable = true; - } - } - - int sysFlag = PullSysFlag::buildSysFlag( - commitOffsetEnable, // commitOffset - true, // suspend - false// subscription - ); - try - { - m_pPullAPIWrapper->pullKernelImpl( - pPullRequest->getMessageQueue(), // 1 - "", // 2 - subscriptionData.getSubVersion(), // 3 - pPullRequest->getNextOffset(), // 4 - m_pDefaultMQPushConsumer->getPullBatchSize(), // 5 - sysFlag, // 6 - commitOffsetValue,// 7 - s_BrokerSuspendMaxTimeMillis, // 8 - s_ConsumerTimeoutMillisWhenSuspend, // 9 - ASYNC, // 10 - pullCallback// 11 - ); - } - catch (...) - { - RMQ_ERROR("pullKernelImpl exception"); - executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); - } - - RMQ_DEBUG("pullMessage end"); -} - -void DefaultMQPushConsumerImpl::executePullRequestImmediately(PullRequest* pullRequest) -{ - m_pMQClientFactory->getPullMessageService()->executePullRequestImmediately(pullRequest); -} - -void DefaultMQPushConsumerImpl::executePullRequestLater(PullRequest* pullRequest, long timeDelay) -{ - m_pMQClientFactory->getPullMessageService()->executePullRequestLater(pullRequest, timeDelay); -} - -void DefaultMQPushConsumerImpl::executeTaskLater(kpr::TimerHandler* handler, long timeDelay) -{ - m_pMQClientFactory->getPullMessageService()->executeTaskLater(handler, timeDelay); -} - - -void DefaultMQPushConsumerImpl::makeSureStateOK() -{ - if (m_serviceState != RUNNING) - { - THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1); - } -} - -ConsumerStatManager* DefaultMQPushConsumerImpl::getConsumerStatManager() -{ - return m_pConsumerStatManager; -} - -QueryResult DefaultMQPushConsumerImpl::queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end) -{ - return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end); -} - -void DefaultMQPushConsumerImpl::registerMessageListener(MessageListener* pMessageListener) -{ - m_pMessageListenerInner = pMessageListener; -} - -void DefaultMQPushConsumerImpl::resume() -{ - m_pause = false; -} - -long long DefaultMQPushConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp) -{ - return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp); -} - -void DefaultMQPushConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) -{ - try - { - std::string brokerAddr = brokerName.empty() ? - socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName); - - m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, - m_pDefaultMQPushConsumer->getConsumerGroup(), - delayLevel, - 5000); - } - catch (...) - { - RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); - Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()), - msg.getBody(), msg.getBodyLen()); - - std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID); - newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId() - : originMsgId); - - newMsg.setFlag(msg.getFlag()); - newMsg.setProperties(msg.getProperties()); - newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic()); - - int reTimes = msg.getReconsumeTimes() + 1; - newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes)); - newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes())); - newMsg.setDelayTimeLevel(3 + reTimes); - - m_pMQClientFactory->getDefaultMQProducer()->send(newMsg); - } -} - -void DefaultMQPushConsumerImpl::checkConfig() -{ - // consumerGroup check - Validators::checkGroup(m_pDefaultMQPushConsumer->getConsumerGroup()); - - // consumerGroup - if (m_pDefaultMQPushConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP) - { - THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal " - + MixAll::DEFAULT_CONSUMER_GROUP // - + ", please specify another one.", -1); - } - - if (m_pDefaultMQPushConsumer->getMessageModel() != BROADCASTING - && m_pDefaultMQPushConsumer->getMessageModel() != CLUSTERING) - { - THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1); - } - - // allocateMessageQueueStrategy - if (m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy() == NULL) - { - THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1); - } - - // consumeFromWhereOffset - if (m_pDefaultMQPushConsumer->getConsumeFromWhere() < CONSUME_FROM_LAST_OFFSET - || m_pDefaultMQPushConsumer->getConsumeFromWhere() > CONSUME_FROM_MAX_OFFSET) - { - THROW_MQEXCEPTION(MQClientException, "consumeFromWhere is invalid", -1); - } - - // subscription - /* - if (m_pDefaultMQPushConsumer->getSubscription().size() == 0) - { - THROW_MQEXCEPTION(MQClientException,"subscription is null" ,-1); - } - */ - - // messageListener - if (m_pDefaultMQPushConsumer->getMessageListener() == NULL) - { - THROW_MQEXCEPTION(MQClientException, "messageListener is null", -1); - } - - MessageListener* listener = m_pDefaultMQPushConsumer->getMessageListener(); - MessageListener* orderly = (dynamic_cast<MessageListenerOrderly*>(listener)) ; - MessageListener* concurrently = (dynamic_cast<MessageListenerConcurrently*>(listener)) ; - - if (!orderly && !concurrently) - { - THROW_MQEXCEPTION(MQClientException, - "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" , - -1); - } - - // consumeThreadMin - if (m_pDefaultMQPushConsumer->getConsumeThreadMin() < 1 - || m_pDefaultMQPushConsumer->getConsumeThreadMin() > 1000 - || m_pDefaultMQPushConsumer->getConsumeThreadMin() > m_pDefaultMQPushConsumer->getConsumeThreadMax() - ) - { - THROW_MQEXCEPTION(MQClientException, "consumeThreadMin Out of range [1, 1000]", -1); - } - - // consumeThreadMax - if (m_pDefaultMQPushConsumer->getConsumeThreadMax() < 1 - || m_pDefaultMQPushConsumer->getConsumeThreadMax() > 1000) - { - THROW_MQEXCEPTION(MQClientException, "consumeThreadMax Out of range [1, 1000]", -1); - } - - // consumeConcurrentlyMaxSpan - if (m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() < 1 - || m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() > 65535) - { - THROW_MQEXCEPTION(MQClientException, "consumeConcurrentlyMaxSpan Out of range [1, 65535]" , -1); - } - - // pullThresholdForQueue - if (m_pDefaultMQPushConsumer->getPullThresholdForQueue() < 1 - || m_pDefaultMQPushConsumer->getPullThresholdForQueue() > 65535) - { - THROW_MQEXCEPTION(MQClientException, "pullThresholdForQueue Out of range [1, 65535]", -1); - } - - // pullInterval - if (m_pDefaultMQPushConsumer->getPullInterval() < 0 - || m_pDefaultMQPushConsumer->getPullInterval() > 65535) - { - THROW_MQEXCEPTION(MQClientException, "pullInterval Out of range [0, 65535]", -1); - } - - // consumeMessageBatchMaxSize - if (m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() < 1 - || m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() > 1024) - { - THROW_MQEXCEPTION(MQClientException, "consumeMessageBatchMaxSize Out of range [1, 1024]", -1); - } - - // pullBatchSize - if (m_pDefaultMQPushConsumer->getPullBatchSize() < 1 - || m_pDefaultMQPushConsumer->getPullBatchSize() > 1024) - { - THROW_MQEXCEPTION(MQClientException, "pullBatchSize Out of range [1, 1024]", -1); - } -} - -void DefaultMQPushConsumerImpl::copySubscription() -{ - try - { - std::map<std::string, std::string>& sub = m_pDefaultMQPushConsumer->getSubscription(); - std::map<std::string, std::string>::iterator it = sub.begin(); - for (; it != sub.end(); it++) - { - SubscriptionDataPtr subscriptionData = FilterAPI::buildSubscriptionData(it->first, it->second); - m_pRebalanceImpl->getSubscriptionInner()[it->first] = *subscriptionData; - } - - if (m_pMessageListenerInner == NULL) - { - m_pMessageListenerInner = m_pDefaultMQPushConsumer->getMessageListener(); - } - - switch (m_pDefaultMQPushConsumer->getMessageModel()) - { - case BROADCASTING: - break; - case CLUSTERING: - { - std::string retryTopic = MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()); - SubscriptionDataPtr subscriptionData = - FilterAPI::buildSubscriptionData(retryTopic, SubscriptionData::SUB_ALL); - m_pRebalanceImpl->getSubscriptionInner()[retryTopic] = *subscriptionData; - } - - break; - default: - break; - } - } - catch (...) - { - THROW_MQEXCEPTION(MQClientException, "subscription exception", -1); - } -} - -void DefaultMQPushConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged() -{ - std::map<std::string, SubscriptionData> subTable = getSubscriptionInner(); - std::map<std::string, SubscriptionData>::iterator it = subTable.begin(); - for (; it != subTable.end(); it++) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(it->first); - } -} - -MessageListener* DefaultMQPushConsumerImpl::getMessageListenerInner() -{ - return m_pMessageListenerInner; -} - -void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const std::string& subExpression) -{ - try - { - SubscriptionDataPtr subscriptionData = FilterAPI::buildSubscriptionData(topic, subExpression); - m_pRebalanceImpl->getSubscriptionInner()[topic] = *subscriptionData; - - if (m_pMQClientFactory) - { - m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock(); - } - } - catch (...) - { - THROW_MQEXCEPTION(MQClientException, "subscription exception", -1); - } -} - -void DefaultMQPushConsumerImpl::suspend() -{ - m_pause = true; -} - -void DefaultMQPushConsumerImpl::unsubscribe(const std::string& topic) -{ - m_pRebalanceImpl->getSubscriptionInner().erase(topic); -} - -void DefaultMQPushConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset) -{ - m_pOffsetStore->updateOffset(mq, offset, false); -} - -void DefaultMQPushConsumerImpl::updateCorePoolSize(int corePoolSize) -{ - m_pConsumeMessageService->updateCorePoolSize(corePoolSize); -} - -MessageExt* DefaultMQPushConsumerImpl::viewMessage(const std::string& msgId) -{ - return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId); -} - -RebalanceImpl* DefaultMQPushConsumerImpl::getRebalanceImpl() -{ - return m_pRebalanceImpl; -} - -bool DefaultMQPushConsumerImpl::isConsumeOrderly() -{ - return m_consumeOrderly; -} - -void DefaultMQPushConsumerImpl::setConsumeOrderly(bool consumeOrderly) -{ - m_consumeOrderly = consumeOrderly; -} - - -MQClientFactory* DefaultMQPushConsumerImpl::getmQClientFactory() -{ - return m_pMQClientFactory; -} - -void DefaultMQPushConsumerImpl::setmQClientFactory(MQClientFactory* mQClientFactory) -{ - m_pMQClientFactory = mQClientFactory; -} - - -ServiceState DefaultMQPushConsumerImpl::getServiceState() -{ - return m_serviceState; -} - -void DefaultMQPushConsumerImpl::setServiceState(ServiceState serviceState) -{ - m_serviceState = serviceState; -} - -} -
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h deleted file mode 100755 index 5370586..0000000 --- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h +++ /dev/null @@ -1,169 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__ -#define __DEFAULTMQPUSHCONSUMERIMPL_H__ - -#include <string> -#include <set> -#include <map> - -#include "MQConsumerInner.h" -#include "MessageExt.h" -#include "QueryResult.h" -#include "ServiceState.h" -#include "PullResult.h" -#include "ConsumeMessageHook.h" -#include "MixAll.h" -#include "PullCallback.h" -#include "TimerThread.h" - -namespace rmq -{ - class DefaultMQPushConsumer; - class ConsumeMessageHook; - class OffsetStore; - class RebalanceImpl; - class ConsumerStatManager; - class ConsumeMessageService; - class MessageListener; - class PullRequest; - class MQClientFactory; - class PullAPIWrapper; - class PullMessageService; - class DefaultMQPushConsumerImplCallback; - class MQException; - - /** - * Push Consumer Impl - * - */ - class DefaultMQPushConsumerImpl : public MQConsumerInner - { - public: - DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer); - ~DefaultMQPushConsumerImpl(); - - void start(); - void suspend(); - void resume(); - void shutdown(); - bool isPause(); - void setPause(bool pause); - - bool hasHook(); - void registerHook(ConsumeMessageHook* pHook); - void executeHookBefore(ConsumeMessageContext& context); - void executeHookAfter(ConsumeMessageContext& context); - - void createTopic(const std::string& key, const std::string& newTopic, int queueNum); - std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); - - long long earliestMsgStoreTime(const MessageQueue& mq); - long long maxOffset(const MessageQueue& mq); - long long minOffset(const MessageQueue& mq); - OffsetStore* getOffsetStore() ; - void setOffsetStore(OffsetStore* pOffsetStore); - - //MQConsumerInner - std::string groupName() ; - MessageModel messageModel() ; - ConsumeType consumeType(); - ConsumeFromWhere consumeFromWhere(); - std::set<SubscriptionData> subscriptions(); - void doRebalance() ; - void persistConsumerOffset() ; - void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info); - std::map<std::string, SubscriptionData>& getSubscriptionInner() ; - bool isSubscribeTopicNeedUpdate(const std::string& topic); - - MessageExt* viewMessage(const std::string& msgId); - QueryResult queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end); - - void registerMessageListener(MessageListener* pMessageListener); - long long searchOffset(const MessageQueue& mq, long long timestamp); - void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName); - - void subscribe(const std::string& topic, const std::string& subExpression); - void unsubscribe(const std::string& topic); - - void updateConsumeOffset(MessageQueue& mq, long long offset); - void updateCorePoolSize(int corePoolSize); - bool isConsumeOrderly(); - void setConsumeOrderly(bool consumeOrderly); - - RebalanceImpl* getRebalanceImpl() ; - MessageListener* getMessageListenerInner(); - DefaultMQPushConsumer* getDefaultMQPushConsumer() ; - ConsumerStatManager* getConsumerStatManager(); - - MQClientFactory* getmQClientFactory(); - void setmQClientFactory(MQClientFactory* mQClientFactory); - - ServiceState getServiceState(); - void setServiceState(ServiceState serviceState); - - private: - void correctTagsOffset(PullRequest& pullRequest) ; - - void pullMessage(PullRequest* pPullRequest); - - - void executePullRequestImmediately(PullRequest* pullRequest); - - - void executePullRequestLater(PullRequest* pullRequest, long timeDelay); - void executeTaskLater(kpr::TimerHandler* handler, long timeDelay); - - void makeSureStateOK(); - void checkConfig(); - void copySubscription() ; - void updateTopicSubscribeInfoWhenSubscriptionChanged(); - - private: - static const int s_PullTimeDelayMillsWhenException = 3000; - static const int s_PullTimeDelayMillsWhenFlowControl = 50; - static const int s_PullTimeDelayMillsWhenSuspend = 1000; - static const int s_BrokerSuspendMaxTimeMillis = 15000; - static const int s_ConsumerTimeoutMillisWhenSuspend = 30000; - - long long flowControlTimes1; - long long flowControlTimes2; - ServiceState m_serviceState; - volatile bool m_pause; - bool m_consumeOrderly; - DefaultMQPushConsumer* m_pDefaultMQPushConsumer; - MQClientFactory* m_pMQClientFactory; - PullAPIWrapper* m_pPullAPIWrapper; - MessageListener* m_pMessageListenerInner; - OffsetStore* m_pOffsetStore; - RebalanceImpl* m_pRebalanceImpl; - ConsumerStatManager* m_pConsumerStatManager; - ConsumeMessageService* m_pConsumeMessageService; - - std::list<ConsumeMessageHook*> m_hookList; - friend class PullMessageService; - friend class RebalancePushImpl; - friend class DefaultMQPushConsumerImplCallback; - }; -} - -#endif - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp deleted file mode 100755 index 40e9d65..0000000 --- a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#include "LocalFileOffsetStore.h" - -#include "MQClientFactory.h" -#include "OffsetSerializeWrapper.h" -#include "ScopedLock.h" -#include "FileUtil.h" -#include "MixAll.h" -#include "Exception.h" -#include "MQClientException.h" - -namespace rmq -{ - -LocalFileOffsetStore::LocalFileOffsetStore(MQClientFactory* pMQClientFactory, - const std::string& groupName) -{ - m_pMQClientFactory = pMQClientFactory; - m_groupName = groupName; - std::string homePath = getenv("HOME"); - m_storePath = homePath + "/.rocketmq_offsets/" + m_pMQClientFactory->getClientId() - + "/" + m_groupName + "/offsets.json"; -} - -void LocalFileOffsetStore::load() -{ - OffsetSerializeWrapperPtr offsetSerializeWrapper = this->readLocalOffset(); - if (offsetSerializeWrapper.ptr() != NULL - && offsetSerializeWrapper->getOffsetTable().size() > 0) - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); - m_offsetTable = offsetSerializeWrapper->getOffsetTable(); - RMQ_FOR_EACH(m_offsetTable, it) - { - const MessageQueue& mq = it->first; - const kpr::AtomicLong& offset = it->second; - RMQ_INFO("load consumer's offset, {%s} {%s} {%lld}", - m_groupName.c_str(), - mq.toString().c_str(), - offset.get()); - } - } -} - - -void LocalFileOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly) -{ - RMQ_DEBUG("updateOffset, MQ:%s, offset:%lld", mq.toString().c_str(), offset); - kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); - typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); - if (it == m_offsetTable.end()) - { - m_offsetTable[mq] = offset; - it = m_offsetTable.find(mq); - } - - kpr::AtomicLong& offsetOld = it->second; - if (increaseOnly) - { - MixAll::compareAndIncreaseOnly(offsetOld, offset); - } - else - { - offsetOld.set(offset); - } -} - -long long LocalFileOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type) -{ - RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type); - switch (type) - { - case MEMORY_FIRST_THEN_STORE: - case READ_FROM_MEMORY: - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); - typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); - if (it != m_offsetTable.end()) - { - return it->second.get(); - } - else if (READ_FROM_MEMORY == type) - { - RMQ_WARN("No offset in memory, MQ:%s", mq.toString().c_str()); - return -1; - } - } - case READ_FROM_STORE: - { - OffsetSerializeWrapperPtr offsetSerializeWrapper; - try - { - offsetSerializeWrapper = this->readLocalOffset(); - } - catch (std::exception& e) - { - RMQ_WARN("load offset file fail, MQ:%s, exception:%s", mq.toString().c_str(), e.what()); - return -1; - } - - if (offsetSerializeWrapper.ptr() != NULL) - { - std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper->getOffsetTable(); - typeof(offsetTable.begin()) it = offsetTable.find(mq); - if (it != offsetTable.end()) - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); - m_offsetTable[mq] = it->second.get(); - return it->second.get(); - } - } - return -1; - } - default: - break; - } - - return -1; -} - - -void LocalFileOffsetStore::persistAll(std::set<MessageQueue>& mqs) -{ - RMQ_DEBUG("persistAll, mqs.size={%u}, mqs=%s", - (unsigned)mqs.size(), UtilAll::toString(mqs).c_str()); - if (mqs.empty()) - { - return; - } - RMQ_DEBUG("persistAll, m_offsetTable.size={%u}, m_offsetTable=%s", - (unsigned)m_offsetTable.size(), UtilAll::toString(m_offsetTable).c_str()); - - OffsetSerializeWrapper offsetSerializeWrapper; - std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper.getOffsetTable(); - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); - RMQ_FOR_EACH(m_offsetTable, it) - { - MessageQueue mq = it->first; - kpr::AtomicLong& offset = it->second; - if (mqs.find(mq) != mqs.end()) - { - offsetTable[mq] = offset; - } - } - } - - RMQ_DEBUG("persistAll, offsetTable.size={%u}, offsetTable=%s", - (unsigned)offsetTable.size(), UtilAll::toString(offsetTable).c_str()); - - std::string jsonString; - offsetSerializeWrapper.encode(jsonString); - RMQ_DEBUG("persistAll, json=%s", jsonString.c_str()); - - if (!jsonString.empty()) - { - try - { - kpr::FileUtil::makeDirRecursive(kpr::FileUtil::extractFilePath(m_storePath)); - MixAll::string2File(m_storePath, jsonString); - } - catch (const std::exception& e) - { - RMQ_ERROR("persistAll consumer offset Exception, %s, %s", m_storePath.c_str(), e.what()); - } - } -} - -void LocalFileOffsetStore::persist(const MessageQueue& mq) -{ -} - -void LocalFileOffsetStore::removeOffset(const MessageQueue& mq) -{ -} - - -std::map<MessageQueue, long long> LocalFileOffsetStore::cloneOffsetTable(const std::string& topic) -{ - kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); - std::map<MessageQueue, long long> cloneOffsetTable; - RMQ_FOR_EACH(m_offsetTable, it) - { - MessageQueue mq = it->first; - kpr::AtomicLong& offset = it->second; - if (topic == mq.getTopic()) - { - cloneOffsetTable[mq] = offset.get(); - } - } - - return cloneOffsetTable; -} - - -OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffset() -{ - std::string content = MixAll::file2String(m_storePath); - if (content.length() == 0) - { - return this->readLocalOffsetBak(); - } - else - { - OffsetSerializeWrapper* offsetSerializeWrapper = NULL; - try - { - offsetSerializeWrapper = OffsetSerializeWrapper::decode(content.c_str(), content.size()); - } - catch (const MQException& e) - { - RMQ_WARN("readLocalOffset Exception, and try to correct, %s", e.what()); - return this->readLocalOffsetBak(); - } - - return offsetSerializeWrapper; - } -} - - -OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffsetBak() -{ - std::string content = MixAll::file2String(m_storePath + ".bak"); - if (content.length() > 0) - { - OffsetSerializeWrapper* offsetSerializeWrapper = NULL; - try - { - offsetSerializeWrapper = OffsetSerializeWrapper::decode(content.c_str(), content.size()); - } - catch (const MQException& e) - { - RMQ_WARN("readLocalOffset Exception, maybe json content invalid, %s", e.what()); - } - - return offsetSerializeWrapper; - } - - return NULL; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h deleted file mode 100755 index c4efb76..0000000 --- a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h +++ /dev/null @@ -1,61 +0,0 @@ -/** -* Copyright (C) 2013 suwenkuang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __LOCALFILEOFFSETSTORE_H__ -#define __LOCALFILEOFFSETSTORE_H__ -#include <map> -#include <string> -#include <set> - -#include "RocketMQClient.h" -#include "OffsetStore.h" -#include "MessageQueue.h" -#include "AtomicValue.h" -#include "Mutex.h" - -namespace rmq -{ - class MQClientFactory; - class MessageQueue; - class OffsetSerializeWrapper; - - class LocalFileOffsetStore : public OffsetStore - { - public: - LocalFileOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName); - - void load(); - void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly); - long long readOffset(const MessageQueue& mq, ReadOffsetType type); - void persistAll(std::set<MessageQueue>& mqs); - void persist(const MessageQueue& mq); - void removeOffset(const MessageQueue& mq) ; - std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic); - - private: - OffsetSerializeWrapper* readLocalOffset(); - OffsetSerializeWrapper* readLocalOffsetBak(); - - private: - MQClientFactory* m_pMQClientFactory; - std::string m_groupName; - std::string m_storePath; - std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; - kpr::RWMutex m_tableMutex; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/MQConsumerInner.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h deleted file mode 100755 index ed83621..0000000 --- a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,[email protected] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __MQCONSUMERINNER_H__ -#define __MQCONSUMERINNER_H__ - -#include <string> -#include <set> - -#include "ConsumeType.h" -#include "SubscriptionData.h" - -namespace rmq -{ - class MessageQueue; - - class MQConsumerInner - { - public: - virtual ~MQConsumerInner() {} - virtual std::string groupName() = 0; - virtual MessageModel messageModel() = 0; - virtual ConsumeType consumeType() = 0; - virtual ConsumeFromWhere consumeFromWhere() = 0; - virtual std::set<SubscriptionData> subscriptions() = 0; - virtual void doRebalance() = 0; - virtual void persistConsumerOffset() = 0; - virtual void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) = 0; - virtual bool isSubscribeTopicNeedUpdate(const std::string& topic) = 0; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/MessageQueueLock.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h deleted file mode 100755 index 65af99e..0000000 --- a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h +++ /dev/null @@ -1,68 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __MESSAGEQUEUELOCK_H__ -#define __MESSAGEQUEUELOCK_H__ - -#include <map> -#include "Mutex.h" -#include "ScopedLock.h" -#include "MessageQueue.h" - -namespace rmq -{ - class MessageQueueLock - { - public: - MessageQueueLock() - { - - } - - ~MessageQueueLock() - { - std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.begin(); - - for (; it != m_mqLockTable.end(); it++) - { - delete it->second; - } - } - - kpr::Mutex* fetchLockObject(MessageQueue& mq) - { - kpr::ScopedLock<kpr::Mutex> lock(m_lock); - std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.find(mq); - kpr::Mutex* objLock; - if (it == m_mqLockTable.end()) - { - objLock = new kpr::Mutex(); - m_mqLockTable[mq] = objLock; - } - else - { - objLock = it->second; - } - - return objLock; - } - - private: - std::map<MessageQueue, kpr::Mutex*> m_mqLockTable; - kpr::Mutex m_lock; - }; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp deleted file mode 100755 index f90e502..0000000 --- a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp +++ /dev/null @@ -1,445 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#include "ProcessQueue.h" -#include "MessageExt.h" -#include "KPRUtil.h" -#include "UtilAll.h" -#include "ScopedLock.h" -#include "DefaultMQPushConsumer.h" -#include "DefaultMQPushConsumerImpl.h" - -namespace rmq -{ - -ProcessQueue::ProcessQueue() -{ - m_lastPullTimestamp = KPRUtil::GetCurrentTimeMillis(); - m_lastConsumeTimestamp = KPRUtil::GetCurrentTimeMillis(); - m_queueOffsetMax = 0L; - m_msgCount = 0; - m_dropped = false; - m_locked = false; - m_lastLockTimestamp = KPRUtil::GetCurrentTimeMillis(); - m_consuming = false; -} - -bool ProcessQueue::isLockExpired() -{ - bool result = (KPRUtil::GetCurrentTimeMillis() - m_lastLockTimestamp) > - s_RebalanceLockMaxLiveTime; - return result; -} - -bool ProcessQueue::isPullExpired() -{ - bool result = (KPRUtil::GetCurrentTimeMillis() - m_lastPullTimestamp) > - s_PullMaxIdleTime; - return result; -} - - -void ProcessQueue::cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer) -{ - if (pPushConsumer->getDefaultMQPushConsumerImpl()->isConsumeOrderly()) - { - return; - } - - long long now = KPRUtil::GetCurrentTimeMillis(); - int loop = m_msgTreeMap.size() < 16 ? m_msgTreeMap.size() : 16; - for (int i = 0; i < loop; i++) - { - MessageExt* msg = NULL; - try - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap); - if (m_msgTreeMap.empty()) - { - return; - } - - MessageExt* firstMsg = m_msgTreeMap.begin()->second; - long long startTimestamp = UtilAll::str2ll(firstMsg->getProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP).c_str()); - if (startTimestamp > 0 && (now - startTimestamp) > (pPushConsumer->getConsumeTimeout() * 60 * 1000)) - { - msg = firstMsg; - } - else - { - return; - } - } - catch (...) - { - RMQ_ERROR("getExpiredMsg exception"); - } - - try - { - pPushConsumer->sendMessageBack((*msg), 3); - RMQ_WARN("send expire msg back. topic={%s}, msgId={%s}, storeHost={%s}, queueId={%d}, queueOffset={%lld}", - msg->getTopic().c_str(), msg->getMsgId().c_str(), msg->getStoreHostString().c_str(), - msg->getQueueId(), msg->getQueueOffset()); - - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - if (!m_msgTreeMap.empty() && msg->getQueueOffset() == m_msgTreeMap.begin()->first) - { - try - { - m_msgTreeMap.erase(m_msgTreeMap.begin()); - m_msgCount -= 1; - // if free msg, may be coredump - //delete msg; - } - catch (...) - { - RMQ_ERROR("send expired msg exception"); - } - } - - } - catch (...) - { - RMQ_ERROR("delExpiredMsg exception"); - } - } - catch (...) - { - RMQ_ERROR("send expired msg exception"); - } - } -} - - -bool ProcessQueue::putMessage(const std::list<MessageExt *> &msgs) -{ - bool dispathToConsume = false; - - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - int validMsgCnt = 0; - std::list<MessageExt *>::const_iterator it = msgs.begin(); - - for (; it != msgs.end(); it++) - { - MessageExt *msg = (*it); - - if (m_msgTreeMap.find(msg->getQueueOffset()) == m_msgTreeMap.end()) - { - validMsgCnt++; - m_queueOffsetMax = msg->getQueueOffset(); - } - - m_msgTreeMap[msg->getQueueOffset()] = msg; - } - - m_msgCount += validMsgCnt; - - if (!m_msgTreeMap.empty() && !m_consuming) - { - dispathToConsume = true; - m_consuming = true; - } - } - catch (...) - { - RMQ_ERROR("putMessage exception"); - } - - return dispathToConsume; -} - -long long ProcessQueue::getMaxSpan() -{ - try - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap); - - if (!m_msgTreeMap.empty()) - { - std::map<long long, MessageExt *>::iterator it1 = m_msgTreeMap.begin(); - std::map<long long, MessageExt *>::iterator it2 = m_msgTreeMap.end(); - it2--; - return it2->first - it1->first; - } - } - catch (...) - { - RMQ_ERROR("getMaxSpan exception"); - } - - return 0; -} - -long long ProcessQueue::removeMessage(std::list<MessageExt *> &msgs) -{ - long long result = -1; - unsigned long long now = KPRUtil::GetCurrentTimeMillis(); - - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - m_lastConsumeTimestamp = now; - - if (!m_msgTreeMap.empty()) - { - result = m_queueOffsetMax + 1; - int removedCnt = 0; - std::list<MessageExt *>::iterator it = msgs.begin(); - - for (; it != msgs.end();) - { - MessageExt *msg = (*it); - - if (m_msgTreeMap.find(msg->getQueueOffset()) != m_msgTreeMap.end()) - { - removedCnt++; - } - - m_msgTreeMap.erase(msg->getQueueOffset()); - //TODO delete message? - it = msgs.erase(it); - delete msg; - } - - m_msgCount -= removedCnt; - - if (!m_msgTreeMap.empty()) - { - std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin(); - result = it->first; - } - } - } - catch (...) - { - RMQ_ERROR("removeMessage exception"); - } - - return result; -} - - -void ProcessQueue::clear() -{ - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - m_msgTreeMap.clear(); - m_msgTreeMapTemp.clear(); - m_msgCount.set(0); - m_queueOffsetMax = 0; - } - catch (...) - { - RMQ_ERROR("clear exception"); - } - - return; -} - - -std::map<long long, MessageExt *> ProcessQueue::getMsgTreeMap() -{ - return m_msgTreeMap; -} - -kpr::AtomicInteger ProcessQueue::getMsgCount() -{ - return m_msgCount; -} - -bool ProcessQueue::isDropped() -{ - return m_dropped; -} - -void ProcessQueue::setDropped(bool dropped) -{ - m_dropped = dropped; -} - -unsigned long long ProcessQueue::getLastPullTimestamp() -{ - return m_lastPullTimestamp; -} - - -void ProcessQueue::setLastPullTimestamp(unsigned long long lastPullTimestamp) -{ - m_lastPullTimestamp = lastPullTimestamp; -} - - -unsigned long long ProcessQueue::getLastConsumeTimestamp() -{ - return m_lastConsumeTimestamp; -} - - -void ProcessQueue::setLastConsumeTimestamp(unsigned long long - lastConsumeTimestamp) -{ - m_lastConsumeTimestamp = lastConsumeTimestamp; -} - - -/** -* ======================================================================== -*/ -kpr::Mutex &ProcessQueue::getLockConsume() -{ - return m_lockConsume; -} - -void ProcessQueue::setLocked(bool locked) -{ - m_locked = locked; -} - -bool ProcessQueue::isLocked() -{ - return m_locked; -} - -long long ProcessQueue::getTryUnlockTimes() -{ - return m_tryUnlockTimes.get(); -} - -void ProcessQueue::incTryUnlockTimes() -{ - m_tryUnlockTimes++; -} - - -void ProcessQueue::rollback() -{ - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - m_msgTreeMap = m_msgTreeMapTemp; - m_msgTreeMapTemp.clear(); - } - catch (...) - { - RMQ_ERROR("rollback exception"); - } -} - -long long ProcessQueue::commit() -{ - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - - if (!m_msgTreeMapTemp.empty()) - { - std::map<long long, MessageExt *>::iterator it = m_msgTreeMapTemp.end(); - it--; - long long offset = it->first; - m_msgCount -= m_msgTreeMapTemp.size(); - m_msgTreeMapTemp.clear(); - return offset + 1; - } - } - catch (...) - { - RMQ_ERROR("commit exception"); - } - - return -1; -} - -void ProcessQueue::makeMessageToCosumeAgain(const std::list<MessageExt *> &msgs) -{ - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - std::list<MessageExt *>::const_iterator it = msgs.begin(); - - for (; it != msgs.end(); it++) - { - MessageExt *msg = (*it); - m_msgTreeMapTemp.erase(msg->getQueueOffset()); - m_msgTreeMap[msg->getQueueOffset()] = msg; - } - } - catch (...) - { - RMQ_ERROR("makeMessageToCosumeAgain exception"); - } -} - -std::list<MessageExt *> ProcessQueue::takeMessages(int batchSize) -{ - std::list<MessageExt *> result; - unsigned long long now = KPRUtil::GetCurrentTimeMillis(); - - try - { - kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); - m_lastConsumeTimestamp = now; - - if (!m_msgTreeMap.empty()) - { - for (int i = 0; i < batchSize; i++) - { - std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin(); - - if (it != m_msgTreeMap.end()) - { - result.push_back(it->second); - m_msgTreeMapTemp[it->first] = it->second; - m_msgTreeMap.erase(it); - } - else - { - break; - } - } - - if (result.empty()) - { - m_consuming = false; - } - } - } - catch (...) - { - RMQ_ERROR("takeMessags exception"); - } - - return result; -} - -long long ProcessQueue::getLastLockTimestamp() -{ - return m_lastLockTimestamp; -} - -void ProcessQueue::setLastLockTimestamp(long long lastLockTimestamp) -{ - m_lastLockTimestamp = lastLockTimestamp; -} - - -} - http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ProcessQueue.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.h b/rocketmq-client4cpp/src/consumer/ProcessQueue.h deleted file mode 100755 index 559dd7f..0000000 --- a/rocketmq-client4cpp/src/consumer/ProcessQueue.h +++ /dev/null @@ -1,102 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __PROCESSQUEUE_H__ -#define __PROCESSQUEUE_H__ - -#include <list> -#include <map> -#include "Mutex.h" -#include "AtomicValue.h" - -namespace rmq -{ - class MessageExt; - class DefaultMQPushConsumer; - - class ProcessQueue - { - public: - static const unsigned int s_RebalanceLockMaxLiveTime = 30000; - static const unsigned int s_RebalanceLockInterval = 20000; - static const unsigned int s_PullMaxIdleTime = 120000; - - public: - ProcessQueue(); - - bool isLockExpired(); - bool isPullExpired(); - - void cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer); - bool putMessage(const std::list<MessageExt*>& msgs); - - long long getMaxSpan(); - long long removeMessage(std::list<MessageExt*>& msgs); - - void clear(); - - std::map<long long, MessageExt*> getMsgTreeMap(); - kpr::AtomicInteger getMsgCount(); - bool isDropped(); - void setDropped(bool dropped); - - unsigned long long getLastPullTimestamp(); - void setLastPullTimestamp(unsigned long long lastPullTimestamp); - - unsigned long long getLastConsumeTimestamp(); - void setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp); - - /** - * ======================================================================== - */ - kpr::Mutex& getLockConsume(); - void setLocked(bool locked); - bool isLocked(); - long long getTryUnlockTimes(); - void incTryUnlockTimes(); - - void rollback(); - long long commit(); - void makeMessageToCosumeAgain(const std::list<MessageExt*>& msgs); - - std::list<MessageExt*> takeMessages(int batchSize); - - long long getLastLockTimestamp(); - void setLastLockTimestamp(long long lastLockTimestamp); - - - private: - kpr::RWMutex m_lockTreeMap; - std::map<long long, MessageExt*> m_msgTreeMap; - volatile long long m_queueOffsetMax ; - kpr::AtomicInteger m_msgCount; - volatile bool m_dropped; - volatile unsigned long long m_lastPullTimestamp; - volatile unsigned long long m_lastConsumeTimestamp; - - /** - * order message - */ - kpr::Mutex m_lockConsume; - volatile bool m_locked; - volatile unsigned long long m_lastLockTimestamp; - volatile bool m_consuming; - std::map<long long, MessageExt*> m_msgTreeMapTemp; - kpr::AtomicInteger m_tryUnlockTimes; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp deleted file mode 100755 index c520e4c..0000000 --- a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp +++ /dev/null @@ -1,222 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#include "PullAPIWrapper.h" - -#include <stdlib.h> -#include <list> -#include <set> -#include "ScopedLock.h" -#include "MQClientFactory.h" -#include "PullCallback.h" -#include "MixAll.h" -#include "PullSysFlag.h" -#include "CommandCustomHeader.h" -#include "MQClientAPIImpl.h" -#include "MQClientException.h" -#include "SubscriptionData.h" -#include "UtilAll.h" -#include "MessageExt.h" -#include "PullResultExt.h" -#include "MessageDecoder.h" -#include "VirtualEnvUtil.h" - -namespace rmq -{ - -PullAPIWrapper::PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup) -{ - m_pMQClientFactory = pMQClientFactory; - m_consumerGroup = consumerGroup; -} - -void PullAPIWrapper::updatePullFromWhichNode(MessageQueue& mq, long brokerId) -{ - std::map<MessageQueue, kpr::AtomicInteger>::iterator it; - { - kpr::ScopedRLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); - it = m_pullFromWhichNodeTable.find(mq); - if (it != m_pullFromWhichNodeTable.end()) - { - it->second.set(brokerId); - return; - } - } - - kpr::ScopedWLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); - m_pullFromWhichNodeTable[mq] = kpr::AtomicInteger(brokerId); -} - -PullResult* PullAPIWrapper::processPullResult(MessageQueue& mq, - PullResult& pullResult, - SubscriptionData& subscriptionData) -{ - std::string projectGroupPrefix = m_pMQClientFactory->getMQClientAPIImpl()->getProjectGroupPrefix(); - PullResultExt& pullResultExt = (PullResultExt&) pullResult; - - updatePullFromWhichNode(mq, pullResultExt.suggestWhichBrokerId); - - if (pullResult.pullStatus == FOUND) - { - std::list<MessageExt*> msgList = - MessageDecoder::decodes(pullResultExt.messageBinary, pullResultExt.messageBinaryLen); - - std::list<MessageExt*> msgListFilterAgain; - - if (!subscriptionData.getTagsSet().empty()) - { - std::list<MessageExt*>::iterator it = msgList.begin(); - for (; it != msgList.end();) - { - MessageExt* msg = *it; - if (!msg->getTags().empty()) - { - std::set<std::string>& tags = subscriptionData.getTagsSet(); - if (tags.find(msg->getTags()) != tags.end()) - { - msgListFilterAgain.push_back(msg); - it = msgList.erase(it); - } - else - { - it++; - } - } - } - } - else - { - msgListFilterAgain.assign(msgList.begin(), msgList.end()); - msgList.clear(); - } - - if (!UtilAll::isBlank(projectGroupPrefix)) - { - subscriptionData.setTopic(VirtualEnvUtil::clearProjectGroup(subscriptionData.getTopic(), - projectGroupPrefix)); - mq.setTopic(VirtualEnvUtil::clearProjectGroup(mq.getTopic(), projectGroupPrefix)); - - std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); - for (; it != msgListFilterAgain.end(); it++) - { - MessageExt* msg = *it; - msg->setTopic(VirtualEnvUtil::clearProjectGroup(msg->getTopic(), projectGroupPrefix)); - - msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset)); - msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset)); - } - } - else - { - std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); - for (; it != msgListFilterAgain.end(); it++) - { - MessageExt* msg = *it; - - msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset)); - msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset)); - } - } - - std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); - for (; it != msgListFilterAgain.end(); it++) - { - pullResultExt.msgFoundList.push_back(*it); - } - - it = msgList.begin(); - for (; it != msgList.end(); it++) - { - delete *it; - } - - delete[] pullResultExt.messageBinary; - pullResultExt.messageBinary = NULL; - pullResultExt.messageBinaryLen = 0; - } - - return &pullResult; -} - -long PullAPIWrapper::recalculatePullFromWhichNode(MessageQueue& mq) -{ - kpr::ScopedRLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); - std::map<MessageQueue, kpr::AtomicInteger>::iterator it = m_pullFromWhichNodeTable.find(mq); - if (it != m_pullFromWhichNodeTable.end()) - { - return it->second.get(); - } - - return MixAll::MASTER_ID; -} - -PullResult* PullAPIWrapper::pullKernelImpl(MessageQueue& mq, - const std::string& subExpression, - long long subVersion, - long long offset, - int maxNums, - int sysFlag, - long long commitOffset, - long long brokerSuspendMaxTimeMillis, - int timeoutMillis, - CommunicationMode communicationMode, - PullCallback* pPullCallback) -{ - FindBrokerResult findBrokerResult = - m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), - recalculatePullFromWhichNode(mq), false); - if (findBrokerResult.brokerAddr.empty()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), - recalculatePullFromWhichNode(mq), false); - } - - if (!findBrokerResult.brokerAddr.empty()) - { - int sysFlagInner = sysFlag; - - if (findBrokerResult.slave) - { - sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner); - } - - PullMessageRequestHeader* requestHeader = new PullMessageRequestHeader(); - requestHeader->consumerGroup = m_consumerGroup; - requestHeader->topic = mq.getTopic(); - requestHeader->queueId = mq.getQueueId(); - requestHeader->queueOffset = offset; - requestHeader->maxMsgNums = maxNums; - requestHeader->sysFlag = sysFlagInner; - requestHeader->commitOffset = commitOffset; - requestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis; - requestHeader->subscription = subExpression; - requestHeader->subVersion = subVersion; - - PullResult* pullResult = m_pMQClientFactory->getMQClientAPIImpl()->pullMessage(// - findBrokerResult.brokerAddr,// - requestHeader,// - timeoutMillis,// - communicationMode,// - pPullCallback); - - return pullResult; - } - - THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h deleted file mode 100755 index d5ec787..0000000 --- a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h +++ /dev/null @@ -1,67 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __PULLAPIWRAPPER_H__ -#define __PULLAPIWRAPPER_H__ - -#include <string> -#include <map> - -#include "AtomicValue.h" -#include "PullResult.h" -#include "MessageQueue.h" -#include "CommunicationMode.h" -#include "Mutex.h" - -namespace rmq -{ - class MQClientFactory; - class PullCallback; - class SubscriptionData; - - class PullAPIWrapper - { - public: - PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup); - void updatePullFromWhichNode(MessageQueue& mq, long brokerId); - - - PullResult* processPullResult(MessageQueue& mq, - PullResult& pullResult, - SubscriptionData& subscriptionData); - long recalculatePullFromWhichNode(MessageQueue& mq); - - PullResult* pullKernelImpl(MessageQueue& mq, - const std::string& subExpression, - long long subVersion, - long long offset, - int maxNums, - int sysFlag, - long long commitOffset, - long long brokerSuspendMaxTimeMillis, - int timeoutMillis, - CommunicationMode communicationMode, - PullCallback* pPullCallback); - - private: - std::map<MessageQueue, kpr::AtomicInteger> m_pullFromWhichNodeTable; - kpr::RWMutex m_pullFromWhichNodeTableLock; - MQClientFactory* m_pMQClientFactory; - std::string m_consumerGroup; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullMessageService.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp deleted file mode 100755 index 6d9972e..0000000 --- a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp +++ /dev/null @@ -1,171 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#include "PullMessageService.h" -#include <list> -#include "MQClientFactory.h" -#include "MQConsumerInner.h" -#include "PullRequest.h" -#include "DefaultMQPushConsumerImpl.h" -#include "ScopedLock.h" - -namespace rmq -{ - -class SubmitPullRequestLater : public kpr::TimerHandler -{ -public: - SubmitPullRequestLater(PullMessageService* pService, PullRequest* pPullRequest) - : m_pService(pService), m_pPullRequest(pPullRequest) - { - - } - - void OnTimeOut(unsigned int timerID) - { - try - { - m_pService->executePullRequestImmediately(m_pPullRequest); - } - catch(...) - { - RMQ_ERROR("SubmitPullRequestLater OnTimeOut exception"); - } - - delete this; - } - -private: - PullMessageService* m_pService; - PullRequest* m_pPullRequest; -}; - - -PullMessageService::PullMessageService(MQClientFactory* pMQClientFactory) - : ServiceThread("PullMessageService"), - m_pMQClientFactory(pMQClientFactory) -{ - m_TimerThread = new kpr::TimerThread("PullMessageService-timer", 10); - m_TimerThread->Start(); -} - - -PullMessageService::~PullMessageService() -{ - -} - - -void PullMessageService::executePullRequestLater(PullRequest* pPullRequest, long timeDelay) -{ - SubmitPullRequestLater* pHandler = new SubmitPullRequestLater(this, pPullRequest); - m_TimerThread->RegisterTimer(0, timeDelay, pHandler, false); -} - - -void PullMessageService::executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay) -{ - m_TimerThread->RegisterTimer(0, timeDelay, pHandler, false); -} - - -void PullMessageService::executePullRequestImmediately(PullRequest* pPullRequest) -{ - try - { - { - kpr::ScopedLock<kpr::Mutex> lock(m_lock); - m_pullRequestQueue.push_back(pPullRequest); - } - - wakeup(); - } - catch (...) - { - RMQ_ERROR("executePullRequestImmediately pullRequestQueue.push"); - } -} - -void PullMessageService::Run() -{ - RMQ_INFO("%s service started", getServiceName().c_str()); - - while (!m_stoped) - { - try - { - bool wait = false; - { - kpr::ScopedLock<kpr::Mutex> lock(m_lock); - if (m_pullRequestQueue.empty()) - { - wait = true; - } - } - - if (wait) - { - waitForRunning(5000); - } - - PullRequest* pullRequest = NULL; - { - kpr::ScopedLock<kpr::Mutex> lock(m_lock); - if (!m_pullRequestQueue.empty()) - { - pullRequest = m_pullRequestQueue.front(); - m_pullRequestQueue.pop_front(); - } - } - - if (pullRequest != NULL) - { - pullMessage(pullRequest); - } - } - catch (...) - { - RMQ_ERROR("Pull Message Service Run Method exception"); - } - } - - m_TimerThread->Stop(); - m_TimerThread->Join(); - - RMQ_INFO("%s service end", getServiceName().c_str()); -} - -std::string PullMessageService::getServiceName() -{ - return "PullMessageService"; -} - - -void PullMessageService::pullMessage(PullRequest* pPullRequest) -{ - MQConsumerInner* consumer = m_pMQClientFactory->selectConsumer(pPullRequest->getConsumerGroup()); - if (consumer != NULL) - { - DefaultMQPushConsumerImpl* impl = (DefaultMQPushConsumerImpl*) consumer; - impl->pullMessage(pPullRequest); - } - else - { - RMQ_WARN("No matched consumer for the PullRequest {%s}, drop it", pPullRequest->toString().c_str()); - } -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullMessageService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.h b/rocketmq-client4cpp/src/consumer/PullMessageService.h deleted file mode 100755 index d6ebcee..0000000 --- a/rocketmq-client4cpp/src/consumer/PullMessageService.h +++ /dev/null @@ -1,56 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,[email protected] -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __PULLMESSAGESERVICE_H__ -#define __PULLMESSAGESERVICE_H__ - -#include <list> -#include "RocketMQClient.h" -#include "ServiceThread.h" -#include "TimerThread.h" -#include "PullRequest.h" - -namespace rmq -{ - class MQClientFactory; - class MQConsumerInner; - class PullRequest; - - class PullMessageService : public ServiceThread - { - public: - PullMessageService(MQClientFactory* pMQClientFactory); - ~PullMessageService(); - - void executePullRequestLater(PullRequest* pPullRequest, long timeDelay); - void executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay); - - void executePullRequestImmediately(PullRequest* pPullRequest); - std::string getServiceName(); - - void Run(); - private: - void pullMessage(PullRequest* pPullRequest); - - private: - std::list<PullRequest*> m_pullRequestQueue; - kpr::Mutex m_lock; - MQClientFactory* m_pMQClientFactory; - kpr::TimerThreadPtr m_TimerThread; - }; - typedef kpr::RefHandleT<PullMessageService> PullMessageServicePtr; -} - -#endif
