http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/DefaultMQPullConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/DefaultMQPullConsumer.h b/rocketmq-client4cpp/include/DefaultMQPullConsumer.h deleted file mode 100755 index d9952c5..0000000 --- a/rocketmq-client4cpp/include/DefaultMQPullConsumer.h +++ /dev/null @@ -1,154 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __RMQ_DEFAULTMQPULLCONSUMER_H__ -#define __RMQ_DEFAULTMQPULLCONSUMER_H__ - -#include <list> -#include <string> - -#include "RocketMQClient.h" -#include "MQClientException.h" -#include "MessageQueue.h" -#include "MessageExt.h" -#include "ClientConfig.h" -#include "MQPullConsumer.h" - -namespace rmq -{ - class OffsetStore; - class DefaultMQPullConsumerImpl; - class AllocateMessageQueueStrategy; - - /** - * Pull Consumer - * - */ - class DefaultMQPullConsumer : public ClientConfig , public MQPullConsumer - { - public: - DefaultMQPullConsumer(); - DefaultMQPullConsumer(const std::string& consumerGroup); - ~DefaultMQPullConsumer(); - - //MQAdmin - void createTopic(const std::string& key, const std::string& newTopic, int queueNum); - long long searchOffset(const MessageQueue& mq, long long timestamp); - long long maxOffset(const MessageQueue& mq); - long long minOffset(const MessageQueue& mq); - long long earliestMsgStoreTime(const MessageQueue& mq); - MessageExt* viewMessage(const std::string& msgId); - QueryResult queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end); - // MQadmin end - - AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy(); - void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy); - int getBrokerSuspendMaxTimeMillis() ; - void setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis); - std::string getConsumerGroup(); - void setConsumerGroup(const std::string& consumerGroup); - int getConsumerPullTimeoutMillis(); - void setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis); - int getConsumerTimeoutMillisWhenSuspend() ; - void setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend); - MessageModel getMessageModel(); - void setMessageModel(MessageModel messageModel); - MessageQueueListener* getMessageQueueListener(); - void setMessageQueueListener(MessageQueueListener* pMessageQueueListener); - 
std::set<std::string> getRegisterTopics(); - void setRegisterTopics( std::set<std::string> registerTopics); - - //MQConsumer - void sendMessageBack(MessageExt& msg, int delayLevel); - void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName); - std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); - void start(); - void shutdown() ; - //MQConsumer end - - //MQPullConsumer - void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener); - PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums); - void pull(MessageQueue& mq, - const std::string& subExpression, - long long offset, - int maxNums, - PullCallback* pPullCallback); - - PullResult* pullBlockIfNotFound(MessageQueue& mq, - const std::string& subExpression, - long long offset, - int maxNums); - - void pullBlockIfNotFound(MessageQueue& mq, - const std::string& subExpression, - long long offset, - int maxNums, - PullCallback* pPullCallback); - - void updateConsumeOffset(MessageQueue& mq, long long offset); - - long long fetchConsumeOffset(MessageQueue& mq, bool fromStore); - - std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic); - //MQPullConsumer end - - OffsetStore* getOffsetStore(); - void setOffsetStore(OffsetStore* offsetStore); - - DefaultMQPullConsumerImpl* getDefaultMQPullConsumerImpl(); - - bool isUnitMode(); - void setUnitMode(bool isUnitMode); - - int getMaxReconsumeTimes(); - void setMaxReconsumeTimes(int maxReconsumeTimes); - - protected: - DefaultMQPullConsumerImpl* m_pDefaultMQPullConsumerImpl; - - private: - std::string m_consumerGroup; - int m_brokerSuspendMaxTimeMillis ; - - int m_consumerTimeoutMillisWhenSuspend; - int m_consumerPullTimeoutMillis; - - MessageModel m_messageModel; - MessageQueueListener* m_pMessageQueueListener; - - OffsetStore* m_pOffsetStore; - - std::set<std::string> m_registerTopics; - AllocateMessageQueueStrategy* 
m_pAllocateMessageQueueStrategy; - - /** - * Whether the unit of subscription group - */ - bool m_unitMode; - - /** - * max retry times, default is 15 - */ - int m_maxReconsumeTimes; - }; -} - -#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/DefaultMQPushConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/DefaultMQPushConsumer.h b/rocketmq-client4cpp/include/DefaultMQPushConsumer.h deleted file mode 100755 index 25ef4fb..0000000 --- a/rocketmq-client4cpp/include/DefaultMQPushConsumer.h +++ /dev/null @@ -1,181 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#ifndef __RMQ_DEFAULTMQPUSHCONSUMER_H__ -#define __RMQ_DEFAULTMQPUSHCONSUMER_H__ - -#include <list> -#include <string> - -#include "RocketMQClient.h" -#include "MQClientException.h" -#include "Message.h" -#include "MessageExt.h" -#include "MessageQueue.h" -#include "MessageListener.h" -#include "PullResult.h" -#include "ClientConfig.h" -#include "MQPushConsumer.h" - -namespace rmq -{ - class AllocateMessageQueueStrategy; - class DefaultMQPushConsumerImpl; - class OffsetStore; - - /** - * Push Consumer - * - */ - class DefaultMQPushConsumer : public ClientConfig ,public MQPushConsumer - { - public: - DefaultMQPushConsumer(); - DefaultMQPushConsumer(const std::string& consumerGroup); - ~DefaultMQPushConsumer(); - - //MQAdmin - void createTopic(const std::string& key, const std::string& newTopic, int queueNum); - long long searchOffset(const MessageQueue& mq, long long timestamp); - long long maxOffset(const MessageQueue& mq); - long long minOffset(const MessageQueue& mq); - long long earliestMsgStoreTime(const MessageQueue& mq); - MessageExt* viewMessage(const std::string& msgId); - QueryResult queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end); - - // MQadmin end - - AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy(); - void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy); - - int getConsumeConcurrentlyMaxSpan(); - void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan); - - ConsumeFromWhere getConsumeFromWhere(); - void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere); - - int getConsumeMessageBatchMaxSize(); - void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize); - - std::string getConsumerGroup(); - void setConsumerGroup(const std::string& consumerGroup) ; - - int getConsumeThreadMax() ; - void setConsumeThreadMax(int consumeThreadMax); - - int getConsumeThreadMin(); - void setConsumeThreadMin(int 
consumeThreadMin); - - MessageListener* getMessageListener(); - void setMessageListener(MessageListener* pMessageListener); - - MessageModel getMessageModel(); - void setMessageModel(MessageModel messageModel) ; - - int getPullBatchSize() ; - void setPullBatchSize(int pullBatchSize); - - long getPullInterval(); - void setPullInterval(long pullInterval); - - int getPullThresholdForQueue(); - void setPullThresholdForQueue(int pullThresholdForQueue); - - std::map<std::string, std::string>& getSubscription(); - void setSubscription(const std::map<std::string, std::string>& subscription); - - //MQConsumer - void sendMessageBack(MessageExt& msg, int delayLevel); - void sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName); - std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); - - void start(); - void shutdown(); - //MQConsumer end - - //MQPushConsumer - void registerMessageListener(MessageListener* pMessageListener); - - void subscribe(const std::string& topic, const std::string& subExpression); - void unsubscribe(const std::string& topic); - - void updateCorePoolSize(int corePoolSize); - - void suspend() ; - void resume(); - //MQPushConsumer end - - OffsetStore* getOffsetStore(); - void setOffsetStore(OffsetStore* offsetStore); - - std::string getConsumeTimestamp(); - void setConsumeTimestamp(std::string consumeTimestamp); - - DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl(); - - bool isPostSubscriptionWhenPull(); - void setPostSubscriptionWhenPull(bool postSubscriptionWhenPull); - - bool isUnitMode(); - void setUnitMode(bool isUnitMode); - - int getMaxReconsumeTimes(); - void setMaxReconsumeTimes(int maxReconsumeTimes); - - int getSuspendCurrentQueueTimeMillis(); - void setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis); - - int getConsumeTimeout(); - void setConsumeTimeout(int consumeTimeout); - - protected: - DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl; - - private: - 
std::string m_consumerGroup; - MessageModel m_messageModel; - ConsumeFromWhere m_consumeFromWhere; - std::string m_consumeTimestamp; - - AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy ; - std::map<std::string /* topic */, std::string /* sub expression */> m_subscription ; - - MessageListener* m_pMessageListener; - OffsetStore* m_pOffsetStore; - - int m_consumeThreadMin; - int m_consumeThreadMax; - - int m_consumeConcurrentlyMaxSpan; - int m_pullThresholdForQueue; - long m_pullInterval; - - int m_consumeMessageBatchMaxSize; - int m_pullBatchSize; - - bool m_postSubscriptionWhenPull; - bool m_unitMode; - int m_maxReconsumeTimes; - - long m_suspendCurrentQueueTimeMillis; - long m_consumeTimeout; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQAdmin.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQAdmin.h b/rocketmq-client4cpp/include/MQAdmin.h deleted file mode 100755 index 552a468..0000000 --- a/rocketmq-client4cpp/include/MQAdmin.h +++ /dev/null @@ -1,66 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __RMQ_MQADMIN_H__ -#define __RMQ_MQADMIN_H__ - -#include <string> - -#include "RocketMQClient.h" -#include "MessageExt.h" - -namespace rmq -{ - class MQClientException; - class RemotingException; - class MQBrokerException; - class InterruptedException; - class MessageQueue; - class QueryResult; - - /** - * MQ Admin - * - */ - class MQAdmin - { - public: - MQAdmin() - { - } - - virtual ~MQAdmin() - { - } - - virtual void createTopic(const std::string& key, const std::string& newTopic, int queueNum)=0; - - virtual long long searchOffset(const MessageQueue& mq, long long timestamp)=0; - virtual long long maxOffset(const MessageQueue& mq)=0; - virtual long long minOffset(const MessageQueue& mq)=0; - - virtual long long earliestMsgStoreTime(const MessageQueue& mq)=0; - - virtual MessageExt* viewMessage(const std::string& msgId)=0; - virtual QueryResult queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end)=0; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQClientException.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQClientException.h b/rocketmq-client4cpp/include/MQClientException.h deleted file mode 100755 index f1d1d04..0000000 --- a/rocketmq-client4cpp/include/MQClientException.h +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __RMQ_MQCLIENTEXCEPTION_H__ -#define __RMQ_MQCLIENTEXCEPTION_H__ - -#include <string> -#include <ostream> -#include <sstream> -#include <exception> - -#include "RocketMQClient.h" - -namespace rmq -{ - class MQException : public std::exception - { - public: - MQException(const std::string& msg, int error,const char* file,int line)throw() - : m_error(error),m_line(line),m_file(file) - { - try - { - std::stringstream ss; - ss << "[" << file << ":" << line <<"]|error: " << error << "|msg:" << msg; - m_msg = ss.str(); - } - catch (...) - { - } - } - - virtual ~MQException()throw() - { - } - - const char* what() const throw() - { - return m_msg.c_str(); - } - - int GetError() const throw() - { - return m_error; - } - - virtual const char* GetType() const throw() - { - return "MQException"; - } - - protected: - int m_error; - int m_line; - std::string m_msg; - std::string m_file; - }; - - inline std::ostream& operator<<(std::ostream& os, const MQException& e) - { - os <<"Type:"<<e.GetType() << e.what(); - return os; - } - - #define DEFINE_MQCLIENTEXCEPTION(name, parent) \ - class name : public parent \ - {\ - public:\ - name(const std::string& msg, int error,const char* file,int line) throw ()\ - : parent(msg, error, file, line) {}\ - virtual const char* GetType() const throw()\ - {\ - return #name;\ - }\ - }; - - DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException) - DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException) - DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException) - DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException) - - DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException) - DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException) - DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException) - DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, RemotingException) - 
DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException) - DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, RemotingException) - - #define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__) -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQConsumer.h b/rocketmq-client4cpp/include/MQConsumer.h deleted file mode 100755 index 87efe97..0000000 --- a/rocketmq-client4cpp/include/MQConsumer.h +++ /dev/null @@ -1,48 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __RMQ_MQCONSUMER_H__ -#define __RMQ_MQCONSUMER_H__ - -#include <set> -#include <string> - -#include "RocketMQClient.h" -#include "MQAdmin.h" -#include "ConsumeType.h" - - -namespace rmq -{ - class MessageExt; - - /** - * Consumer interface - * - */ - class MQConsumer : public MQAdmin - { - public: - virtual ~MQConsumer(){} - - virtual void start()=0; - virtual void shutdown()=0; - - virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0; - virtual std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic)=0; - }; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQProducer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQProducer.h b/rocketmq-client4cpp/include/MQProducer.h deleted file mode 100755 index b353aba..0000000 --- a/rocketmq-client4cpp/include/MQProducer.h +++ /dev/null @@ -1,71 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License.
-*/ - -#ifndef __RMQ_MQPRODUCER_H__ -#define __RMQ_MQPRODUCER_H__ - -#include <vector> -#include <string> - -#include "RocketMQClient.h" -#include "MQAdmin.h" -#include "SendResult.h" - -namespace rmq -{ - class MessageQueue; - class SendCallback; - class LocalTransactionExecuter; - class MessageQueueSelector; - - /** - * Producer interface - * - */ - class MQProducer : public MQAdmin - { - public: - MQProducer() - { - } - - virtual ~MQProducer() - { - } - - virtual void start()=0; - virtual void shutdown()=0; - - virtual std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic)=0; - - virtual SendResult send(Message& msg)=0; - virtual void send(Message& msg, SendCallback* sendCallback)=0; - virtual void sendOneway(Message& msg)=0; - - virtual SendResult send(Message& msg, MessageQueue& mq)=0; - virtual void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback)=0; - virtual void sendOneway(Message& msg, MessageQueue& mq)=0; - - virtual SendResult send(Message& msg, MessageQueueSelector* selector, void* arg)=0; - virtual void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback)=0; - virtual void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg)=0; - - virtual TransactionSendResult sendMessageInTransaction(Message& msg, - LocalTransactionExecuter* tranExecuter, - void* arg)=0; - }; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQPullConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h b/rocketmq-client4cpp/include/MQPullConsumer.h deleted file mode 100755 index ffb2ac5..0000000 --- a/rocketmq-client4cpp/include/MQPullConsumer.h +++ /dev/null @@ -1,54 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in 
compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __RMQ_MQPULLCONSUMER_H__ -#define __RMQ_MQPULLCONSUMER_H__ - -#include <set> -#include <string> - -#include "RocketMQClient.h" -#include "MQConsumer.h" -#include "PullResult.h" - -namespace rmq -{ - class MessageQueueListener; - class MessageQueue; - class PullCallback; - - /** - * Pull Consumer - * - */ - class MQPullConsumer : public MQConsumer - { - public: - virtual ~MQPullConsumer(){} - virtual void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)=0; - - virtual PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums)=0; - virtual void pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0; - - virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)=0; - virtual void pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0; - - virtual void updateConsumeOffset(MessageQueue& mq, long long offset)=0; - virtual long long fetchConsumeOffset(MessageQueue& mq, bool fromStore)=0; - - virtual std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic)=0; - }; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQPushConsumer.h ---------------------------------------------------------------------- diff --git 
a/rocketmq-client4cpp/include/MQPushConsumer.h b/rocketmq-client4cpp/include/MQPushConsumer.h deleted file mode 100755 index fe6d4a0..0000000 --- a/rocketmq-client4cpp/include/MQPushConsumer.h +++ /dev/null @@ -1,49 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __RMQ_MQPUSHCONSUMER_H__ -#define __RMQ_MQPUSHCONSUMER_H__ - -#include <set> -#include <string> - -#include "RocketMQClient.h" -#include "MQConsumer.h" -#include "PullResult.h" - -namespace rmq -{ - class MessageListener; - - /** - * Push Consumer - * - */ - class MQPushConsumer : public MQConsumer - { - public: - virtual void registerMessageListener(MessageListener* pMessageListener)=0; - - - virtual void subscribe(const std::string& topic, const std::string& subExpression)=0; - virtual void unsubscribe(const std::string& topic)=0; - - - virtual void updateCorePoolSize(int corePoolSize)=0; - virtual void suspend()=0; - virtual void resume()=0; - }; -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/Message.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/Message.h b/rocketmq-client4cpp/include/Message.h deleted file mode 100755 index 441b4e5..0000000 --- a/rocketmq-client4cpp/include/Message.h +++ /dev/null @@ -1,136 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under 
the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __RMQ_MESSAGE_H__ -#define __RMQ_MESSAGE_H__ - -#include <map> -#include <string> -#include <list> -#include "RocketMQClient.h" - -namespace rmq -{ - /** - * Message - * - */ - class Message - { - public: - Message(); - Message(const std::string& topic, const char* body,int len); - Message(const std::string& topic, const std::string& tags, const char* body,int len); - Message(const std::string& topic, const std::string& tags,const std::string& keys, const char* body,int len); - Message(const std::string& topic, - const std::string& tags, - const std::string& keys, - const int flag, - const char* body, - int len, - bool waitStoreMsgOK); - - virtual ~Message(); - Message(const Message& other); - Message& operator=(const Message& other); - - void clearProperty(const std::string& name); - void putProperty(const std::string& name, const std::string& value); - std::string getProperty(const std::string& name); - - std::string getTopic()const; - void setTopic(const std::string& topic); - - std::string getTags(); - void setTags(const std::string& tags); - - std::string getKeys(); - void setKeys(const std::string& keys); - void setKeys(const std::list<std::string> keys); - - int getDelayTimeLevel(); - void setDelayTimeLevel(int level); - - bool isWaitStoreMsgOK(); - void setWaitStoreMsgOK(bool waitStoreMsgOK); - - int getFlag(); - void setFlag(int flag); - - const char* getBody() const; - int getBodyLen() const; - void setBody(const 
char* body, int len); - - bool tryToCompress(int compressLevel); - const char* getCompressBody() const; - int getCompressBodyLen() const; - - std::map<std::string, std::string>& getProperties(); - void setProperties(const std::map<std::string, std::string>& properties); - - std::string toString() const; - - protected: - void Init(const std::string& topic, - const std::string& tags, - const std::string& keys, - const int flag, - const char* body, - int len, - bool waitStoreMsgOK); - - public: - static const std::string PROPERTY_KEYS; - static const std::string PROPERTY_TAGS; - static const std::string PROPERTY_WAIT_STORE_MSG_OK; - static const std::string PROPERTY_DELAY_TIME_LEVEL; - - /** - * for inner use - */ - static const std::string PROPERTY_RETRY_TOPIC; - static const std::string PROPERTY_REAL_TOPIC; - static const std::string PROPERTY_REAL_QUEUE_ID; - static const std::string PROPERTY_TRANSACTION_PREPARED; - static const std::string PROPERTY_PRODUCER_GROUP; - static const std::string PROPERTY_MIN_OFFSET; - static const std::string PROPERTY_MAX_OFFSET; - static const std::string PROPERTY_BUYER_ID; - static const std::string PROPERTY_ORIGIN_MESSAGE_ID; - static const std::string PROPERTY_TRANSFER_FLAG; - static const std::string PROPERTY_CORRECTION_FLAG; - static const std::string PROPERTY_MQ2_FLAG; - static const std::string PROPERTY_RECONSUME_TIME; - static const std::string PROPERTY_MSG_REGION; - static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX; - static const std::string PROPERTY_MAX_RECONSUME_TIMES; - static const std::string PROPERTY_CONSUME_START_TIMESTAMP; - - static const std::string KEY_SEPARATOR; - private: - std::string m_topic; - int m_flag; - std::map<std::string, std::string> m_properties; - - char* m_body; - int m_bodyLen; - - char* m_compressBody; - int m_compressBodyLen; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageExt.h 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageExt.h b/rocketmq-client4cpp/include/MessageExt.h deleted file mode 100755 index f70041c..0000000 --- a/rocketmq-client4cpp/include/MessageExt.h +++ /dev/null @@ -1,108 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -#ifndef __RMQ_MESSAGEEXT_H__ -#define __RMQ_MESSAGEEXT_H__ - -#include <sys/socket.h> -#include <string> -#include "Message.h" -#include "TopicFilterType.h" -#include "RocketMQClient.h" - -namespace rmq - { - /** - * Message extend - * - */ - class MessageExt : public Message - { - public: - MessageExt(); - - MessageExt(int queueId, - long long bornTimestamp, - sockaddr bornHost, - long long storeTimestamp, - sockaddr storeHost, - std::string msgId); - - ~MessageExt(); - - static TopicFilterType parseTopicFilterType(int sysFlag); - - int getQueueId(); - void setQueueId(int queueId); - - long long getBornTimestamp(); - void setBornTimestamp(long long bornTimestamp); - - sockaddr getBornHost(); - std::string getBornHostString(); - std::string getBornHostNameString(); - void setBornHost(const sockaddr& bornHost); - - long long getStoreTimestamp(); - void setStoreTimestamp(long long storeTimestamp); - - sockaddr getStoreHost(); - std::string getStoreHostString(); - void setStoreHost(const sockaddr& storeHost); - - std::string getMsgId(); - void setMsgId(const std::string& 
msgId); - - int getSysFlag(); - void setSysFlag(int sysFlag); - - int getBodyCRC(); - void setBodyCRC(int bodyCRC); - - long long getQueueOffset(); - void setQueueOffset(long long queueOffset); - - long long getCommitLogOffset(); - void setCommitLogOffset(long long physicOffset); - - int getStoreSize(); - void setStoreSize(int storeSize); - - int getReconsumeTimes(); - void setReconsumeTimes(int reconsumeTimes); - - long long getPreparedTransactionOffset(); - void setPreparedTransactionOffset(long long preparedTransactionOffset); - - std::string toString() const; - - private: - long long m_queueOffset; - long long m_commitLogOffset; - long long m_bornTimestamp; - long long m_storeTimestamp; - long long m_preparedTransactionOffset; - int m_queueId; - int m_storeSize; - int m_sysFlag; - int m_bodyCRC; - int m_reconsumeTimes; - sockaddr m_bornHost; - sockaddr m_storeHost; - std::string m_msgId; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageListener.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageListener.h b/rocketmq-client4cpp/include/MessageListener.h deleted file mode 100755 index 130a219..0000000 --- a/rocketmq-client4cpp/include/MessageListener.h +++ /dev/null @@ -1,94 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
// */   (end of the license header that opens on the preceding line)

