http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp 
b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
deleted file mode 100755
index 8d7f8a1..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ /dev/null
@@ -1,1018 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "DefaultMQPushConsumerImpl.h"
-
-#include <string>
-#include <set>
-#include "DefaultMQPushConsumer.h"
-#include "ConsumerStatManage.h"
-#include "DefaultMQPullConsumer.h"
-#include "DefaultMQProducer.h"
-#include "MQClientFactory.h"
-#include "MQAdminImpl.h"
-#include "RebalancePushImpl.h"
-#include "MQClientAPIImpl.h"
-#include "OffsetStore.h"
-#include "MixAll.h"
-#include "MQClientManager.h"
-#include "LocalFileOffsetStore.h"
-#include "RemoteBrokerOffsetStore.h"
-#include "PullSysFlag.h"
-#include "FilterAPI.h"
-#include "PullAPIWrapper.h"
-#include "MQClientException.h"
-#include "Validators.h"
-#include "MessageListener.h"
-#include "ConsumeMessageHook.h"
-#include "PullMessageService.h"
-#include "ConsumeMessageOrderlyService.h"
-#include "ConsumeMessageConcurrentlyService.h"
-#include "KPRUtil.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
-
-/* RemoveProcessQueueLater */
-class RemoveProcessQueueLater : public kpr::TimerHandler
-{
-public:
-    RemoveProcessQueueLater(DefaultMQPushConsumerImpl* pConsumerImp, 
PullRequest* pPullRequest)
-        : m_pConsumerImp(pConsumerImp), m_pPullRequest(pPullRequest)
-    {
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-       try
-       {
-            
m_pConsumerImp->getOffsetStore()->updateOffset(m_pPullRequest->getMessageQueue(),
 m_pPullRequest->getNextOffset(), false);
-            
m_pConsumerImp->getOffsetStore()->persist(m_pPullRequest->getMessageQueue());
-            
m_pConsumerImp->getRebalanceImpl()->removeProcessQueue(m_pPullRequest->getMessageQueue());
-
-            RMQ_WARN("fix the pull request offset, {%s}", 
m_pPullRequest->toString().c_str());
-        }
-        catch(...)
-        {
-               RMQ_ERROR("RemoveProcessQueueLater OnTimeOut Exception");
-        }
-
-        delete this;
-    }
-
-private:
-       DefaultMQPushConsumerImpl* m_pConsumerImp;
-    PullRequest* m_pPullRequest;
-};
-
-
-/* DefaultMQPushConsumerImplCallback */
-class DefaultMQPushConsumerImplCallback : public PullCallback
-{
-public:
-    DefaultMQPushConsumerImplCallback(SubscriptionData& subscriptionData,
-        DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
-        PullRequest* pPullRequest)
-           : m_subscriptionData(subscriptionData),
-             m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl),
-             m_pPullRequest(pPullRequest)
-       {
-           m_beginTimestamp = KPRUtil::GetCurrentTimeMillis();
-       }
-
-    void onSuccess(PullResult& pullResult)
-       {
-               RMQ_DEBUG("onSuccess begin: %s", pullResult.toString().c_str());
-           PullResult* pPullResult = &pullResult;
-           if (pPullResult != NULL)
-           {
-               pPullResult =
-                   
m_pDefaultMQPushConsumerImpl->m_pPullAPIWrapper->processPullResult(
-                       m_pPullRequest->getMessageQueue(), *pPullResult, 
m_subscriptionData);
-
-               switch (pPullResult->pullStatus)
-               {
-                   case FOUND:
-                   {
-                       
m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
-
-                       long long pullRT = KPRUtil::GetCurrentTimeMillis() - 
m_beginTimestamp;
-                       
m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat()
-                       .pullTimesTotal++;
-                       
m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat()
-                       .pullRTTotal.fetchAndAdd(pullRT);
-
-                       ProcessQueue* processQueue = 
m_pPullRequest->getProcessQueue();
-                       bool dispatchToConsume = 
processQueue->putMessage(pPullResult->msgFoundList);
-
-                       
m_pDefaultMQPushConsumerImpl->m_pConsumeMessageService->submitConsumeRequest(//
-                           pPullResult->msgFoundList, //
-                           processQueue, //
-                           m_pPullRequest->getMessageQueue(), //
-                           dispatchToConsume);
-
-                       if 
(m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval() > 0)
-                       {
-                           
m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest,
-                                   
m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval());
-                       }
-                       else
-                       {
-                           
m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
-                       }
-                   }
-                   break;
-                   case NO_NEW_MSG:
-                       
m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
-                       
m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest);
-                       
m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
-                       break;
-                   case NO_MATCHED_MSG:
-                       
m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
-                       
m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest);
-                       
m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
-                       break;
-                   case OFFSET_ILLEGAL:
-                       RMQ_WARN("the pull request offset illegal, %s, %s",
-                                m_pPullRequest->toString().c_str(), 
pPullResult->toString().c_str());
-
-                       /*
-                       if (m_pPullRequest->getNextOffset() < 
pPullResult->minOffset)
-                       {
-                           
m_pPullRequest->setNextOffset(pPullResult->minOffset);
-                       }
-                       else if (m_pPullRequest->getNextOffset() > 
pPullResult->maxOffset)
-                       {
-                           
m_pPullRequest->setNextOffset(pPullResult->maxOffset);
-                       }
-                       
m_pDefaultMQPushConsumerImpl->m_pOffsetStore->updateOffset(
-                           m_pPullRequest->getMessageQueue(), 
m_pPullRequest->getNextOffset(), false);
-                       
m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
-                       */
-
-                       // todo
-                       
m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
-                                       
m_pPullRequest->getProcessQueue()->setDropped(true);
-
-                                       
m_pDefaultMQPushConsumerImpl->executeTaskLater(new RemoveProcessQueueLater(
-                                               m_pDefaultMQPushConsumerImpl, 
m_pPullRequest), 10000);
-                           break;
-                   default:
-                       break;
-               }
-           }
-           else
-           {
-               RMQ_WARN("Warning: PullRequest is null!");
-           }
-           RMQ_DEBUG("onSuccess end");
-       }
-
-       void onException(MQException& e)
-       {
-           std::string topic = m_pPullRequest->getMessageQueue().getTopic();
-           if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 
std::string::npos)
-           {
-               RMQ_WARN("execute the pull request exception:%s", e.what());
-           }
-
-           
m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest,
-                   
DefaultMQPushConsumerImpl::s_PullTimeDelayMillsWhenException);
-       }
-
-private:
-    SubscriptionData m_subscriptionData;
-    DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;
-    PullRequest* m_pPullRequest;
-    unsigned long long m_beginTimestamp;
-};
-
-
-DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(DefaultMQPushConsumer* 
pDefaultMQPushConsumer)
-{
-    m_pDefaultMQPushConsumer = pDefaultMQPushConsumer;
-    m_serviceState = CREATE_JUST;
-    flowControlTimes1 = 0;
-    flowControlTimes2 = 0;
-    m_pause = false;
-    m_consumeOrderly = false;
-
-    m_pMQClientFactory = NULL;
-    m_pPullAPIWrapper = NULL;
-    m_pMessageListenerInner = NULL;
-    m_pOffsetStore = NULL;
-    m_pRebalanceImpl = new RebalancePushImpl(this);
-    m_pConsumerStatManager = new ConsumerStatManager();
-    m_pConsumeMessageService = NULL;
-}
-
-DefaultMQPushConsumerImpl::~DefaultMQPushConsumerImpl()
-{
-    //delete m_pMessageListenerInner;
-    if (m_pPullAPIWrapper)
-               delete m_pPullAPIWrapper;
-       if (m_pRebalanceImpl)
-       delete m_pRebalanceImpl;
-    if (m_pConsumerStatManager)
-       delete m_pConsumerStatManager;
-    if (m_pConsumeMessageService)
-       delete m_pConsumeMessageService;
-    if (m_pOffsetStore)
-       delete m_pOffsetStore;
-    //delete m_pMQClientFactory;
-}
-
-void DefaultMQPushConsumerImpl::start()
-{
-    RMQ_DEBUG("DefaultMQPushConsumerImpl::start()");
-    switch (m_serviceState)
-    {
-        case CREATE_JUST:
-        {
-               RMQ_INFO("the consumer [{%s}] start beginning. 
messageModel={%s}",
-                m_pDefaultMQPushConsumer->getConsumerGroup().c_str(),
-                
getMessageModelString(m_pDefaultMQPushConsumer->getMessageModel()));
-
-            m_serviceState = START_FAILED;
-            checkConfig();
-            copySubscription();
-
-            if (m_pDefaultMQPushConsumer->getMessageModel() == CLUSTERING)
-            {
-                m_pDefaultMQPushConsumer->changeInstanceNameToPID();
-            }
-
-            m_pMQClientFactory = 
MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPushConsumer);
-
-            
m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPushConsumer->getConsumerGroup());
-            
m_pRebalanceImpl->setMessageModel(m_pDefaultMQPushConsumer->getMessageModel());
-            
m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy());
-            m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory);
-
-            m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, 
m_pDefaultMQPushConsumer->getConsumerGroup());
-
-            if (m_pDefaultMQPushConsumer->getOffsetStore() != NULL)
-            {
-                m_pOffsetStore = m_pDefaultMQPushConsumer->getOffsetStore();
-            }
-            else
-            {
-                switch (m_pDefaultMQPushConsumer->getMessageModel())
-                {
-                    case BROADCASTING:
-                        m_pOffsetStore = new 
LocalFileOffsetStore(m_pMQClientFactory, 
m_pDefaultMQPushConsumer->getConsumerGroup());
-                        break;
-                    case CLUSTERING:
-                        m_pOffsetStore = new 
RemoteBrokerOffsetStore(m_pMQClientFactory, 
m_pDefaultMQPushConsumer->getConsumerGroup());
-                        break;
-                    default:
-                        break;
-                }
-            }
-
-            m_pOffsetStore->load();
-
-            if (dynamic_cast<MessageListenerOrderly*>(m_pMessageListenerInner) 
!= NULL)
-            {
-                m_consumeOrderly = true;
-                m_pConsumeMessageService =
-                    new ConsumeMessageOrderlyService(this, 
(MessageListenerOrderly*)m_pMessageListenerInner);
-            }
-            else if 
(dynamic_cast<MessageListenerConcurrently*>(m_pMessageListenerInner) != NULL)
-            {
-                m_consumeOrderly = false;
-                m_pConsumeMessageService =
-                    new ConsumeMessageConcurrentlyService(this, 
(MessageListenerConcurrently*)m_pMessageListenerInner);
-            }
-            m_pConsumeMessageService->start();
-
-            bool registerOK = 
m_pMQClientFactory->registerConsumer(m_pDefaultMQPushConsumer->getConsumerGroup(),
 this);