#ifndef __RMQ_MESSAGELISTENER_H__
#define __RMQ_MESSAGELISTENER_H__

#include <limits.h>
#include <list>

#include "MessageExt.h"
#include "MessageQueue.h"

namespace rmq
{
    /**
    * Message Listener
    *
    * Common base of the orderly and concurrent listener interfaces below;
    * it only provides a virtual destructor.
    */
    class MessageListener
    {
    public:
        virtual ~MessageListener(){}
    };

    /// Result codes an orderly listener returns from consumeMessage().
    enum ConsumeOrderlyStatus
    {
        SUCCESS,
        ROLLBACK,
        COMMIT,
        SUSPEND_CURRENT_QUEUE_A_MOMENT,
    };

    /**
    * Context handed to MessageListenerOrderly::consumeMessage().
    * Constructed with autoCommit = true and a suspend time of 1000.
    */
    typedef struct tagConsumeOrderlyContext
    {
        tagConsumeOrderlyContext(MessageQueue& mq)
            :messageQueue(mq),
             autoCommit(true),
             suspendCurrentQueueTimeMillis(1000)
        {
        }

        MessageQueue messageQueue;///< queue that the messages being consumed belong to
        bool autoCommit;///< whether the message offset is committed automatically
        long suspendCurrentQueueTimeMillis;
    }ConsumeOrderlyContext;

    /// Listener interface for orderly consumption.
    class MessageListenerOrderly : public MessageListener
    {
    public:
        virtual ConsumeOrderlyStatus consumeMessage(std::list<MessageExt*>& msgs,
                ConsumeOrderlyContext& context)=0;
    };

    /// Result codes a concurrent listener returns from consumeMessage().
    enum ConsumeConcurrentlyStatus
    {
        CONSUME_SUCCESS,
        RECONSUME_LATER,
    };

    /**
    * Context handed to MessageListenerConcurrently::consumeMessage().
    * Constructed with delayLevelWhenNextConsume = 0 and ackIndex = INT_MAX.
    */
    struct ConsumeConcurrentlyContext
    {
        ConsumeConcurrentlyContext(MessageQueue& mq)
            :messageQueue(mq),
             delayLevelWhenNextConsume(0),
             ackIndex(INT_MAX)
        {
        }
        MessageQueue messageQueue;
        int delayLevelWhenNextConsume;
        int ackIndex;
    };

    /// Listener interface for concurrent consumption.
    class MessageListenerConcurrently : public MessageListener
    {
    public:
        virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& msgs,
                ConsumeConcurrentlyContext& context)=0;
    };
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageQueue.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/MessageQueue.h b/rocketmq-client4cpp/include/MessageQueue.h
// deleted file mode 100755
// index 89ddf58..0000000
// --- a/rocketmq-client4cpp/include/MessageQueue.h
// +++ /dev/null
// @@ -1,70 +0,0 @@
// /**
// * Copyright (C) 2013 kangliqiang ,kang...@163.com
// *
// * Licensed under the
// Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */

#ifndef __RMQ_MESSAGEQUEUE_H__
#define __RMQ_MESSAGEQUEUE_H__

#include <iostream>
#include <string>
#include <sstream>

#include "RocketMQClient.h"

namespace rmq
{
    /**
    * Message Queue
    *
    * Value type made of a topic, a broker name and a queue id. It declares
    * operator== and operator<, so it can be used as a std::set element or
    * std::map key (other headers in this library do so).
    */
    class MessageQueue
    {
    public:
        MessageQueue();
        ~MessageQueue(){};

        MessageQueue(const std::string& topic, const std::string& brokerName, int queueId);

        std::string getTopic()const;
        void setTopic(const std::string& topic);

        std::string getBrokerName()const;
        void setBrokerName(const std::string& brokerName);

        int getQueueId()const;
        void setQueueId(int queueId);

        // NOTE(review): hashCode() is the only accessor here that is not
        // declared const.
        int hashCode();
        std::string toString() const;
        std::string toJsonString() const;

        bool operator==(const MessageQueue& mq) const;
        bool operator<(const MessageQueue& mq) const;
        int compareTo(const MessageQueue& mq) const;

    private:
        std::string m_topic;
        std::string m_brokerName;
        int m_queueId;
    };

    /// Streams obj.toString() to os and returns os.
    inline std::ostream& operator<<(std::ostream& os, const MessageQueue& obj)
    {
        os << obj.toString();
        return os;
    }
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageQueueListener.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h b/rocketmq-client4cpp/include/MessageQueueListener.h
// deleted file mode 100755
// index 9f04c3e..0000000
// ---
a/rocketmq-client4cpp/include/MessageQueueListener.h +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __RMQ_MESSAGEQUEUELISTENER_H__ -#define __RMQ_MESSAGEQUEUELISTENER_H__ - -#include <set> -#include "RocketMQClient.h" - -namespace rmq -{ - /** - * Message Queue Listener - * - */ - class MessageQueueListener - { - public: - virtual ~MessageQueueListener() {} - virtual void messageQueueChanged(const std::string& topic, - std::set<MessageQueue>& mqAll, - std::set<MessageQueue>& mqDivided)=0; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/OffsetStore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/OffsetStore.h b/rocketmq-client4cpp/include/OffsetStore.h deleted file mode 100755 index a533750..0000000 --- a/rocketmq-client4cpp/include/OffsetStore.h +++ /dev/null @@ -1,58 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
#ifndef __RMQ_OFFSETSTORE_H__
#define __RMQ_OFFSETSTORE_H__

#include <set>
#include <map>

#include "RocketMQClient.h"

namespace rmq
{
    class MessageQueue;

    /// Source selector passed to OffsetStore::readOffset().
    enum ReadOffsetType
    {
        READ_FROM_MEMORY,
        READ_FROM_STORE,
        MEMORY_FIRST_THEN_STORE,
    };

    /**
    * Consumer Offset Store
    *
    * Pure interface: every operation is declared pure virtual, keyed by
    * MessageQueue, with offsets carried as long long.
    */
    class OffsetStore
    {
    public:
        virtual ~OffsetStore() {}