-            if (!registerOK)
-            {
-                m_serviceState = CREATE_JUST;
-                m_pConsumeMessageService->shutdown();
-                std::string str = "The consumer group[" + 
m_pDefaultMQPushConsumer->getConsumerGroup();
-                str += "] has been created before, specify another name 
please.";
-                THROW_MQEXCEPTION(MQClientException, str, -1);
-            }
-            m_pMQClientFactory->start();
-
-                       RMQ_INFO("the consumer [%s] start OK.", 
m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
-            m_serviceState = RUNNING;
-        }
-        break;
-        case RUNNING:
-        case START_FAILED:
-        case SHUTDOWN_ALREADY:
-            THROW_MQEXCEPTION(MQClientException, "The PullConsumer service 
state not OK, maybe started once, ", -1);
-        default:
-            break;
-    }
-
-    updateTopicSubscribeInfoWhenSubscriptionChanged();
-    m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
-    m_pMQClientFactory->rebalanceImmediately();
-}
-
-
-void DefaultMQPushConsumerImpl::shutdown()
-{
-    RMQ_DEBUG("DefaultMQPushConsumerImpl::shutdown()");
-    switch (m_serviceState)
-    {
-        case CREATE_JUST:
-            break;
-        case RUNNING:
-            m_pConsumeMessageService->shutdown();
-            persistConsumerOffset();
-            
m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPushConsumer->getConsumerGroup());
-            m_pMQClientFactory->shutdown();
-
-            m_serviceState = SHUTDOWN_ALREADY;
-            break;
-        case SHUTDOWN_ALREADY:
-            break;
-        default:
-            break;
-    }
-}
-
-
-
-
-bool DefaultMQPushConsumerImpl::hasHook()
-{
-    return !m_hookList.empty();
-}
-
-void DefaultMQPushConsumerImpl::registerHook(ConsumeMessageHook* pHook)
-{
-    m_hookList.push_back(pHook);
-}
-
-void DefaultMQPushConsumerImpl::executeHookBefore(ConsumeMessageContext& 
context)
-{
-    std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin();
-    for (; it != m_hookList.end(); it++)
-    {
-        try
-        {
-            (*it)->consumeMessageBefore(context);
-        }
-        catch (...)
-        {
-               RMQ_WARN("consumeMessageBefore exception");
-        }
-    }
-}
-
-void DefaultMQPushConsumerImpl::executeHookAfter(ConsumeMessageContext& 
context)
-{
-    std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin();
-    for (; it != m_hookList.end(); it++)
-    {
-        try
-        {
-            (*it)->consumeMessageAfter(context);
-        }
-        catch (...)
-        {
-               RMQ_WARN("consumeMessageAfter exception");
-        }
-    }
-}
-
-void DefaultMQPushConsumerImpl::createTopic(const std::string& key, const 
std::string& newTopic, int queueNum)
-{
-    m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
-}
-
-std::set<MessageQueue>* 
DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic)
-{
-    std::map<std::string, std::set<MessageQueue> >& mqs =  
m_pRebalanceImpl->getTopicSubscribeInfoTable();
-    std::map<std::string, std::set<MessageQueue> >::iterator it = 
mqs.find(topic);
-
-    if (it == mqs.end())
-    {
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic);
-        mqs =  m_pRebalanceImpl->getTopicSubscribeInfoTable();
-        it = mqs.find(topic);
-    }
-
-    if (it == mqs.end())
-    {
-        THROW_MQEXCEPTION(MQClientException, "The topic[" + topic + "] not 
exist", -1);
-    }
-
-    std::set<MessageQueue>* result = new 
std::set<MessageQueue>(it->second.begin(), it->second.end());
-    return result;
-}
-
-DefaultMQPushConsumer* DefaultMQPushConsumerImpl::getDefaultMQPushConsumer()
-{
-    return m_pDefaultMQPushConsumer;
-}
-
-long long DefaultMQPushConsumerImpl::earliestMsgStoreTime(const MessageQueue& 
mq)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
-}
-
-long long DefaultMQPushConsumerImpl::maxOffset(const MessageQueue& mq)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
-}
-
-long long DefaultMQPushConsumerImpl::minOffset(const MessageQueue& mq)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
-}
-
-OffsetStore* DefaultMQPushConsumerImpl::getOffsetStore()
-{
-    return m_pOffsetStore;
-}
-
-void DefaultMQPushConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore)
-{
-    m_pOffsetStore = pOffsetStore;
-}
-
-//MQConsumerInner
-std::string DefaultMQPushConsumerImpl::groupName()
-{
-    return m_pDefaultMQPushConsumer->getConsumerGroup();
-}
-
-MessageModel DefaultMQPushConsumerImpl::messageModel()
-{
-    return m_pDefaultMQPushConsumer->getMessageModel();
-}
-
-ConsumeType DefaultMQPushConsumerImpl::consumeType()
-{
-    return CONSUME_PASSIVELY;
-}
-
-ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere()
-{
-    return m_pDefaultMQPushConsumer->getConsumeFromWhere();
-}
-
-std::set<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions()
-{
-    std::set<SubscriptionData> sds;
-    std::map<std::string, SubscriptionData>& subscription = 
m_pRebalanceImpl->getSubscriptionInner();
-    std::map<std::string, SubscriptionData>::iterator it = 
subscription.begin();
-    for (; it != subscription.end(); it++)
-    {
-        sds.insert(it->second);
-    }
-
-    return sds;
-}
-
-void DefaultMQPushConsumerImpl::doRebalance()
-{
-    if (m_pRebalanceImpl != NULL)
-    {
-        m_pRebalanceImpl->doRebalance();
-    }
-}
-
-void DefaultMQPushConsumerImpl::persistConsumerOffset()
-{
-    try
-    {
-        makeSureStateOK();
-
-        std::set<MessageQueue> mqs;
-        {
-               kpr::ScopedRLock<kpr::RWMutex> 
lock(m_pRebalanceImpl->getProcessQueueTableLock());
-               std::map<MessageQueue, ProcessQueue*>& processQueueTable = 
m_pRebalanceImpl->getProcessQueueTable();
-               RMQ_FOR_EACH(processQueueTable, it)
-               {
-                   mqs.insert(it->first);
-               }
-        }
-
-        m_pOffsetStore->persistAll(mqs);
-    }
-    catch (...)
-    {
-       RMQ_ERROR("persistConsumerOffset exception, group: %s",
-               m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
-    }
-}
-
-void DefaultMQPushConsumerImpl::updateTopicSubscribeInfo(const std::string& 
topic, const std::set<MessageQueue>& info)
-{
-    std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner();
-
-    if (subTable.find(topic) != subTable.end())
-    {
-        
m_pRebalanceImpl->getTopicSubscribeInfoTable().insert(std::pair<std::string, 
std::set<MessageQueue> >(topic, info));
-    }
-}
-
-std::map<std::string, SubscriptionData>& 
DefaultMQPushConsumerImpl::getSubscriptionInner()
-{
-    return m_pRebalanceImpl->getSubscriptionInner();
-}
-
-bool DefaultMQPushConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& 
topic)
-{
-    std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner();
-
-    if (subTable.find(topic) != subTable.end())
-    {
-        std::map<std::string, std::set<MessageQueue> >& mqs =
-            m_pRebalanceImpl->getTopicSubscribeInfoTable();
-
-        return mqs.find(topic) == mqs.end();
-    }
-
-    return false;
-}
-
-bool DefaultMQPushConsumerImpl::isPause()
-{
-    return m_pause;
-}
-
-void DefaultMQPushConsumerImpl::setPause(bool pause)
-{
-    m_pause = pause;
-}
-
-
-void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequest& pullRequest)
-{
-    if (pullRequest.getProcessQueue()->getMsgCount().get() == 0)
-    {
-        m_pOffsetStore->updateOffset(pullRequest.getMessageQueue(), 
pullRequest.getNextOffset(), true);
-    }
-}
-
-void DefaultMQPushConsumerImpl::pullMessage(PullRequest* pPullRequest)
-{
-       RMQ_DEBUG("pullMessage begin: %s", pPullRequest->toString().c_str());
-
-    ProcessQueue* processQueue = pPullRequest->getProcessQueue();
-    if (processQueue->isDropped())
-    {
-        RMQ_WARN("the pull request[%s] is dropped.", 
pPullRequest->toString().c_str());
-        delete pPullRequest;
-        return;
-    }
-
-    
pPullRequest->getProcessQueue()->setLastPullTimestamp(KPRUtil::GetCurrentTimeMillis());
-
-    try
-    {
-        makeSureStateOK();
-    }
-    catch (const MQException& e)
-    {
-        RMQ_WARN("pullMessage exception [%s], consumer state not ok", 
e.what());
-        executePullRequestLater(pPullRequest, 
s_PullTimeDelayMillsWhenException);
-        return;
-    }
-
-    if (isPause())
-    {
-       RMQ_WARN("consumer was paused, execute pull request later. 
instanceName={%s}",
-                m_pDefaultMQPushConsumer->getInstanceName().c_str());
-        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenSuspend);
-        return;
-    }
-
-    long size = processQueue->getMsgCount().get();
-    if (size > m_pDefaultMQPushConsumer->getPullThresholdForQueue())
-    {
-        executePullRequestLater(pPullRequest, 
s_PullTimeDelayMillsWhenFlowControl);
-        if ((flowControlTimes1++ % 3000) == 0)
-        {
-            RMQ_WARN("the consumer message buffer is full, so do flow control, 
{%ld} {%s} {%lld}", size,
-                    pPullRequest->toString().c_str(), flowControlTimes1);
-        }
-        return;
-    }
-
-    if (!m_consumeOrderly)
-    {
-        if (processQueue->getMaxSpan() > 
m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan())
-        {
-            executePullRequestLater(pPullRequest, 
s_PullTimeDelayMillsWhenFlowControl);
-            if ((flowControlTimes2++ % 3000) == 0)
-            {
-                RMQ_WARN("the queue's messages, span too long, so do flow 
control, size: {%ld}, pullRequest: {%s}, times: {%lld}, maxspan: {%lld}",
-                       size, pPullRequest->toString().c_str(), 
flowControlTimes2, processQueue->getMaxSpan());
-            }
-            return;
-        }
-    }
-
-    std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner();
-    std::string topic = pPullRequest->getMessageQueue().getTopic();
-    std::map<std::string, SubscriptionData>::iterator it = 
subTable.find(topic);
-    if (it == subTable.end())
-    {
-        executePullRequestLater(pPullRequest, 
s_PullTimeDelayMillsWhenException);
-        RMQ_WARN("find the consumer's subscription failed, {%s}", 
pPullRequest->toString().c_str());
-        return;
-    }
-
-    SubscriptionData subscriptionData = it->second;
-    PullCallback* pullCallback = new 
DefaultMQPushConsumerImplCallback(subTable[topic], this, pPullRequest);
-
-    bool commitOffsetEnable = false;
-    long commitOffsetValue = 0L;
-    if (CLUSTERING == m_pDefaultMQPushConsumer->getMessageModel())
-    {
-        commitOffsetValue = 
m_pOffsetStore->readOffset(pPullRequest->getMessageQueue(),
-                            READ_FROM_MEMORY);
-        if (commitOffsetValue > 0)
-        {
-            commitOffsetEnable = true;
-        }
-    }
-
-    int sysFlag = PullSysFlag::buildSysFlag(
-                      commitOffsetEnable, // commitOffset
-                      true, // suspend
-                      false// subscription
-                  );
-    try
-    {
-        m_pPullAPIWrapper->pullKernelImpl(
-            pPullRequest->getMessageQueue(), // 1
-            "", // 2
-            subscriptionData.getSubVersion(), // 3
-            pPullRequest->getNextOffset(), // 4
-            m_pDefaultMQPushConsumer->getPullBatchSize(), // 5
-            sysFlag, // 6
-            commitOffsetValue,// 7
-            s_BrokerSuspendMaxTimeMillis, // 8
-            s_ConsumerTimeoutMillisWhenSuspend, // 9
-            ASYNC, // 10
-            pullCallback// 11
-        );
-    }
-    catch (...)
-    {
-        RMQ_ERROR("pullKernelImpl exception");
-        executePullRequestLater(pPullRequest, 
s_PullTimeDelayMillsWhenException);
-    }
-
-    RMQ_DEBUG("pullMessage end");
-}
-
-void DefaultMQPushConsumerImpl::executePullRequestImmediately(PullRequest* 
pullRequest)
-{
-    
m_pMQClientFactory->getPullMessageService()->executePullRequestImmediately(pullRequest);
-}
-
-void DefaultMQPushConsumerImpl::executePullRequestLater(PullRequest* 
pullRequest, long timeDelay)
-{
-    
m_pMQClientFactory->getPullMessageService()->executePullRequestLater(pullRequest,
 timeDelay);
-}
-
-void DefaultMQPushConsumerImpl::executeTaskLater(kpr::TimerHandler* handler, 
long timeDelay)
-{
-    m_pMQClientFactory->getPullMessageService()->executeTaskLater(handler, 
timeDelay);
-}
-
-
-void DefaultMQPushConsumerImpl::makeSureStateOK()
-{
-    if (m_serviceState != RUNNING)
-    {
-        THROW_MQEXCEPTION(MQClientException, "The consumer service state not 
OK, ", -1);
-    }
-}
-
-ConsumerStatManager* DefaultMQPushConsumerImpl::getConsumerStatManager()
-{
-    return m_pConsumerStatManager;
-}
-
-QueryResult DefaultMQPushConsumerImpl::queryMessage(const std::string& topic,
-        const std::string&  key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, 
maxNum, begin, end);
-}
-
-void DefaultMQPushConsumerImpl::registerMessageListener(MessageListener* 
pMessageListener)
-{
-    m_pMessageListenerInner = pMessageListener;
-}
-
-void DefaultMQPushConsumerImpl::resume()
-{
-    m_pause = false;
-}
-
-long long DefaultMQPushConsumerImpl::searchOffset(const MessageQueue& mq, long 
long timestamp)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
-}
-
-void DefaultMQPushConsumerImpl::sendMessageBack(MessageExt& msg, int 
delayLevel, const std::string& brokerName)
-{
-    try
-    {
-       std::string brokerAddr = brokerName.empty() ?
-               socketAddress2IPPort(msg.getStoreHost()) : 
m_pMQClientFactory->findBrokerAddressInPublish(brokerName);
-
-        
m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, 
msg,
-                m_pDefaultMQPushConsumer->getConsumerGroup(),
-                delayLevel,
-                5000);
-    }
-    catch (...)
-    {
-       RMQ_ERROR("sendMessageBack Exception, group: %s", 
m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
-        Message 
newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()),
-                       msg.getBody(), msg.getBodyLen());
-
-               std::string originMsgId = 
msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
-               newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, 
UtilAll::isBlank(originMsgId) ? msg.getMsgId()
-                    : originMsgId);
-
-        newMsg.setFlag(msg.getFlag());
-        newMsg.setProperties(msg.getProperties());
-        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
-
-        int reTimes = msg.getReconsumeTimes() + 1;
-        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, 
UtilAll::toString(reTimes));
-        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, 
UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes()));
-        newMsg.setDelayTimeLevel(3 + reTimes);
-
-        m_pMQClientFactory->getDefaultMQProducer()->send(newMsg);
-    }
-}
-
-void DefaultMQPushConsumerImpl::checkConfig()
-{
-    // consumerGroup check
-    Validators::checkGroup(m_pDefaultMQPushConsumer->getConsumerGroup());
-
-    // consumerGroup
-    if (m_pDefaultMQPushConsumer->getConsumerGroup() == 
MixAll::DEFAULT_CONSUMER_GROUP)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal "
-                          + MixAll::DEFAULT_CONSUMER_GROUP //
-                          + ", please specify another one.", -1);
-    }
-
-    if (m_pDefaultMQPushConsumer->getMessageModel() != BROADCASTING
-        && m_pDefaultMQPushConsumer->getMessageModel() != CLUSTERING)
-    {
-        THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
-    }
-
-    // allocateMessageQueueStrategy
-    if (m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy() == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is 
null", -1);
-    }
-
-    // consumeFromWhereOffset
-    if (m_pDefaultMQPushConsumer->getConsumeFromWhere() < 
CONSUME_FROM_LAST_OFFSET
-        || m_pDefaultMQPushConsumer->getConsumeFromWhere() > 
CONSUME_FROM_MAX_OFFSET)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumeFromWhere is invalid", 
-1);
-    }
-
-    // subscription
-    /*
-    if (m_pDefaultMQPushConsumer->getSubscription().size() == 0)
-    {
-      THROW_MQEXCEPTION(MQClientException,"subscription is null" ,-1);
-    }
-    */
-
-    // messageListener
-    if (m_pDefaultMQPushConsumer->getMessageListener() == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "messageListener is null", -1);
-    }
-
-    MessageListener* listener = m_pDefaultMQPushConsumer->getMessageListener();
-    MessageListener* orderly = 
(dynamic_cast<MessageListenerOrderly*>(listener)) ;
-    MessageListener* concurrently = 
(dynamic_cast<MessageListenerConcurrently*>(listener)) ;
-
-    if (!orderly && !concurrently)
-    {
-        THROW_MQEXCEPTION(MQClientException,
-                          "messageListener must be instanceof 
MessageListenerOrderly or MessageListenerConcurrently" ,
-                          -1);
-    }
-
-    // consumeThreadMin
-    if (m_pDefaultMQPushConsumer->getConsumeThreadMin() < 1
-        || m_pDefaultMQPushConsumer->getConsumeThreadMin() > 1000
-        || m_pDefaultMQPushConsumer->getConsumeThreadMin() > 
m_pDefaultMQPushConsumer->getConsumeThreadMax()
-       )
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumeThreadMin Out of range 
[1, 1000]", -1);
-    }
-
-    // consumeThreadMax
-    if (m_pDefaultMQPushConsumer->getConsumeThreadMax() < 1
-        || m_pDefaultMQPushConsumer->getConsumeThreadMax() > 1000)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumeThreadMax Out of range 
[1, 1000]", -1);
-    }
-
-    // consumeConcurrentlyMaxSpan
-    if (m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() < 1
-        || m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() > 65535)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumeConcurrentlyMaxSpan Out 
of range [1, 65535]" , -1);
-    }
-
-    // pullThresholdForQueue
-    if (m_pDefaultMQPushConsumer->getPullThresholdForQueue() < 1
-        || m_pDefaultMQPushConsumer->getPullThresholdForQueue() > 65535)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullThresholdForQueue Out of 
range [1, 65535]", -1);
-    }
-
-    // pullInterval
-    if (m_pDefaultMQPushConsumer->getPullInterval() < 0
-        || m_pDefaultMQPushConsumer->getPullInterval() > 65535)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullInterval Out of range [0, 
65535]", -1);
-    }
-
-    // consumeMessageBatchMaxSize
-    if (m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() < 1
-        || m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() > 1024)
-    {
-        THROW_MQEXCEPTION(MQClientException, "consumeMessageBatchMaxSize Out 
of range [1, 1024]", -1);
-    }
-
-    // pullBatchSize
-    if (m_pDefaultMQPushConsumer->getPullBatchSize() < 1
-        || m_pDefaultMQPushConsumer->getPullBatchSize() > 1024)
-    {
-        THROW_MQEXCEPTION(MQClientException, "pullBatchSize Out of range [1, 
1024]", -1);
-    }
-}
-
-void DefaultMQPushConsumerImpl::copySubscription()
-{
-    try
-    {
-        std::map<std::string, std::string>& sub = 
m_pDefaultMQPushConsumer->getSubscription();
-        std::map<std::string, std::string>::iterator it = sub.begin();
-        for (; it != sub.end(); it++)
-        {
-            SubscriptionDataPtr subscriptionData = 
FilterAPI::buildSubscriptionData(it->first, it->second);
-            m_pRebalanceImpl->getSubscriptionInner()[it->first] = 
*subscriptionData;
-        }
-
-        if (m_pMessageListenerInner == NULL)
-        {
-            m_pMessageListenerInner = 
m_pDefaultMQPushConsumer->getMessageListener();
-        }
-
-        switch (m_pDefaultMQPushConsumer->getMessageModel())
-        {
-            case BROADCASTING:
-                break;
-            case CLUSTERING:
-            {
-                std::string retryTopic = 
MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup());
-                SubscriptionDataPtr subscriptionData =
-                    FilterAPI::buildSubscriptionData(retryTopic, 
SubscriptionData::SUB_ALL);
-                m_pRebalanceImpl->getSubscriptionInner()[retryTopic] = 
*subscriptionData;
-            }
-
-            break;
-            default:
-                break;
-        }
-    }
-    catch (...)
-    {
-        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
-    }
-}
-
-void 
DefaultMQPushConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged()
-{
-    std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
-    std::map<std::string, SubscriptionData>::iterator it = subTable.begin();
-    for (; it != subTable.end(); it++)
-    {
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(it->first);
-    }
-}
-
-MessageListener* DefaultMQPushConsumerImpl::getMessageListenerInner()
-{
-    return m_pMessageListenerInner;
-}
-
-void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const 
std::string& subExpression)
-{
-    try
-    {
-        SubscriptionDataPtr subscriptionData = 
FilterAPI::buildSubscriptionData(topic, subExpression);
-        m_pRebalanceImpl->getSubscriptionInner()[topic] = *subscriptionData;
-
-        if (m_pMQClientFactory)
-        {
-            m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
-        }
-    }
-    catch (...)
-    {
-        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
-    }
-}
-
-void DefaultMQPushConsumerImpl::suspend()
-{
-    m_pause = true;
-}
-
-void DefaultMQPushConsumerImpl::unsubscribe(const std::string& topic)
-{
-    m_pRebalanceImpl->getSubscriptionInner().erase(topic);
-}
-
-void DefaultMQPushConsumerImpl::updateConsumeOffset(MessageQueue& mq, long 
long offset)
-{
-    m_pOffsetStore->updateOffset(mq, offset, false);
-}
-
-void DefaultMQPushConsumerImpl::updateCorePoolSize(int corePoolSize)
-{
-    m_pConsumeMessageService->updateCorePoolSize(corePoolSize);
-}
-
-MessageExt* DefaultMQPushConsumerImpl::viewMessage(const std::string& msgId)
-{
-    return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
-}
-
-RebalanceImpl* DefaultMQPushConsumerImpl::getRebalanceImpl()
-{
-    return m_pRebalanceImpl;
-}
-
-bool DefaultMQPushConsumerImpl::isConsumeOrderly()
-{
-    return m_consumeOrderly;
-}
-
-void DefaultMQPushConsumerImpl::setConsumeOrderly(bool consumeOrderly)
-{
-    m_consumeOrderly = consumeOrderly;
-}
-
-
-MQClientFactory* DefaultMQPushConsumerImpl::getmQClientFactory()
-{
-    return m_pMQClientFactory;
-}
-
-void DefaultMQPushConsumerImpl::setmQClientFactory(MQClientFactory* 
mQClientFactory)
-{
-    m_pMQClientFactory = mQClientFactory;
-}
-
-
-ServiceState DefaultMQPushConsumerImpl::getServiceState()
-{
-    return m_serviceState;
-}
-
-void DefaultMQPushConsumerImpl::setServiceState(ServiceState serviceState)
-{
-    m_serviceState = serviceState;
-}
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h 
b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
deleted file mode 100755
index 5370586..0000000
--- a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__
-#define __DEFAULTMQPUSHCONSUMERIMPL_H__
-
-#include <string>
-#include <set>
-#include <map>
-
-#include "MQConsumerInner.h"
-#include "MessageExt.h"
-#include "QueryResult.h"
-#include "ServiceState.h"
-#include "PullResult.h"
-#include "ConsumeMessageHook.h"
-#include "MixAll.h"
-#include "PullCallback.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
-    class DefaultMQPushConsumer;
-    class ConsumeMessageHook;
-    class OffsetStore;
-    class RebalanceImpl;
-    class ConsumerStatManager;
-    class ConsumeMessageService;
-    class MessageListener;
-    class PullRequest;
-    class MQClientFactory;
-    class PullAPIWrapper;
-    class PullMessageService;
-    class DefaultMQPushConsumerImplCallback;
-       class MQException;
-
-    /**
-    * Push Consumer Impl
-    *
-    */
-    class DefaultMQPushConsumerImpl : public  MQConsumerInner
-    {
-    public:
-        DefaultMQPushConsumerImpl(DefaultMQPushConsumer* 
pDefaultMQPushConsumer);
-               ~DefaultMQPushConsumerImpl();
-
-        void start();
-        void suspend();
-        void resume();
-        void shutdown();
-        bool isPause();
-        void setPause(bool pause);
-
-        bool hasHook();
-        void registerHook(ConsumeMessageHook* pHook);
-        void executeHookBefore(ConsumeMessageContext& context);
-        void executeHookAfter(ConsumeMessageContext& context);
-
-        void createTopic(const std::string& key, const std::string& newTopic, 
int queueNum);
-        std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& 
topic);
-
-        long long earliestMsgStoreTime(const MessageQueue& mq);
-        long long maxOffset(const MessageQueue& mq);
-        long long minOffset(const MessageQueue& mq);
-        OffsetStore* getOffsetStore() ;
-        void setOffsetStore(OffsetStore* pOffsetStore);
-
-        //MQConsumerInner
-        std::string groupName() ;
-        MessageModel messageModel() ;
-        ConsumeType consumeType();
-        ConsumeFromWhere consumeFromWhere();
-        std::set<SubscriptionData> subscriptions();
-        void doRebalance() ;
-        void persistConsumerOffset() ;
-        void updateTopicSubscribeInfo(const std::string& topic, const 
std::set<MessageQueue>& info);
-        std::map<std::string, SubscriptionData>& getSubscriptionInner() ;
-        bool isSubscribeTopicNeedUpdate(const std::string& topic);
-
-        MessageExt* viewMessage(const std::string& msgId);
-        QueryResult queryMessage(const std::string& topic,
-                                 const std::string&  key,
-                                 int maxNum,
-                                 long long begin,
-                                 long long end);
-
-        void registerMessageListener(MessageListener* pMessageListener);
-        long long searchOffset(const MessageQueue& mq, long long timestamp);
-        void sendMessageBack(MessageExt& msg, int delayLevel, const 
std::string& brokerName);
-
-        void subscribe(const std::string& topic, const std::string& 
subExpression);
-        void unsubscribe(const std::string& topic);
-
-        void updateConsumeOffset(MessageQueue& mq, long long offset);
-        void updateCorePoolSize(int corePoolSize);
-        bool isConsumeOrderly();
-        void setConsumeOrderly(bool consumeOrderly);
-
-        RebalanceImpl* getRebalanceImpl() ;
-        MessageListener* getMessageListenerInner();
-        DefaultMQPushConsumer* getDefaultMQPushConsumer() ;
-        ConsumerStatManager* getConsumerStatManager();
-
-               MQClientFactory* getmQClientFactory();
-               void setmQClientFactory(MQClientFactory* mQClientFactory);
-
-               ServiceState getServiceState();
-               void setServiceState(ServiceState serviceState);
-
-    private:
-        void correctTagsOffset(PullRequest& pullRequest) ;
-
-        void pullMessage(PullRequest* pPullRequest);
-
-
-        void executePullRequestImmediately(PullRequest* pullRequest);
-
-
-        void executePullRequestLater(PullRequest* pullRequest, long timeDelay);
-               void executeTaskLater(kpr::TimerHandler* handler, long 
timeDelay);
-
-        void makeSureStateOK();
-        void checkConfig();
-        void copySubscription() ;
-        void updateTopicSubscribeInfoWhenSubscriptionChanged();
-
-    private:
-               static const int s_PullTimeDelayMillsWhenException = 3000;
-               static const int s_PullTimeDelayMillsWhenFlowControl = 50;
-               static const int s_PullTimeDelayMillsWhenSuspend = 1000;
-               static const int s_BrokerSuspendMaxTimeMillis = 15000;
-               static const int s_ConsumerTimeoutMillisWhenSuspend = 30000;
-
-        long long flowControlTimes1;
-        long long flowControlTimes2;
-        ServiceState m_serviceState;
-        volatile bool m_pause;
-        bool m_consumeOrderly;
-        DefaultMQPushConsumer* m_pDefaultMQPushConsumer;
-        MQClientFactory* m_pMQClientFactory;
-        PullAPIWrapper* m_pPullAPIWrapper;
-        MessageListener* m_pMessageListenerInner;
-        OffsetStore* m_pOffsetStore;
-        RebalanceImpl* m_pRebalanceImpl;
-        ConsumerStatManager* m_pConsumerStatManager;
-        ConsumeMessageService* m_pConsumeMessageService;
-
-        std::list<ConsumeMessageHook*> m_hookList;
-        friend class PullMessageService;
-        friend class RebalancePushImpl;
-        friend class DefaultMQPushConsumerImplCallback;
-    };
-}
-
-#endif
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp 
b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
deleted file mode 100755
index 40e9d65..0000000
--- a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "LocalFileOffsetStore.h"
-
-#include "MQClientFactory.h"
-#include "OffsetSerializeWrapper.h"
-#include "ScopedLock.h"
-#include "FileUtil.h"
-#include "MixAll.h"
-#include "Exception.h"
-#include "MQClientException.h"
-
-namespace rmq
-{
-
-LocalFileOffsetStore::LocalFileOffsetStore(MQClientFactory* pMQClientFactory,
-        const std::string& groupName)
-{
-    m_pMQClientFactory = pMQClientFactory;
-    m_groupName = groupName;
-    std::string homePath = getenv("HOME");
-    m_storePath = homePath + "/.rocketmq_offsets/" + 
m_pMQClientFactory->getClientId()
-                  + "/" + m_groupName + "/offsets.json";
-}
-
-void  LocalFileOffsetStore::load()
-{
-    OffsetSerializeWrapperPtr offsetSerializeWrapper = this->readLocalOffset();
-    if (offsetSerializeWrapper.ptr() != NULL
-        && offsetSerializeWrapper->getOffsetTable().size() > 0)
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
-        m_offsetTable = offsetSerializeWrapper->getOffsetTable();
-        RMQ_FOR_EACH(m_offsetTable, it)
-        {
-            const MessageQueue& mq = it->first;
-            const kpr::AtomicLong& offset = it->second;
-            RMQ_INFO("load consumer's offset, {%s} {%s} {%lld}",
-                     m_groupName.c_str(),
-                     mq.toString().c_str(),
-                     offset.get());
-        }
-    }
-}
-
-
-void  LocalFileOffsetStore::updateOffset(const MessageQueue& mq, long long 
offset, bool increaseOnly)
-{
-    RMQ_DEBUG("updateOffset, MQ:%s, offset:%lld", mq.toString().c_str(), 
offset);
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
-    typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
-    if (it == m_offsetTable.end())
-    {
-        m_offsetTable[mq] = offset;
-        it = m_offsetTable.find(mq);
-    }
-
-    kpr::AtomicLong& offsetOld = it->second;
-    if (increaseOnly)
-    {
-        MixAll::compareAndIncreaseOnly(offsetOld, offset);
-    }
-    else
-    {
-        offsetOld.set(offset);
-    }
-}
-
-long long  LocalFileOffsetStore::readOffset(const MessageQueue& mq, 
ReadOffsetType type)
-{
-    RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
-    switch (type)
-    {
-        case MEMORY_FIRST_THEN_STORE:
-        case READ_FROM_MEMORY:
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-            typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
-            if (it != m_offsetTable.end())
-            {
-                return it->second.get();
-            }
-            else if (READ_FROM_MEMORY == type)
-            {
-                RMQ_WARN("No offset in memory, MQ:%s", mq.toString().c_str());
-                return -1;
-            }
-        }
-        case READ_FROM_STORE:
-        {
-            OffsetSerializeWrapperPtr offsetSerializeWrapper;
-            try
-            {
-                offsetSerializeWrapper = this->readLocalOffset();
-            }
-            catch (std::exception& e)
-            {
-                RMQ_WARN("load offset file fail, MQ:%s, exception:%s", 
mq.toString().c_str(), e.what());
-                return -1;
-            }
-
-            if (offsetSerializeWrapper.ptr() != NULL)
-            {
-                std::map<MessageQueue, kpr::AtomicLong>& offsetTable = 
offsetSerializeWrapper->getOffsetTable();
-                typeof(offsetTable.begin()) it = offsetTable.find(mq);
-                if (it != offsetTable.end())
-                {
-                    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
-                    m_offsetTable[mq] = it->second.get();
-                    return it->second.get();
-                }
-            }
-            return -1;
-        }
-        default:
-            break;
-    }
-
-    return -1;
-}
-
-
-void  LocalFileOffsetStore::persistAll(std::set<MessageQueue>& mqs)
-{
-    RMQ_DEBUG("persistAll, mqs.size={%u}, mqs=%s",
-              (unsigned)mqs.size(), UtilAll::toString(mqs).c_str());
-    if (mqs.empty())
-    {
-        return;
-    }
-    RMQ_DEBUG("persistAll, m_offsetTable.size={%u}, m_offsetTable=%s",
-              (unsigned)m_offsetTable.size(), 
UtilAll::toString(m_offsetTable).c_str());
-
-    OffsetSerializeWrapper offsetSerializeWrapper;
-    std::map<MessageQueue, kpr::AtomicLong>& offsetTable = 
offsetSerializeWrapper.getOffsetTable();
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-        RMQ_FOR_EACH(m_offsetTable, it)
-        {
-            MessageQueue mq = it->first;
-            kpr::AtomicLong& offset = it->second;
-            if (mqs.find(mq) != mqs.end())
-            {
-                offsetTable[mq] = offset;
-            }
-        }
-    }
-
-    RMQ_DEBUG("persistAll, offsetTable.size={%u}, offsetTable=%s",
-              (unsigned)offsetTable.size(), 
UtilAll::toString(offsetTable).c_str());
-
-    std::string jsonString;
-    offsetSerializeWrapper.encode(jsonString);
-    RMQ_DEBUG("persistAll, json=%s", jsonString.c_str());
-
-    if (!jsonString.empty())
-    {
-        try
-        {
-            
kpr::FileUtil::makeDirRecursive(kpr::FileUtil::extractFilePath(m_storePath));
-            MixAll::string2File(m_storePath, jsonString);
-        }
-        catch (const std::exception& e)
-        {
-            RMQ_ERROR("persistAll consumer offset Exception, %s, %s", 
m_storePath.c_str(), e.what());
-        }
-    }
-}
-
-void  LocalFileOffsetStore::persist(const MessageQueue& mq)
-{
-}
-
-void  LocalFileOffsetStore::removeOffset(const MessageQueue& mq)
-{
-}
-
-
-std::map<MessageQueue, long long> LocalFileOffsetStore::cloneOffsetTable(const 
std::string& topic)
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
-    std::map<MessageQueue, long long> cloneOffsetTable;
-    RMQ_FOR_EACH(m_offsetTable, it)
-    {
-        MessageQueue mq = it->first;
-        kpr::AtomicLong& offset = it->second;
-        if (topic == mq.getTopic())
-        {
-            cloneOffsetTable[mq] = offset.get();
-        }
-    }
-
-    return cloneOffsetTable;
-}
-
-
-OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffset()
-{
-    std::string content = MixAll::file2String(m_storePath);
-    if (content.length() == 0)
-    {
-        return this->readLocalOffsetBak();
-    }
-    else
-    {
-        OffsetSerializeWrapper* offsetSerializeWrapper = NULL;
-        try
-        {
-            offsetSerializeWrapper = 
OffsetSerializeWrapper::decode(content.c_str(), content.size());
-        }
-        catch (const MQException& e)
-        {
-            RMQ_WARN("readLocalOffset Exception, and try to correct, %s", 
e.what());
-            return this->readLocalOffsetBak();
-        }
-
-        return offsetSerializeWrapper;
-    }
-}
-
-
-OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffsetBak()
-{
-    std::string content = MixAll::file2String(m_storePath + ".bak");
-    if (content.length() > 0)
-    {
-        OffsetSerializeWrapper* offsetSerializeWrapper = NULL;
-        try
-        {
-            offsetSerializeWrapper = 
OffsetSerializeWrapper::decode(content.c_str(), content.size());
-        }
-        catch (const MQException& e)
-        {
-            RMQ_WARN("readLocalOffset Exception, maybe json content invalid, 
%s", e.what());
-        }
-
-        return offsetSerializeWrapper;
-    }
-
-    return NULL;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h 
b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
deleted file mode 100755
index c4efb76..0000000
--- a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
-* Copyright (C) 2013 suwenkuang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __LOCALFILEOFFSETSTORE_H__
-#define __LOCALFILEOFFSETSTORE_H__
-#include <map>
-#include <string>
-#include <set>
-
-#include "RocketMQClient.h"
-#include "OffsetStore.h"
-#include "MessageQueue.h"
-#include "AtomicValue.h"
-#include "Mutex.h"
-
-namespace rmq
-{
-    class MQClientFactory;
-    class MessageQueue;
-    class OffsetSerializeWrapper;
-
-    class LocalFileOffsetStore : public OffsetStore
-    {
-    public:
-        LocalFileOffsetStore(MQClientFactory* pMQClientFactory, const 
std::string& groupName);
-
-        void load();
-        void updateOffset(const MessageQueue& mq, long long offset, bool 
increaseOnly);
-        long long readOffset(const MessageQueue& mq, ReadOffsetType type);
-        void persistAll(std::set<MessageQueue>& mqs);
-        void persist(const MessageQueue& mq);
-        void removeOffset(const MessageQueue& mq) ;
-        std::map<MessageQueue, long long> cloneOffsetTable(const std::string& 
topic);
-
-    private:
-        OffsetSerializeWrapper* readLocalOffset();
-        OffsetSerializeWrapper* readLocalOffsetBak();
-
-    private:
-        MQClientFactory* m_pMQClientFactory;
-        std::string m_groupName;
-        std::string m_storePath;
-        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
-        kpr::RWMutex m_tableMutex;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h 
b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
deleted file mode 100755
index ed83621..0000000
--- a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,[email protected]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MQCONSUMERINNER_H__
-#define __MQCONSUMERINNER_H__
-
-#include <string>
-#include <set>
-
-#include "ConsumeType.h"
-#include "SubscriptionData.h"
-
-namespace rmq
-{
-    class MessageQueue;
-
-    class MQConsumerInner
-    {
-    public:
-        virtual ~MQConsumerInner() {}
-        virtual std::string groupName() = 0;
-        virtual MessageModel messageModel() = 0;
-        virtual ConsumeType consumeType() = 0;
-        virtual ConsumeFromWhere consumeFromWhere() = 0;
-        virtual std::set<SubscriptionData> subscriptions() = 0;
-        virtual void doRebalance() = 0;
-        virtual void persistConsumerOffset() = 0;
-        virtual void updateTopicSubscribeInfo(const std::string& topic, const 
std::set<MessageQueue>& info) = 0;
-        virtual bool isSubscribeTopicNeedUpdate(const std::string& topic) = 0;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h 
b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
deleted file mode 100755
index 65af99e..0000000
--- a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __MESSAGEQUEUELOCK_H__
-#define __MESSAGEQUEUELOCK_H__
-
-#include <map>
-#include "Mutex.h"
-#include "ScopedLock.h"
-#include "MessageQueue.h"
-
-namespace rmq
-{
-    class MessageQueueLock
-    {
-    public:
-        MessageQueueLock()
-        {
-
-        }
-
-        ~MessageQueueLock()
-        {
-            std::map<MessageQueue, kpr::Mutex*>::iterator it = 
m_mqLockTable.begin();
-
-            for (; it != m_mqLockTable.end(); it++)
-            {
-                delete it->second;
-            }
-        }
-
-        kpr::Mutex* fetchLockObject(MessageQueue& mq)
-        {
-            kpr::ScopedLock<kpr::Mutex> lock(m_lock);
-            std::map<MessageQueue, kpr::Mutex*>::iterator it = 
m_mqLockTable.find(mq);
-            kpr::Mutex* objLock;
-            if (it == m_mqLockTable.end())
-            {
-                objLock = new kpr::Mutex();
-                m_mqLockTable[mq] = objLock;
-            }
-            else
-            {
-                objLock = it->second;
-            }
-
-            return objLock;
-        }
-
-    private:
-        std::map<MessageQueue, kpr::Mutex*> m_mqLockTable;
-        kpr::Mutex m_lock;
-    };
-}
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp 
b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
deleted file mode 100755
index f90e502..0000000
--- a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ProcessQueue.h"
-#include "MessageExt.h"
-#include "KPRUtil.h"
-#include "UtilAll.h"
-#include "ScopedLock.h"
-#include "DefaultMQPushConsumer.h"
-#include "DefaultMQPushConsumerImpl.h"
-
-namespace rmq
-{
-
-/**
-* Creates an empty queue: no messages, not dropped, not locked, not being
-* consumed. The pull, consume and lock timestamps all start at the current
-* time reported by KPRUtil::GetCurrentTimeMillis().
-*/
-ProcessQueue::ProcessQueue()
-    : m_queueOffsetMax(0L),
-      m_dropped(false),
-      m_locked(false),
-      m_consuming(false)
-{
-    m_lastPullTimestamp = KPRUtil::GetCurrentTimeMillis();
-    m_lastConsumeTimestamp = KPRUtil::GetCurrentTimeMillis();
-    m_lastLockTimestamp = KPRUtil::GetCurrentTimeMillis();
-    m_msgCount = 0;
-}
-
-/**
-* @return true when more than s_RebalanceLockMaxLiveTime has elapsed since
-*         the last recorded lock timestamp.
-*/
-bool ProcessQueue::isLockExpired()
-{
-    return (KPRUtil::GetCurrentTimeMillis() - m_lastLockTimestamp) > s_RebalanceLockMaxLiveTime;
-}
-
-/**
-* @return true when more than s_PullMaxIdleTime has elapsed since the last
-*         recorded pull timestamp.
-*/
-bool ProcessQueue::isPullExpired()
-{
-    return (KPRUtil::GetCurrentTimeMillis() - m_lastPullTimestamp) > s_PullMaxIdleTime;
-}
-
-
-/**
-* Sends timed-out messages at the head of the pending map back to the broker.
-*
-* Does nothing for orderly consumers. Otherwise the lowest-offset message is
-* examined up to min(pending size, 16) times: when its
-* PROPERTY_CONSUME_START_TIMESTAMP is positive and older than
-* getConsumeTimeout() * 60 * 1000 ms, the message is passed to
-* sendMessageBack() and, if it is still the head of the map, removed from
-* it. The scan stops at the first head message that is not expired.
-*
-* @param pPushConsumer consumer providing the timeout and sendMessageBack();
-*                      dereferenced unconditionally, so it must not be NULL.
-*/
-void ProcessQueue::cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer)
-{
-    if (pPushConsumer->getDefaultMQPushConsumerImpl()->isConsumeOrderly())
-    {
-        return;
-    }
-
-    long long now = KPRUtil::GetCurrentTimeMillis();
-
-    // Bound the scan by the current size, read under the lock like every
-    // other access to m_msgTreeMap.
-    int loop = 0;
-    try
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
-        loop = m_msgTreeMap.size() < 16 ? m_msgTreeMap.size() : 16;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("getExpiredMsg exception");
-        return;
-    }
-
-    for (int i = 0; i < loop; i++)
-    {
-        MessageExt* msg = NULL;
-        try
-        {
-            kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
-            if (m_msgTreeMap.empty())
-            {
-                return;
-            }
-
-            MessageExt* firstMsg = m_msgTreeMap.begin()->second;
-            long long startTimestamp = UtilAll::str2ll(firstMsg->getProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP).c_str());
-            if (startTimestamp > 0 && (now - startTimestamp) > (pPushConsumer->getConsumeTimeout() * 60 * 1000))
-            {
-                msg = firstMsg;
-            }
-            else
-            {
-                // Head is not expired; higher offsets are not inspected.
-                return;
-            }
-        }
-        catch (...)
-        {
-            RMQ_ERROR("getExpiredMsg exception");
-        }
-
-        if (msg == NULL)
-        {
-            // The lookup above failed, so there is nothing to send back;
-            // dereferencing msg here would crash.
-            continue;
-        }
-
-        try
-        {
-            // NOTE(review): the meaning of the literal 3 is defined by
-            // sendMessageBack(); presumably a delay level - confirm.
-            pPushConsumer->sendMessageBack((*msg), 3);
-            RMQ_WARN("send expire msg back. topic={%s}, msgId={%s}, storeHost={%s}, queueId={%d}, queueOffset={%lld}",
-                     msg->getTopic().c_str(), msg->getMsgId().c_str(), msg->getStoreHostString().c_str(),
-                     msg->getQueueId(), msg->getQueueOffset());
-
-            try
-            {
-                kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-                // Remove only if the same message is still at the head; the
-                // map may have changed while no lock was held.
-                if (!m_msgTreeMap.empty() && msg->getQueueOffset() == m_msgTreeMap.begin()->first)
-                {
-                    m_msgTreeMap.erase(m_msgTreeMap.begin());
-                    m_msgCount -= 1;
-                    // The message object is deliberately not deleted: the
-                    // original author noted that freeing it here may crash.
-                }
-            }
-            catch (...)
-            {
-                RMQ_ERROR("delExpiredMsg exception");
-            }
-        }
-        catch (...)
-        {
-            RMQ_ERROR("send expired msg exception");
-        }
-    }
-}
-
-
-/**
-* Adds pulled messages to the pending map, keyed by queue offset.
-*
-* A message whose offset is already present replaces the stored pointer
-* without being counted again. Each newly seen offset also becomes the new
-* m_queueOffsetMax.
-*
-* @param msgs messages to store; the pointers are kept, not copied
-* @return true when the map is non-empty and no consumption was flagged as
-*         in progress; in that case m_consuming is set to true here
-*/
-bool ProcessQueue::putMessage(const std::list<MessageExt *> &msgs)
-{
-    bool dispathToConsume = false;
-
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-        int freshCnt = 0;
-
-        for (std::list<MessageExt *>::const_iterator cur = msgs.begin(); cur != msgs.end(); ++cur)
-        {
-            MessageExt *incoming = *cur;
-            const bool alreadyQueued = m_msgTreeMap.count(incoming->getQueueOffset()) != 0;
-
-            if (!alreadyQueued)
-            {
-                ++freshCnt;
-                m_queueOffsetMax = incoming->getQueueOffset();
-            }
-
-            m_msgTreeMap[incoming->getQueueOffset()] = incoming;
-        }
-
-        m_msgCount += freshCnt;
-
-        if (!(m_msgTreeMap.empty() || m_consuming))
-        {
-            m_consuming = true;
-            dispathToConsume = true;
-        }
-    }
-    catch (...)
-    {
-        RMQ_ERROR("putMessage exception");
-    }
-
-    return dispathToConsume;
-}
-
-/**
-* @return difference between the highest and lowest queue offsets currently
-*         in the pending map, or 0 when the map is empty or locking fails.
-*/
-long long ProcessQueue::getMaxSpan()
-{
-    try
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
-
-        if (m_msgTreeMap.empty())
-        {
-            return 0;
-        }
-
-        // std::map is ordered by key: rbegin() is the largest offset,
-        // begin() the smallest.
-        return m_msgTreeMap.rbegin()->first - m_msgTreeMap.begin()->first;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("getMaxSpan exception");
-    }
-
-    return 0;
-}
-
-/**
-* Removes consumed messages from the pending map and frees them.
-*
-* When the pending map is non-empty, every element of msgs is erased from
-* the map (if present), removed from msgs, and deleted. When the map is
-* empty, msgs is left untouched.
-*
-* @param msgs messages that were consumed; emptied as described above
-* @return -1 if the pending map was empty on entry; otherwise the lowest
-*         offset still pending, or m_queueOffsetMax + 1 when nothing remains
-*/
-long long ProcessQueue::removeMessage(std::list<MessageExt *> &msgs)
-{
-    long long nextOffset = -1;
-    unsigned long long now = KPRUtil::GetCurrentTimeMillis();
-
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-        m_lastConsumeTimestamp = now;
-
-        if (!m_msgTreeMap.empty())
-        {
-            nextOffset = m_queueOffsetMax + 1;
-            int removedCnt = 0;
-
-            while (!msgs.empty())
-            {
-                MessageExt *done = msgs.front();
-
-                // erase(key) reports how many entries it removed (0 or 1).
-                removedCnt += static_cast<int>(m_msgTreeMap.erase(done->getQueueOffset()));
-
-                msgs.pop_front();
-                delete done;
-            }
-
-            m_msgCount -= removedCnt;
-
-            if (!m_msgTreeMap.empty())
-            {
-                nextOffset = m_msgTreeMap.begin()->first;
-            }
-        }
-    }
-    catch (...)
-    {
-        RMQ_ERROR("removeMessage exception");
-    }
-
-    return nextOffset;
-}
-
-
-/**
-* Empties both the pending and the in-flight maps and resets the message
-* count and the maximum offset to 0. The stored MessageExt pointers are
-* dropped without being deleted.
-*/
-void ProcessQueue::clear()
-{
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-
-        m_msgTreeMap.clear();
-        m_msgTreeMapTemp.clear();
-
-        m_msgCount.set(0);
-        m_queueOffsetMax = 0;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("clear exception");
-    }
-}
-
-
-/**
-* Returns a snapshot copy of the pending map.
-*
-* The copy is taken under the read lock so it cannot observe the map while
-* another thread is modifying it. The MessageExt pointers in the copy are
-* shared with the queue, not cloned.
-*
-* @return copy of the pending map; empty if the lock could not be taken
-*/
-std::map<long long, MessageExt *> ProcessQueue::getMsgTreeMap()
-{
-    try
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
-        return m_msgTreeMap;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("getMsgTreeMap exception");
-    }
-
-    return std::map<long long, MessageExt *>();
-}
-
-/** @return copy of the counter tracking how many messages this queue holds. */
-kpr::AtomicInteger ProcessQueue::getMsgCount()
-{
-    return m_msgCount;
-}
-
-/** @return the dropped flag as last set by setDropped(). */
-bool ProcessQueue::isDropped()
-{
-    return m_dropped;
-}
-
-/** Stores the dropped flag. */
-void ProcessQueue::setDropped(bool dropped)
-{
-    m_dropped = dropped;
-}
-
-/** @return timestamp recorded for the most recent pull. */
-unsigned long long ProcessQueue::getLastPullTimestamp()
-{
-    return m_lastPullTimestamp;
-}
-
-/** Records the timestamp of the most recent pull; read by isPullExpired(). */
-void ProcessQueue::setLastPullTimestamp(unsigned long long lastPullTimestamp)
-{
-    m_lastPullTimestamp = lastPullTimestamp;
-}
-
-/** @return timestamp recorded for the most recent consume activity. */
-unsigned long long ProcessQueue::getLastConsumeTimestamp()
-{
-    return m_lastConsumeTimestamp;
-}
-
-/** Overrides the timestamp of the most recent consume activity. */
-void ProcessQueue::setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp)
-{
-    m_lastConsumeTimestamp = lastConsumeTimestamp;
-}
-
-
-/**
-* ========================================================================
-*/
-/** @return reference to the mutex stored in m_lockConsume. */
-kpr::Mutex &ProcessQueue::getLockConsume()
-{
-    return m_lockConsume;
-}
-
-/** Stores the locked flag. */
-void ProcessQueue::setLocked(bool locked)
-{
-    m_locked = locked;
-}
-
-/** @return the locked flag as last set by setLocked(). */
-bool ProcessQueue::isLocked()
-{
-    return m_locked;
-}
-
-/** @return current value of the try-unlock counter. */
-long long ProcessQueue::getTryUnlockTimes()
-{
-    return m_tryUnlockTimes.get();
-}
-
-/** Adds one to the try-unlock counter. */
-void ProcessQueue::incTryUnlockTimes()
-{
-    m_tryUnlockTimes++;
-}
-
-
-/**
-* Returns every in-flight message (taken by takeMessages() but not yet
-* committed) to the pending map and empties the in-flight map.
-*
-* The in-flight entries are merged into the pending map rather than
-* assigned over it, so messages still waiting in the pending map are kept.
-* Assigning would silently discard them and leak their MessageExt objects.
-*/
-void ProcessQueue::rollback()
-{
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-
-        std::map<long long, MessageExt *>::iterator it = m_msgTreeMapTemp.begin();
-        for (; it != m_msgTreeMapTemp.end(); ++it)
-        {
-            m_msgTreeMap[it->first] = it->second;
-        }
-
-        m_msgTreeMapTemp.clear();
-    }
-    catch (...)
-    {
-        RMQ_ERROR("rollback exception");
-    }
-}
-
-/**
-* Confirms the in-flight messages as consumed: subtracts their number from
-* the message count and empties the in-flight map. The MessageExt objects
-* themselves are not deleted here.
-*
-* @return highest in-flight offset + 1, or -1 when nothing was in flight or
-*         locking failed
-*/
-long long ProcessQueue::commit()
-{
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-
-        if (m_msgTreeMapTemp.empty())
-        {
-            return -1;
-        }
-
-        // Last key of the ordered map is the largest in-flight offset.
-        long long lastOffset = m_msgTreeMapTemp.rbegin()->first;
-        m_msgCount -= m_msgTreeMapTemp.size();
-        m_msgTreeMapTemp.clear();
-        return lastOffset + 1;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("commit exception");
-    }
-
-    return -1;
-}
-
-/**
-* Moves the given messages from the in-flight map back to the pending map
-* so a later takeMessages() call returns them again.
-*
-* @param msgs messages to re-queue, keyed by their queue offset
-*/
-void ProcessQueue::makeMessageToCosumeAgain(const std::list<MessageExt *> &msgs)
-{
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-
-        for (std::list<MessageExt *>::const_iterator cur = msgs.begin(); cur != msgs.end(); ++cur)
-        {
-            MessageExt *retry = *cur;
-            m_msgTreeMapTemp.erase(retry->getQueueOffset());
-            m_msgTreeMap[retry->getQueueOffset()] = retry;
-        }
-    }
-    catch (...)
-    {
-        RMQ_ERROR("makeMessageToCosumeAgain exception");
-    }
-}
-
-/**
-* Moves up to batchSize lowest-offset messages from the pending map to the
-* in-flight map and returns them in ascending offset order.
-*
-* When nothing could be taken, m_consuming is reset to false so that the
-* next putMessage() call reports that consumption must be dispatched again.
-* That reset has to run when the pending map is already empty, which is the
-* normal way a queue drains, so it sits outside the non-empty check.
-*
-* @param batchSize maximum number of messages to take
-* @return the taken messages; empty when none were pending or locking failed
-*/
-std::list<MessageExt *> ProcessQueue::takeMessages(int batchSize)
-{
-    std::list<MessageExt *> result;
-    unsigned long long now = KPRUtil::GetCurrentTimeMillis();
-
-    try
-    {
-        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
-        m_lastConsumeTimestamp = now;
-
-        for (int i = 0; i < batchSize && !m_msgTreeMap.empty(); i++)
-        {
-            std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin();
-
-            result.push_back(it->second);
-            m_msgTreeMapTemp[it->first] = it->second;
-            m_msgTreeMap.erase(it);
-        }
-
-        if (result.empty())
-        {
-            m_consuming = false;
-        }
-    }
-    catch (...)
-    {
-        RMQ_ERROR("takeMessags exception");
-    }
-
-    return result;
-}
-
-/** @return timestamp recorded for the most recent lock. */
-long long ProcessQueue::getLastLockTimestamp()
-{
-    return m_lastLockTimestamp;
-}
-
-/** Records the timestamp of the most recent lock; read by isLockExpired(). */
-void ProcessQueue::setLastLockTimestamp(long long lastLockTimestamp)
-{
-    m_lastLockTimestamp = lastLockTimestamp;
-}
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ProcessQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.h 
b/rocketmq-client4cpp/src/consumer/ProcessQueue.h
deleted file mode 100755
index 559dd7f..0000000
--- a/rocketmq-client4cpp/src/consumer/ProcessQueue.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PROCESSQUEUE_H__
-#define __PROCESSQUEUE_H__
-
-#include <list>
-#include <map>
-#include "Mutex.h"
-#include "AtomicValue.h"
-
-namespace rmq
-{
-    class MessageExt;
-       class DefaultMQPushConsumer;
-
-    /**
-    * Client-side buffer of the messages pulled for one message queue.
-    *
-    * Messages wait in a pending map ordered by queue offset. takeMessages()
-    * moves them to an in-flight map until commit(), rollback() or
-    * makeMessageToCosumeAgain() settles them. Both maps are guarded by one
-    * read/write lock.
-    */
-    class ProcessQueue
-    {
-    public:
-        // Threshold compared against the lock age in isLockExpired().
-        static const unsigned int s_RebalanceLockMaxLiveTime = 30000;
-        // Not referenced inside ProcessQueue; presumably the lock renewal
-        // period used by the rebalance code - TODO confirm.
-        static const unsigned int s_RebalanceLockInterval = 20000;
-        // Threshold compared against the pull idle time in isPullExpired().
-        static const unsigned int s_PullMaxIdleTime = 120000;
-
-    public:
-        ProcessQueue();
-
-        // Age checks against the thresholds above.
-        bool isLockExpired();
-        bool isPullExpired();
-
-        // Sends timed-out head messages back through pPushConsumer.
-        void cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer);
-        // Stores msgs in the pending map; true means consumption should be dispatched.
-        bool putMessage(const std::list<MessageExt*>& msgs);
-
-        // Highest minus lowest pending offset, 0 when empty.
-        long long getMaxSpan();
-        // Erases msgs from the pending map, deletes them, returns the next offset.
-        long long removeMessage(std::list<MessageExt*>& msgs);
-
-        // Empties both maps and resets the counters.
-        void clear();
-
-        // Returns a copy of the pending map.
-        std::map<long long, MessageExt*> getMsgTreeMap();
-        kpr::AtomicInteger getMsgCount();
-        bool isDropped();
-        void setDropped(bool dropped);
-
-        unsigned long long getLastPullTimestamp();
-        void setLastPullTimestamp(unsigned long long lastPullTimestamp);
-
-        unsigned long long getLastConsumeTimestamp();
-        void setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp);
-
-        /**
-        * ========================================================================
-        */
-        kpr::Mutex& getLockConsume();
-        void setLocked(bool locked);
-        bool isLocked();
-        long long getTryUnlockTimes();
-        void incTryUnlockTimes();
-
-        // Settle the in-flight messages: put them back, or confirm them.
-        void rollback();
-        long long commit();
-        // Moves msgs from the in-flight map back to the pending map.
-        void makeMessageToCosumeAgain(const std::list<MessageExt*>& msgs);
-
-        // Moves up to batchSize lowest-offset messages to the in-flight map.
-        std::list<MessageExt*> takeMessages(int batchSize);
-
-        long long getLastLockTimestamp();
-        void setLastLockTimestamp(long long lastLockTimestamp);
-
-
-    private:
-        kpr::RWMutex m_lockTreeMap;                        // guards both message maps
-        std::map<long long, MessageExt*> m_msgTreeMap;     // pending messages by queue offset
-        volatile long long m_queueOffsetMax ;
-        kpr::AtomicInteger m_msgCount;
-        volatile bool m_dropped;
-        volatile unsigned long long m_lastPullTimestamp;
-        volatile unsigned long long m_lastConsumeTimestamp;
-
-        /**
-        * order message
-        */
-        kpr::Mutex m_lockConsume;
-        volatile bool m_locked;
-        volatile unsigned long long m_lastLockTimestamp;
-        volatile bool m_consuming;                         // set by putMessage(), cleared by takeMessages()
-        std::map<long long, MessageExt*> m_msgTreeMapTemp; // in-flight messages by queue offset
-        kpr::AtomicInteger m_tryUnlockTimes;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp 
b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
deleted file mode 100755
index c520e4c..0000000
--- a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullAPIWrapper.h"
-
-#include <stdlib.h>
-#include <list>
-#include <set>
-#include "ScopedLock.h"
-#include "MQClientFactory.h"
-#include "PullCallback.h"
-#include "MixAll.h"
-#include "PullSysFlag.h"
-#include "CommandCustomHeader.h"
-#include "MQClientAPIImpl.h"
-#include "MQClientException.h"
-#include "SubscriptionData.h"
-#include "UtilAll.h"
-#include "MessageExt.h"
-#include "PullResultExt.h"
-#include "MessageDecoder.h"
-#include "VirtualEnvUtil.h"
-
-namespace rmq
-{
-
-/**
-* @param pMQClientFactory factory used for broker lookup and remote calls;
-*                         stored as a raw pointer, not owned
-* @param consumerGroup    group name placed in every pull request header
-*/
-PullAPIWrapper::PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup)
-    : m_pMQClientFactory(pMQClientFactory),
-      m_consumerGroup(consumerGroup)
-{
-}
-
-/**
-* Remembers which broker id the next pull for mq should use.
-*
-* An existing entry is updated in place under the read lock. The write
-* lock is taken only when an entry has to be inserted.
-*
-* @param mq       queue the value applies to
-* @param brokerId broker id to store for that queue
-*/
-void  PullAPIWrapper::updatePullFromWhichNode(MessageQueue& mq, long brokerId)
-{
-    {
-        kpr::ScopedRLock<kpr::RWMutex> readGuard(m_pullFromWhichNodeTableLock);
-        std::map<MessageQueue, kpr::AtomicInteger>::iterator known = m_pullFromWhichNodeTable.find(mq);
-        if (known != m_pullFromWhichNodeTable.end())
-        {
-            known->second.set(brokerId);
-            return;
-        }
-    }
-
-    // No entry yet: the read lock is released above, so retake it for writing.
-    kpr::ScopedWLock<kpr::RWMutex> writeGuard(m_pullFromWhichNodeTableLock);
-    m_pullFromWhichNodeTable[mq] = kpr::AtomicInteger(brokerId);
-}
-
-/**
-* Post-processes a raw pull result in place.
-*
-* Always records the broker id suggested by the result for mq. When the
-* status is FOUND it additionally:
-*   - decodes the binary payload into MessageExt objects;
-*   - if the subscription has tags, keeps only messages whose tag is in the
-*     tag set and deletes the rest, including untagged messages;
-*   - if a project group prefix is configured, strips it from the topic of
-*     subscriptionData, mq and every kept message;
-*   - stamps PROPERTY_MIN_OFFSET / PROPERTY_MAX_OFFSET on every kept message
-*     and appends it to msgFoundList;
-*   - frees the binary payload.
-*
-* @param mq               queue that was pulled; its topic may be rewritten
-* @param pullResult       result to process; it is cast to PullResultExt
-* @param subscriptionData subscription used for tag filtering; its topic may
-*                         be rewritten
-* @return address of pullResult
-*/
-PullResult* PullAPIWrapper::processPullResult(MessageQueue& mq,
-        PullResult& pullResult,
-        SubscriptionData& subscriptionData)
-{
-    std::string projectGroupPrefix = m_pMQClientFactory->getMQClientAPIImpl()->getProjectGroupPrefix();
-    PullResultExt& pullResultExt = (PullResultExt&) pullResult;
-
-    updatePullFromWhichNode(mq, pullResultExt.suggestWhichBrokerId);
-
-    if (pullResult.pullStatus == FOUND)
-    {
-        std::list<MessageExt*> msgList =
-            MessageDecoder::decodes(pullResultExt.messageBinary, pullResultExt.messageBinaryLen);
-
-        std::list<MessageExt*> msgListFilterAgain;
-
-        if (!subscriptionData.getTagsSet().empty())
-        {
-            std::set<std::string>& tags = subscriptionData.getTagsSet();
-            std::list<MessageExt*>::iterator it = msgList.begin();
-            while (it != msgList.end())
-            {
-                MessageExt* msg = *it;
-                if (!msg->getTags().empty() && tags.find(msg->getTags()) != tags.end())
-                {
-                    msgListFilterAgain.push_back(msg);
-                    it = msgList.erase(it);
-                }
-                else
-                {
-                    // Always advance on rejected messages, including those
-                    // with no tag, otherwise the loop never terminates.
-                    ++it;
-                }
-            }
-        }
-        else
-        {
-            // No tag filter: every decoded message is accepted.
-            msgListFilterAgain.swap(msgList);
-        }
-
-        const bool stripPrefix = !UtilAll::isBlank(projectGroupPrefix);
-        if (stripPrefix)
-        {
-            subscriptionData.setTopic(VirtualEnvUtil::clearProjectGroup(subscriptionData.getTopic(),
-                                      projectGroupPrefix));
-            mq.setTopic(VirtualEnvUtil::clearProjectGroup(mq.getTopic(), projectGroupPrefix));
-        }
-
-        std::list<MessageExt*>::iterator it = msgListFilterAgain.begin();
-        for (; it != msgListFilterAgain.end(); ++it)
-        {
-            MessageExt* msg = *it;
-
-            if (stripPrefix)
-            {
-                msg->setTopic(VirtualEnvUtil::clearProjectGroup(msg->getTopic(), projectGroupPrefix));
-            }
-
-            msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset));
-            msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset));
-
-            pullResultExt.msgFoundList.push_back(msg);
-        }
-
-        // Whatever is left in msgList was rejected by the tag filter.
-        it = msgList.begin();
-        for (; it != msgList.end(); ++it)
-        {
-            delete *it;
-        }
-
-        delete[] pullResultExt.messageBinary;
-        pullResultExt.messageBinary = NULL;
-        pullResultExt.messageBinaryLen = 0;
-    }
-
-    return &pullResult;
-}
-
-/**
-* @param mq queue about to be pulled
-* @return broker id recorded for mq by updatePullFromWhichNode(), or
-*         MixAll::MASTER_ID when none has been recorded
-*/
-long PullAPIWrapper::recalculatePullFromWhichNode(MessageQueue& mq)
-{
-    kpr::ScopedRLock<kpr::RWMutex> readGuard(m_pullFromWhichNodeTableLock);
-
-    std::map<MessageQueue, kpr::AtomicInteger>::iterator known = m_pullFromWhichNodeTable.find(mq);
-    if (known == m_pullFromWhichNodeTable.end())
-    {
-        return MixAll::MASTER_ID;
-    }
-
-    return known->second.get();
-}
-
-/**
-* Resolves the broker for mq and issues a pull request to it.
-*
-* If the broker address is unknown, the topic route is refreshed from the
-* name server once and the lookup is retried. When the resolved broker is
-* a slave, the commit-offset flag is cleared from sysFlag before sending.
-*
-* The request header is heap-allocated here and handed to
-* MQClientAPIImpl::pullMessage(); presumably that call takes ownership of
-* it - TODO confirm.
-*
-* @return whatever MQClientAPIImpl::pullMessage() returns
-* @throws MQClientException when no address is known for the broker even
-*         after the route refresh
-*/
-PullResult* PullAPIWrapper::pullKernelImpl(MessageQueue& mq,
-        const std::string& subExpression,
-        long long subVersion,
-        long long offset,
-        int maxNums,
-        int sysFlag,
-        long long commitOffset,
-        long long brokerSuspendMaxTimeMillis,
-        int timeoutMillis,
-        CommunicationMode communicationMode,
-        PullCallback* pPullCallback)
-{
-    FindBrokerResult target =
-        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
-                recalculatePullFromWhichNode(mq), false);
-    if (target.brokerAddr.empty())
-    {
-        // Route may be stale: refresh it and look the broker up once more.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        target = m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
-                 recalculatePullFromWhichNode(mq), false);
-    }
-
-    if (target.brokerAddr.empty())
-    {
-        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-    }
-
-    int effectiveSysFlag = sysFlag;
-    if (target.slave)
-    {
-        effectiveSysFlag = PullSysFlag::clearCommitOffsetFlag(effectiveSysFlag);
-    }
-
-    PullMessageRequestHeader* header = new PullMessageRequestHeader();
-    header->consumerGroup = m_consumerGroup;
-    header->topic = mq.getTopic();
-    header->queueId = mq.getQueueId();
-    header->queueOffset = offset;
-    header->maxMsgNums = maxNums;
-    header->sysFlag = effectiveSysFlag;
-    header->commitOffset = commitOffset;
-    header->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
-    header->subscription = subExpression;
-    header->subVersion = subVersion;
-
-    return m_pMQClientFactory->getMQClientAPIImpl()->pullMessage(
-               target.brokerAddr,
-               header,
-               timeoutMillis,
-               communicationMode,
-               pPullCallback);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h 
b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
deleted file mode 100755
index d5ec787..0000000
--- a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLAPIWRAPPER_H__
-#define __PULLAPIWRAPPER_H__
-
-#include <string>
-#include <map>
-
-#include "AtomicValue.h"
-#include "PullResult.h"
-#include "MessageQueue.h"
-#include "CommunicationMode.h"
-#include "Mutex.h"
-
-namespace rmq
-{
-  class MQClientFactory;
-  class PullCallback;
-  class SubscriptionData;
-
-  /**
-  * Helper around the pull protocol: chooses the broker to pull from,
-  * builds and sends the pull request, and post-processes the result.
-  */
-  class PullAPIWrapper
-  {
-  public:
-      PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup);
-
-      // Records the broker id to use for the next pull of mq.
-      void updatePullFromWhichNode(MessageQueue& mq, long brokerId);
-
-      // Decodes, tag-filters and annotates the messages of a pull result in
-      // place; returns the address of pullResult.
-      PullResult* processPullResult(MessageQueue& mq,
-                                    PullResult& pullResult,
-                                    SubscriptionData& subscriptionData);
-
-      // Broker id recorded for mq, or MixAll::MASTER_ID when none is recorded.
-      long recalculatePullFromWhichNode(MessageQueue& mq);
-
-      // Resolves the broker for mq and sends one pull request; throws
-      // MQClientException when the broker address cannot be found.
-      PullResult* pullKernelImpl(MessageQueue& mq,
-                                 const std::string& subExpression,
-                                 long long subVersion,
-                                 long long offset,
-                                 int maxNums,
-                                 int sysFlag,
-                                 long long commitOffset,
-                                 long long brokerSuspendMaxTimeMillis,
-                                 int timeoutMillis,
-                                 CommunicationMode communicationMode,
-                                 PullCallback* pPullCallback);
-
-  private:
-      std::map<MessageQueue, kpr::AtomicInteger> m_pullFromWhichNodeTable; // queue -> broker id
-      kpr::RWMutex m_pullFromWhichNodeTableLock;                           // guards the table above
-      MQClientFactory* m_pMQClientFactory;                                 // not owned
-      std::string m_consumerGroup;
-  };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp 
b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
deleted file mode 100755
index 6d9972e..0000000
--- a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullMessageService.h"
-#include <list>
-#include "MQClientFactory.h"
-#include "MQConsumerInner.h"
-#include "PullRequest.h"
-#include "DefaultMQPushConsumerImpl.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-/**
-* One-shot timer handler that re-submits a pull request to the service
-* when the timer fires, then destroys itself.
-*/
-class SubmitPullRequestLater : public kpr::TimerHandler
-{
-public:
-    SubmitPullRequestLater(PullMessageService* pService, PullRequest* pPullRequest)
-        : m_pService(pService), m_pPullRequest(pPullRequest)
-    {
-    }
-
-    /**
-    * Queues the stored request on the service. Any exception is logged and
-    * swallowed so it cannot escape into the timer thread.
-    */
-    void OnTimeOut(unsigned int timerID)
-    {
-        try
-        {
-            m_pService->executePullRequestImmediately(m_pPullRequest);
-        }
-        catch(...)
-        {
-            RMQ_ERROR("SubmitPullRequestLater OnTimeOut exception");
-        }
-
-        // The handler is heap-allocated per request and frees itself once
-        // it has fired; no member may be touched after this line.
-        delete this;
-    }
-
-private:
-    PullMessageService* m_pService;   // service to hand the request to
-    PullRequest* m_pPullRequest;      // request to submit
-};
-
-
-/**
-* Creates the service and immediately starts its helper timer thread,
-* which is used for delayed pull requests and delayed tasks.
-*
-* @param pMQClientFactory factory used to find the consumer for a request;
-*                         stored as a raw pointer
-*/
-PullMessageService::PullMessageService(MQClientFactory* pMQClientFactory)
-    : ServiceThread("PullMessageService"),
-      m_pMQClientFactory(pMQClientFactory)
-{
-    // NOTE(review): meaning of the second argument (10) is defined by
-    // kpr::TimerThread - confirm before changing.
-    m_TimerThread = new kpr::TimerThread("PullMessageService-timer", 10);
-    m_TimerThread->Start();
-}
-
-
-/**
-* Releases nothing explicitly. The timer thread is stopped and joined at
-* the end of Run(), not here.
-*/
-PullMessageService::~PullMessageService()
-{
-}
-
-
-/**
-* Schedules pPullRequest to be queued after a delay.
-*
-* A self-deleting SubmitPullRequestLater handler is registered with the
-* timer thread and forwards the request when it fires.
-*
-* @param pPullRequest request to submit later
-* @param timeDelay    delay handed to the timer thread
-*/
-void PullMessageService::executePullRequestLater(PullRequest* pPullRequest, long timeDelay)
-{
-    SubmitPullRequestLater* deferred = new SubmitPullRequestLater(this, pPullRequest);
-    m_TimerThread->RegisterTimer(0, timeDelay, deferred, false);
-}
-
-
-/**
-* Registers an arbitrary timer handler with the service's timer thread.
-*
-* @param pHandler  handler to invoke when the timer fires
-* @param timeDelay delay handed to the timer thread
-*/
-void PullMessageService::executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay)
-{
-    m_TimerThread->RegisterTimer(0, timeDelay, pHandler, false);
-}
-
-
-/**
-* Appends pPullRequest to the request queue and wakes the service thread.
-* Exceptions are logged and swallowed.
-*
-* @param pPullRequest request to process as soon as possible
-*/
-void PullMessageService::executePullRequestImmediately(PullRequest* pPullRequest)
-{
-    try
-    {
-        // Hold the queue lock only for the push; wake up after releasing it.
-        {
-            kpr::ScopedLock<kpr::Mutex> guard(m_lock);
-            m_pullRequestQueue.push_back(pPullRequest);
-        }
-
-        wakeup();
-    }
-    catch (...)
-    {
-        RMQ_ERROR("executePullRequestImmediately pullRequestQueue.push");
-    }
-}
-
-/**
-* Thread body: until m_stoped is set, takes pull requests off the queue one
-* at a time and dispatches them through pullMessage().
-*
-* When the queue is empty the thread waits via waitForRunning(5000) before
-* checking again. After the loop ends the timer thread is stopped and
-* joined.
-*/
-void PullMessageService::Run()
-{
-    RMQ_INFO("%s service started", getServiceName().c_str());
-
-    while (!m_stoped)
-    {
-        try
-        {
-            bool idle = false;
-            {
-                kpr::ScopedLock<kpr::Mutex> guard(m_lock);
-                idle = m_pullRequestQueue.empty();
-            }
-
-            if (idle)
-            {
-                waitForRunning(5000);
-            }
-
-            // Re-check under the lock: the queue may have changed while
-            // this thread was waiting.
-            PullRequest* next = NULL;
-            {
-                kpr::ScopedLock<kpr::Mutex> guard(m_lock);
-                if (!m_pullRequestQueue.empty())
-                {
-                    next = m_pullRequestQueue.front();
-                    m_pullRequestQueue.pop_front();
-                }
-            }
-
-            if (next != NULL)
-            {
-                pullMessage(next);
-            }
-        }
-        catch (...)
-        {
-            RMQ_ERROR("Pull Message Service Run Method exception");
-        }
-    }
-
-    m_TimerThread->Stop();
-    m_TimerThread->Join();
-
-    RMQ_INFO("%s service end", getServiceName().c_str());
-}
-
-/** @return the fixed service name, "PullMessageService". */
-std::string PullMessageService::getServiceName()
-{
-    return std::string("PullMessageService");
-}
-
-
-/**
-* Hands a pull request to the consumer registered for its consumer group.
-* If no consumer is registered for that group, the request is logged and
-* not processed.
-*
-* @param pPullRequest request taken from the queue
-*/
-void PullMessageService::pullMessage(PullRequest* pPullRequest)
-{
-    MQConsumerInner* consumer = m_pMQClientFactory->selectConsumer(pPullRequest->getConsumerGroup());
-    if (consumer == NULL)
-    {
-        RMQ_WARN("No matched consumer for the PullRequest {%s}, drop it", pPullRequest->toString().c_str());
-        return;
-    }
-
-    // NOTE(review): unchecked downcast; assumes every consumer reachable
-    // from a PullRequest is a DefaultMQPushConsumerImpl - confirm.
-    DefaultMQPushConsumerImpl* pushImpl = (DefaultMQPushConsumerImpl*) consumer;
-    pushImpl->pullMessage(pPullRequest);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/PullMessageService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.h 
b/rocketmq-client4cpp/src/consumer/PullMessageService.h
deleted file mode 100755
index d6ebcee..0000000
--- a/rocketmq-client4cpp/src/consumer/PullMessageService.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,[email protected]
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __PULLMESSAGESERVICE_H__
-#define __PULLMESSAGESERVICE_H__
-
-#include <list>
-#include "RocketMQClient.h"
-#include "ServiceThread.h"
-#include "TimerThread.h"
-#include "PullRequest.h"
-
-namespace rmq
-{
-    class MQClientFactory;
-    class MQConsumerInner;
-    class PullRequest;
-
-    /**
-    * Service thread that drains a queue of pull requests and dispatches
-    * each one to the consumer of its group. A helper timer thread provides
-    * delayed submission.
-    */
-    class PullMessageService : public ServiceThread
-    {
-    public:
-        PullMessageService(MQClientFactory* pMQClientFactory);
-        ~PullMessageService();
-
-        // Queue pPullRequest after timeDelay has elapsed on the timer thread.
-        void executePullRequestLater(PullRequest* pPullRequest, long timeDelay);
-        // Register an arbitrary handler on the timer thread.
-        void executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay);
-
-        // Queue pPullRequest now and wake the service thread.
-        void executePullRequestImmediately(PullRequest* pPullRequest);
-        std::string getServiceName();
-
-        // Thread body: processes queued requests until the service is stopped.
-        void Run();
-
-    private:
-        void pullMessage(PullRequest* pPullRequest);
-
-    private:
-        std::list<PullRequest*> m_pullRequestQueue; // pending requests, FIFO
-        kpr::Mutex m_lock;                          // guards m_pullRequestQueue
-        MQClientFactory* m_pMQClientFactory;
-        kpr::TimerThreadPtr m_TimerThread;          // runs delayed submissions
-    };
-       typedef kpr::RefHandleT<PullMessageService> PullMessageServicePtr;
-}
-
-#endif


Reply via email to