        virtual void load()=0;

        /// NOTE(review): increaseOnly presumably rejects updates that would
        /// lower the stored offset - confirm in the implementations.
        virtual void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)=0;
        virtual long long readOffset(const MessageQueue& mq, ReadOffsetType type)=0;

        virtual void persistAll(std::set<MessageQueue>& mqs)=0;
        virtual void persist(const MessageQueue& mq)=0;

        virtual void removeOffset(const MessageQueue& mq)=0;

        /// Returns a MessageQueue -> offset map by value for the given topic.
        virtual std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic) = 0;
    };
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/PullCallback.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/PullCallback.h b/rocketmq-client4cpp/include/PullCallback.h
// deleted file mode 100755
// index 47ade68..0000000
// --- a/rocketmq-client4cpp/include/PullCallback.h
// +++ /dev/null
// @@ -1,39 +0,0 @@
// /**
//  * Copyright (C) 2013 kangliqiang ,kang...@163.com
//  *
//  * Licensed under the Apache License, Version 2.0 (the "License");
//  * you may not use this file except in compliance with the License.
//  * You may obtain a copy of the License at
//  *
//  * http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */
#ifndef __RMQ_PULLCALLBACK_H__
#define __RMQ_PULLCALLBACK_H__

#include "RocketMQClient.h"
#include "PullResult.h"

namespace rmq
{
    class MQException;

    /**
     * PullCallback
     *
     * Callback interface with one success hook and one failure hook.
     */
    class PullCallback
    {
    public:
        virtual ~PullCallback() {}
        /// Receives the pull result by non-const reference.
        virtual void onSuccess(PullResult& pullResult)=0;
        /// Receives the exception by non-const reference.
        virtual void onException(MQException& e)=0;
    };
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/PullResult.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/PullResult.h b/rocketmq-client4cpp/include/PullResult.h
// deleted file mode 100755
// index 42c13ca..0000000
// --- a/rocketmq-client4cpp/include/PullResult.h
// +++ /dev/null
// @@ -1,91 +0,0 @@
// /**
// * Copyright (C) 2013 kangliqiang ,kang...@163.com
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
-*/ -#ifndef __RMQ_PULLRESULT_H__ -#define __RMQ_PULLRESULT_H__ - -#include <list> -#include <string> -#include <sstream> - -#include "RocketMQClient.h" -#include "MessageExt.h" - -namespace rmq -{ - enum PullStatus - { - FOUND, - NO_NEW_MSG, - NO_MATCHED_MSG, - OFFSET_ILLEGAL - }; - - /** - * PullResult - * - */ - struct PullResult - { - PullResult() - { - - } - - PullResult(PullStatus pullStatus, - long long nextBeginOffset, - long long minOffset, - long long maxOffset, - std::list<MessageExt*>& msgFoundList) - :pullStatus(pullStatus), - nextBeginOffset(nextBeginOffset), - minOffset(minOffset), - maxOffset(maxOffset), - msgFoundList(msgFoundList) - { - - } - - ~PullResult() - { - std::list<MessageExt*>::iterator it = msgFoundList.begin(); - - for (;it!=msgFoundList.end();it++) - { - delete *it; - } - } - - std::string toString() const - { - std::stringstream ss; - ss << "{pullStatus=" << pullStatus - << ",nextBeginOffset=" << nextBeginOffset - << ",minOffset=" << nextBeginOffset - << ",maxOffset=" << nextBeginOffset - << ",msgFoundList.size=" << msgFoundList.size() - <<"}"; - return ss.str(); - } - - PullStatus pullStatus; - long long nextBeginOffset; - long long minOffset; - long long maxOffset; - std::list<MessageExt*> msgFoundList; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/QueryResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/QueryResult.h b/rocketmq-client4cpp/include/QueryResult.h deleted file mode 100644 index 13164e4..0000000 --- a/rocketmq-client4cpp/include/QueryResult.h +++ /dev/null @@ -1,56 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __RMQ_QUERYRESULT_H__ -#define __RMQ_QUERYRESULT_H__ - -#include <list> - -#include "RocketMQClient.h" -#include "MessageExt.h" - -namespace rmq -{ - /** - * QueryResult - * - */ - class QueryResult - { - public: - QueryResult(long long indexLastUpdateTimestamp, const std::list<MessageExt*>& messageList) - { - m_indexLastUpdateTimestamp = indexLastUpdateTimestamp; - m_messageList = messageList; - } - - long long getIndexLastUpdateTimestamp() - { - return m_indexLastUpdateTimestamp; - } - - std::list<MessageExt*>& getMessageList() - { - return m_messageList; - } - - private: - long long m_indexLastUpdateTimestamp; - std::list<MessageExt*> m_messageList; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/RocketMQClient.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/RocketMQClient.h b/rocketmq-client4cpp/include/RocketMQClient.h deleted file mode 100755 index e4c71c9..0000000 --- a/rocketmq-client4cpp/include/RocketMQClient.h +++ /dev/null @@ -1,100 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
#ifndef __RMQ_ROCKETMQCLIENT_H__
#define __RMQ_ROCKETMQCLIENT_H__

#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>

#include <sys/time.h>
#include <sys/timeb.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/unistd.h>

#include <cstdio>
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <map>
#include <set>


/**
 * Static utility class: process/thread ids, time formatting and the
 * file-based logger used by the RMQ_* logging macros below.
 */
class RocketMQUtil
{
public:
    /// Log levels; a message is written when its level <= the configured level.
    enum
    {
        NONE_LOG    = 0,
        ERROR_LOG   = 1,
        WARN_LOG    = 2,
        INFO_LOG    = 3,
        DEBUG_LOG   = 4,
    };

public:
    static pid_t getPid();
    static pid_t getTid();

    static int getDiffDays(time_t tmFirst, time_t tmSecond);
    static std::string tm2str(const time_t &t, const std::string &sFormat);
    static std::string now2str(const std::string &sFormat);
    static std::string now2str();
    static int64_t getNowMs();
    /// printf-style formatting into a std::string (format checked by the compiler).
    static std::string str2fmt(const char* format, ...)__attribute__((format(__printf__,1,2)));

    static int initLog(const std::string& sLogPath);
    static void setLogLevel(int logLevel);
    /// printf-style log write (format checked by the compiler).
    static void writeLog(const char* fmt, ...) __attribute__((format(__printf__,1,2)));
    /// True when a message of the given level passes the configured level.
    static inline bool isNeedLog(int level)
    {
        return (level <= _logLevel);
    }

public:
    static volatile int _logFd;
    static int _logLevel;
    static std::string _logPath;
};

// __typeof__ is the spelling GCC/Clang accept in every language mode; plain
// `typeof` is only a keyword in the GNU dialects (-std=gnu++NN).
#define RMQ_AUTO(name, value) __typeof__(value) name = value
#define RMQ_FOR_EACH(container, it) \
    for(__typeof__((container).begin()) it = (container).begin();it!=(container).end(); ++it)


// The format prefix, the caller's fmt and the trailing "\n" are adjacent
// string literals. They must be separated by spaces: since C++11 `"..."fmt`
// is lexed as a literal with a user-defined suffix and fails to compile.
#define RMQ_DEBUG(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
#define RMQ_INFO(fmt, args...)  do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG))  RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
#define RMQ_WARN(fmt, args...)  do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG))  RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
#define RMQ_ERROR(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)

// Unconditional printf variant (no level check, thread id only).
#define RMQ_PRINT(fmt, args...) do{ printf("%d|[%s][%s:%s:%d][DEBUG]|" fmt "\n", RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)


#endif

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendCallback.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/SendCallback.h b/rocketmq-client4cpp/include/SendCallback.h
// deleted file mode 100755
// index 0feb5a1..0000000
// --- a/rocketmq-client4cpp/include/SendCallback.h
// +++ /dev/null
// @@ -1,39 +0,0 @@
// /**
// * Copyright (C) 2013 kangliqiang ,kang...@163.com
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */   (end of the license header that opens on the preceding line)
#ifndef __RMQ_SENDCALLBACK_H__
#define __RMQ_SENDCALLBACK_H__

#include "SendResult.h"
#include "RocketMQClient.h"

namespace rmq
{
    class MQException;

    /**
    * Send Message Callback
    *
    * Callback interface with one success hook and one failure hook.
    */
    class SendCallback
    {
    public:
        virtual ~SendCallback() {}
        /// Receives the send result by non-const reference.
        virtual void onSuccess(SendResult& sendResult)=0;
        /// Receives the exception by non-const reference.
        virtual void onException(MQException& e)=0;
    };
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendMessageHook.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/SendMessageHook.h b/rocketmq-client4cpp/include/SendMessageHook.h
// deleted file mode 100644
// index 9869aa6..0000000
// --- a/rocketmq-client4cpp/include/SendMessageHook.h
// +++ /dev/null
// @@ -1,50 +0,0 @@
// /**
// * Copyright (C) 2013 kangliqiang ,kang...@163.com
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
-*/ -#ifndef __RMQ_SENDMESSAGEHOOK_H__ -#define __RMQ_SENDMESSAGEHOOK_H__ - -#include <string> - -#include "RocketMQClient.h" -#include "Message.h" -#include "MQClientException.h" - -namespace rmq -{ - class SendMessageContext - { - public: - std::string producerGroup; - Message msg; - MessageQueue mq; - std::string brokerAddr; - CommunicationMode communicationMode; - SendResult sendResult; - MQException* pException; - void* pArg; - }; - - class SendMessageHook - { - public: - virtual ~SendMessageHook() {} - virtual std::string hookName()=0; - virtual void sendMessageBefore(const SendMessageContext& context)=0; - virtual void sendMessageAfter(const SendMessageContext& context)=0; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/SendResult.h b/rocketmq-client4cpp/include/SendResult.h deleted file mode 100755 index d6a3174..0000000 --- a/rocketmq-client4cpp/include/SendResult.h +++ /dev/null @@ -1,89 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
// */   (end of the license header that opens on the preceding line)
#ifndef __RMQ_SENDRESULT_H__
#define __RMQ_SENDRESULT_H__

#include "RocketMQClient.h"
#include "MessageQueue.h"

namespace rmq
{
    /// Status codes reported for a send.
    enum SendStatus
    {
        SEND_OK,
        FLUSH_DISK_TIMEOUT,
        FLUSH_SLAVE_TIMEOUT,
        SLAVE_NOT_AVAILABLE
    };

    /**
    * Send Message Result
    *
    * Holds the send status, the message id, the message queue and the
    * queue offset.
    */
    class SendResult
    {
    public:
        SendResult();
        // NOTE(review): projectGroupPrefix has no matching member in this
        // class; how it is used is defined in the implementation file.
        SendResult(const SendStatus& sendStatus,
                   const std::string&  msgId,
                   MessageQueue& messageQueue,
                   long long queueOffset,
                   std::string&  projectGroupPrefix);

        const std::string&  getMsgId();
        void setMsgId(const std::string&  msgId);
        SendStatus getSendStatus();
        void setSendStatus(const SendStatus& sendStatus);
        MessageQueue& getMessageQueue();
        void setMessageQueue(MessageQueue& messageQueue);
        long long getQueueOffset();
        void setQueueOffset(long long queueOffset);
        bool hasResult();

        std::string toString() const;
        std::string toJsonString() const;

    private:
        SendStatus m_sendStatus;
        std::string m_msgId;
        MessageQueue m_messageQueue;
        long long m_queueOffset;
    };

    /// States a local transaction can be reported in.
    enum LocalTransactionState
    {
        COMMIT_MESSAGE,
        ROLLBACK_MESSAGE,
        UNKNOW,
    };

    /**
    * Send transaction message result
    *
    * A SendResult that additionally carries a LocalTransactionState.
    */
    class TransactionSendResult : public SendResult
    {
    public:
        TransactionSendResult();
        LocalTransactionState getLocalTransactionState();
        void setLocalTransactionState(LocalTransactionState localTransactionState);

    private:
        LocalTransactionState m_localTransactionState;
    };
}

#endif
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/TopicFilterType.h
// ----------------------------------------------------------------------
// diff --git a/rocketmq-client4cpp/include/TopicFilterType.h b/rocketmq-client4cpp/include/TopicFilterType.h
// deleted file mode 100755
// index e51ae20..0000000
// --- a/rocketmq-client4cpp/include/TopicFilterType.h
// +++ /dev/null
// @@ -1,32 +0,0 @@
// /**
//  * Copyright (C) 2013 kangliqiang ,kang...@163.com
//  *
//  * Licensed under
the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __RMQ_TOPICFILTERTYPE_H__ -#define __RMQ_TOPICFILTERTYPE_H__ - -namespace rmq -{ - /** - * Topic filter type - * - */ - enum TopicFilterType - { - SINGLE_TAG, - MULTI_TAG - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/rocketmq.mk ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk deleted file mode 100644 index eecc458..0000000 --- a/rocketmq-client4cpp/rocketmq.mk +++ /dev/null @@ -1,6 +0,0 @@ -ROCKETMQ_PATH := /data/libs/rocketmq - -INCLUDE += -I$(ROCKETMQ_PATH)/include -INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686 -LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread -LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientConfig.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp b/rocketmq-client4cpp/src/ClientConfig.cpp deleted file mode 100755 index 986d67d..0000000 --- a/rocketmq-client4cpp/src/ClientConfig.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Copyright (C) 2010-2013 kangliqiang, kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with 
the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <stdlib.h> -#include <sstream> - -#include "MQClientException.h" -#include "SocketUtil.h" -#include "ClientConfig.h" -#include "UtilAll.h" -#include "MixAll.h" - -namespace rmq -{ - -ClientConfig::ClientConfig() -{ - char* addr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str()); - if (addr) - { - m_namesrvAddr = addr; - } - else - { - m_namesrvAddr = ""; - } - - m_clientIP = getLocalAddress(); - m_instanceName = "DEFAULT"; - m_clientCallbackExecutorThreads = UtilAll::availableProcessors(); - m_pollNameServerInterval = 1000 * 30; - m_heartbeatBrokerInterval = 1000 * 30; - m_persistConsumerOffsetInterval = 1000 * 5; -} - -ClientConfig::~ClientConfig() -{ -} - -std::string ClientConfig::buildMQClientId() -{ - return m_clientIP + "@" + m_instanceName; -} - -void ClientConfig::changeInstanceNameToPID() -{ - if (m_instanceName == "DEFAULT") - { - m_instanceName = UtilAll::toString(UtilAll::getPid()); - } -} - - -void ClientConfig::resetClientConfig(const ClientConfig& cc) -{ - m_namesrvAddr = cc.m_namesrvAddr; - m_clientIP = cc.m_clientIP; - m_instanceName = cc.m_instanceName; - m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads; - m_pollNameServerInterval = cc.m_pollNameServerInterval; - m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval; - m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval; -} - -ClientConfig ClientConfig::cloneClientConfig() -{ - return *this; -} - -std::string ClientConfig::getNamesrvAddr() -{ - return m_namesrvAddr; -} - -void ClientConfig::setNamesrvAddr(const 
std::string& namesrvAddr) -{ - m_namesrvAddr = namesrvAddr; -} - -std::string ClientConfig::getClientIP() -{ - return m_clientIP; -} - -void ClientConfig::setClientIP(const std::string& clientIP) -{ - m_clientIP = clientIP; -} - -std::string ClientConfig::getInstanceName() -{ - return m_instanceName; -} - -void ClientConfig::setInstanceName(const std::string& instanceName) -{ - m_instanceName = instanceName; -} - -int ClientConfig::getClientCallbackExecutorThreads() -{ - return m_clientCallbackExecutorThreads; -} - -void ClientConfig::setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) -{ - m_clientCallbackExecutorThreads = clientCallbackExecutorThreads; -} - -int ClientConfig::getPollNameServerInterval() -{ - return m_pollNameServerInterval; -} - -void ClientConfig::setPollNameServerInterval(int pollNameServerInterval) -{ - m_pollNameServerInterval = pollNameServerInterval; -} - -int ClientConfig::getHeartbeatBrokerInterval() -{ - return m_heartbeatBrokerInterval; -} - -void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval) -{ - m_heartbeatBrokerInterval = heartbeatBrokerInterval; -} - -int ClientConfig:: getPersistConsumerOffsetInterval() -{ - return m_persistConsumerOffsetInterval; -} - -void ClientConfig::setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) -{ - m_persistConsumerOffsetInterval = persistConsumerOffsetInterval; -} - - -std::string ClientConfig::toString() const -{ - std::stringstream ss; - ss << "{namesrvAddr=" << m_namesrvAddr - << ",clientIP=" << m_clientIP - << ",instanceName=" << m_instanceName - << ",clientCallbackExecutorThreads=" << m_clientCallbackExecutorThreads - << ",pollNameServerInteval=" << m_pollNameServerInterval - << ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval - << ",persistConsumerOffsetInterval=" << m_persistConsumerOffsetInterval - <<"}"; - return ss.str(); -} - - -} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp deleted file mode 100755 index ae88de5..0000000 --- a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp +++ /dev/null @@ -1,154 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#include "ClientRemotingProcessor.h" -#include "MQProtos.h" -#include "TcpTransport.h" -#include "RemotingCommand.h" -#include "MQClientFactory.h" -#include "CommandCustomHeader.h" -#include "ConsumerRunningInfo.h" - - - -namespace rmq -{ - -ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* pMQClientFactory) - : m_pMQClientFactory(pMQClientFactory) -{ - -} - -RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, RemotingCommand* pRequest) -{ - int code = pRequest->getCode(); - switch (code) - { - case CHECK_TRANSACTION_STATE_VALUE: - return checkTransactionState(pTts, pRequest); - case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: - return notifyConsumerIdsChanged(pTts, pRequest); - case RESET_CONSUMER_CLIENT_OFFSET_VALUE: - return resetOffset(pTts, pRequest); - case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE: - return getConsumeStatus(pTts, pRequest); - case GET_CONSUMER_RUNNING_INFO_VALUE: - return getConsumerRunningInfo(pTts, pRequest); - case CONSUME_MESSAGE_DIRECTLY_VALUE: - return consumeMessageDirectly(pTts, pRequest); - default: - break; - } - - return NULL; -} - -RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest) -{ - //TODO - return NULL; -} - -RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest) -{ - try - { - NotifyConsumerIdsChangedRequestHeader* extHeader = (NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader(); - RMQ_INFO("receive broker's notification[{%s}], the consumer group: {%s} changed, rebalance immediately", - pTts->getServerAddr().c_str(), - extHeader->consumerGroup.c_str()); - m_pMQClientFactory->rebalanceImmediately(); - } - catch (std::exception& e) - { - RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what()); - } - - return NULL; -} - -RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, RemotingCommand* pRequest) -{ - //TODO - return NULL; -} 
- - -RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest) -{ - //TODO - return NULL; -} - - -RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest) -{ - return NULL; - - /* - GetConsumerRunningInfoRequestHeader* requestHeader = (GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader(); - RemotingCommand* pResponse = RemotingCommand::createResponseCommand(NULL); - - pResponse = RemotingCommand::createResponseCommand( - REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL); - pResponse->setOpaque(pCmd->getOpaque()); - - ConsumerRunningInfo* consumerRunningInfo = m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup); - if (NULL != consumerRunningInfo) { - response.setCode(ResponseCode.SUCCESS); - response.setBody(consumerRunningInfo.encode()); - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", - requestHeader.getConsumerGroup())); - } - return pResponse; - - // java - final RemotingCommand response = RemotingCommand.createResponseCommand(null); - final GetConsumerRunningInfoRequestHeader requestHeader = - (GetConsumerRunningInfoRequestHeader) request - .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); - - ConsumerRunningInfo consumerRunningInfo = - this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup()); - if (null != consumerRunningInfo) { - if (requestHeader.isJstackEnable()) { - String jstack = UtilAll.jstack(); - consumerRunningInfo.setJstack(jstack); - } - - response.setCode(ResponseCode.SUCCESS); - response.setBody(consumerRunningInfo.encode()); - } else { - response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", - requestHeader.getConsumerGroup())); - } - - return response; - */ -} - - 
-RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest) -{ - //TODO - return NULL; -} - - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientRemotingProcessor.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h b/rocketmq-client4cpp/src/ClientRemotingProcessor.h deleted file mode 100755 index 4cd2873..0000000 --- a/rocketmq-client4cpp/src/ClientRemotingProcessor.h +++ /dev/null @@ -1,45 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#ifndef __CLIENTREMOTINGPROCESSOR_H__ -#define __CLIENTREMOTINGPROCESSOR_H__ - -#include "TcpRequestProcessor.h" - -namespace rmq -{ - class MQClientFactory; - class RemotingCommand; - - class ClientRemotingProcessor : public TcpRequestProcessor - { - public: - ClientRemotingProcessor(MQClientFactory* pMQClientFactory); - - RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest); - RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest); - - private: - MQClientFactory* m_pMQClientFactory; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/CommunicationMode.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/CommunicationMode.h b/rocketmq-client4cpp/src/CommunicationMode.h deleted file mode 100755 index 43b2941..0000000 --- a/rocketmq-client4cpp/src/CommunicationMode.h +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright (C) 2013 kangliqiang ,kang...@163.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __COMMUNICATIONMODE_H__ -#define __COMMUNICATIONMODE_H__ - -namespace rmq -{ - /** - * Communication Mode - * - */ - enum CommunicationMode - { - SYNC, - ASYNC, - ONEWAY - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/FindBrokerResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h b/rocketmq-client4cpp/src/FindBrokerResult.h deleted file mode 100644 index 51a9845..0000000 --- a/rocketmq-client4cpp/src/FindBrokerResult.h +++ /dev/null @@ -1,28 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -#ifndef __FINDBROKERRESULT_H__ -#define __FINDBROKERRESULT_H__ - -namespace rmq -{ - typedef struct - { - std::string brokerAddr; - bool slave; - } FindBrokerResult; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQAdminImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp b/rocketmq-client4cpp/src/MQAdminImpl.cpp deleted file mode 100755 index 2a6b597..0000000 --- a/rocketmq-client4cpp/src/MQAdminImpl.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -#include <list> -#include "SocketUtil.h" -#include "MQAdminImpl.h" -#include "MQClientFactory.h" -#include "MQClientAPIImpl.h" -#include "MQClientException.h" -#include "TopicConfig.h" -#include "TopicPublishInfo.h" -#include "MessageId.h" -#include "MessageDecoder.h" - -namespace rmq -{ - - -MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory) -{ - m_pMQClientFactory = pMQClientFactory; -} - -MQAdminImpl::~MQAdminImpl() -{ - -} - -void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic, - int queueNum) -{ - return createTopic(key, newTopic, queueNum, 0); -} - - -void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic, - int queueNum, int topicSysFlag) -{ - try - { - MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl(); - TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(key, 1000 * 3); - - std::list<BrokerData> brokerDataList = topicRouteData->getBrokerDatas(); - if (!brokerDataList.empty()) - { - brokerDataList.sort(); - - MQClientException exception("", 0, "", 0); - bool hasException = false; - - std::list<BrokerData>::iterator it = brokerDataList.begin(); - - for (; it != brokerDataList.end(); it++) - { - std::map<int, std::string>::iterator it1 = (*it).brokerAddrs.find(MixAll::MASTER_ID); - if (it1 != (*it).brokerAddrs.end()) - { - std::string addr = it1->second; - - TopicConfig topicConfig(newTopic); - topicConfig.setReadQueueNums(queueNum); - topicConfig.setWriteQueueNums(queueNum); - topicConfig.setTopicSysFlag(topicSysFlag); - - try - { - api->createTopic(addr, key, topicConfig, 1000 * 3); - } - catch (MQClientException& e) - { - hasException = true; - exception = e; - } - } - } - - if (hasException) - { - throw exception; - } - } - else - { - THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1); - } - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1); - } -} - 
-std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const std::string& topic) -{ - try - { - MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl(); - TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(topic, 1000 * 3); - - if (topicRouteData.ptr() != NULL) - { - TopicPublishInfoPtr topicPublishInfo = - MQClientFactory::topicRouteData2TopicPublishInfo(topic, *topicRouteData); - if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok()) - { - std::vector<MessageQueue>* ret = new std::vector<MessageQueue>(); - (*ret) = topicPublishInfo->getMessageQueueList(); - - /* - std::vector<MessageQueue>& mqs = ; - std::vector<MessageQueue>::iterator it = mqs.begin(); - for (; it != mqs.end(); it++) - { - ret->push_back(*it); - } - */ - - return ret; - } - } - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); - } - - THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic, " + topic, -1); -} - -std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic) -{ - try - { - TopicRouteDataPtr topicRouteData = - m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic, 1000 * 3); - if (topicRouteData.ptr() != NULL) - { - std::set<MessageQueue>* mqList = - MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, *topicRouteData); - if (!mqList->empty()) - { - return mqList; - } - else - { - THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); - } - } - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); - } - - THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic: " + topic, -1); -} - -long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long timestamp) -{ - std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - if (brokerAddr.empty()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (!brokerAddr.empty()) - { - try - { - return m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(brokerAddr, mq.getTopic(), - mq.getQueueId(), timestamp, 1000 * 3); - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); - } - } - THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); -} - -long long MQAdminImpl::maxOffset(const MessageQueue& mq) -{ - std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - if (brokerAddr.empty()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (!brokerAddr.empty()) - { - try - { - return m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(brokerAddr, mq.getTopic(), - mq.getQueueId(), 1000 * 3); - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); - } - } - THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); -} - -long long MQAdminImpl::minOffset(const MessageQueue& mq) -{ - std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - if (brokerAddr.empty()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (!brokerAddr.empty()) - { - try - { - return m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(brokerAddr, mq.getTopic(), - mq.getQueueId(), 1000 * 3); - } - catch (MQClientException e) - { - 
THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); - } - } - - THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); -} - -long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq) -{ - std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - if (brokerAddr.empty()) - { - m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); - brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); - } - - if (!brokerAddr.empty()) - { - try - { - return m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(brokerAddr, - mq.getTopic(), mq.getQueueId(), 1000 * 3); - } - catch (MQClientException e) - { - THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); - } - } - - THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); -} - -MessageExt* MQAdminImpl::viewMessage(const std::string& msgId) -{ - try - { - MessageId messageId = MessageDecoder::decodeMessageId(msgId); - return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage( - socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3); - } - catch (UnknownHostException e) - { - THROW_MQEXCEPTION(MQClientException, "message id illegal", -1); - } -} - -QueryResult MQAdminImpl::queryMessage(const std::string& topic, - const std::string& key, - int maxNum, long long begin, long long end) -{ - //TODO - std::list<MessageExt*> messageList; - QueryResult result(0, messageList); - - return result; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQAdminImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h b/rocketmq-client4cpp/src/MQAdminImpl.h deleted file mode 100755 index 907d61e..0000000 --- 
a/rocketmq-client4cpp/src/MQAdminImpl.h +++ /dev/null @@ -1,63 +0,0 @@ -/** -* Copyright (C) 2013 kangliqiang ,kang...@163.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -#ifndef __MQADMINIMPL_H__ -#define __MQADMINIMPL_H__ - -#include <string> -#include <list> -#include <set> -#include <vector> - -#include "MessageExt.h" -#include "QueryResult.h" - -namespace rmq -{ - class MQClientFactory; - class MessageQueue; - - class MQAdminImpl - { - public: - MQAdminImpl(MQClientFactory* pMQClientFactory); - ~MQAdminImpl(); - - void createTopic(const std::string& key, const std::string& newTopic, int queueNum); - void createTopic(const std::string& key, const std::string& newTopic, int queueNum, int topicSysFlag); - - std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic); - std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); - long long searchOffset(const MessageQueue& mq, long long timestamp); - long long maxOffset(const MessageQueue& mq); - long long minOffset(const MessageQueue& mq); - - long long earliestMsgStoreTime(const MessageQueue& mq); - - MessageExt* viewMessage(const std::string& msgId); - - QueryResult queryMessage(const std::string& topic, - const std::string& key, - int maxNum, - long long begin, - long long end); - - private: - MQClientFactory* m_pMQClientFactory; - }; -} - -#endif