// Source: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MQProtos.h
// rocketmq-cpp/src/protocol/MQProtos.h (new file, mode 100755, blob 50c1841)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef __MQPROTOS_H__
#define __MQPROTOS_H__

namespace rocketmq {
//<!***************************************************************************
// Numeric request codes. Every enumerator has an explicit value; the table is
// kept in ascending numeric order. NOTE(review): the values presumably mirror
// the broker/name-server side of the protocol - confirm before renumbering.
enum MQRequestCode {
  // send msg to broker
  SEND_MESSAGE = 10,
  // subscribe (pull) msg from broker
  PULL_MESSAGE = 11,
  // query msg from broker
  QUERY_MESSAGE = 12,
  // query broker offset
  QUERY_BROKER_OFFSET = 13,
  // query consumer offset from broker
  QUERY_CONSUMER_OFFSET = 14,
  // update consumer offset to broker
  UPDATE_CONSUMER_OFFSET = 15,
  // create or update topic on broker
  UPDATE_AND_CREATE_TOPIC = 17,
  // get all topic config info from broker
  GET_ALL_TOPIC_CONFIG = 21,
  // get all topic list from broker
  GET_TOPIC_CONFIG_LIST = 22,
  // get topic name list from broker
  GET_TOPIC_NAME_LIST = 23,
  UPDATE_BROKER_CONFIG = 25,
  GET_BROKER_CONFIG = 26,
  TRIGGER_DELETE_FILES = 27,
  GET_BROKER_RUNTIME_INFO = 28,
  SEARCH_OFFSET_BY_TIMESTAMP = 29,
  GET_MAX_OFFSET = 30,
  GET_MIN_OFFSET = 31,
  GET_EARLIEST_MSG_STORETIME = 32,
  VIEW_MESSAGE_BY_ID = 33,
  // send heartbeat to broker, and register itself
  HEART_BEAT = 34,
  // unregister client from broker
  UNREGISTER_CLIENT = 35,
  // send a message whose consumption failed back to broker
  CONSUMER_SEND_MSG_BACK = 36,
  // commit or rollback transaction
  END_TRANSACTION = 37,
  // get consumer list by group from broker
  GET_CONSUMER_LIST_BY_GROUP = 38,

  CHECK_TRANSACTION_STATE = 39,
  // broker notifies consumers when the consumer list changes
  NOTIFY_CONSUMER_IDS_CHANGED = 40,
  // lock mq before orderly consume
  LOCK_BATCH_MQ = 41,
  // unlock mq after orderly consume
  UNLOCK_BATCH_MQ = 42,
  GET_ALL_CONSUMER_OFFSET = 43,
  GET_ALL_DELAY_OFFSET = 45,

  PUT_KV_CONFIG = 100,
  GET_KV_CONFIG = 101,
  DELETE_KV_CONFIG = 102,
  REGISTER_BROKER = 103,
  UNREGISTER_BROKER = 104,
  GET_ROUTEINTO_BY_TOPIC = 105,
  GET_BROKER_CLUSTER_INFO = 106,

  UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200,
  GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201,
  GET_TOPIC_STATS_INFO = 202,
  GET_CONSUMER_CONNECTION_LIST = 203,
  GET_PRODUCER_CONNECTION_LIST = 204,
  WIPE_WRITE_PERM_OF_BROKER = 205,
  GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206,
  DELETE_SUBSCRIPTIONGROUP = 207,
  GET_CONSUME_STATS = 208,
  SUSPEND_CONSUMER = 209,
  RESUME_CONSUMER = 210,
  RESET_CONSUMER_OFFSET_IN_CONSUMER = 211,
  RESET_CONSUMER_OFFSET_IN_BROKER = 212,
  ADJUST_CONSUMER_THREAD_POOL = 213,
  WHO_CONSUME_THE_MESSAGE = 214,
  DELETE_TOPIC_IN_BROKER = 215,
  DELETE_TOPIC_IN_NAMESRV = 216,
  GET_KV_CONFIG_BY_VALUE = 217,
  DELETE_KV_CONFIG_BY_VALUE = 218,
  GET_KVLIST_BY_NAMESPACE = 219,
  RESET_CONSUMER_CLIENT_OFFSET = 220,
  GET_CONSUMER_STATUS_FROM_CLIENT = 221,
  INVOKE_BROKER_TO_RESET_OFFSET = 222,
  INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223,
  GET_TOPICS_BY_CLUSTER = 224,

  QUERY_TOPIC_CONSUME_BY_WHO = 300,
  REGISTER_FILTER_SERVER = 301,
  REGISTER_MESSAGE_FILTER_CLASS = 302,
  QUERY_CONSUME_TIME_SPAN = 303,
  GET_SYSTEM_TOPIC_LIST_FROM_NS = 304,
  GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305,
  CLEAN_EXPIRED_CONSUMEQUEUE = 306,
  GET_CONSUMER_RUNNING_INFO = 307,
  QUERY_CORRECTION_OFFSET = 308,
  CONSUME_MESSAGE_DIRECTLY = 309,
  SEND_MESSAGE_V2 = 310,
  GET_UNIT_TOPIC_LIST = 311,
  GET_HAS_UNIT_SUB_TOPIC_LIST = 312,
  GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313,
  CLONE_GROUP_OFFSET = 314,
  VIEW_BROKER_STATS_DATA = 315
};

//<!***************************************************************************
// Numeric response codes. Every enumerator has an explicit value.
enum MQResponseCode {
  // received success response from broker
  SUCCESS_VALUE = 0,
  // received exception from broker
  SYSTEM_ERROR = 1,
  // received "system busy" from broker
  SYSTEM_BUSY = 2,
  // broker doesn't support the request code
  REQUEST_CODE_NOT_SUPPORTED = 3,
  // broker flush disk timeout error
  FLUSH_DISK_TIMEOUT = 10,
  // broker sync double write, slave broker not available
  SLAVE_NOT_AVAILABLE = 11,
  // broker sync double write, slave broker flush msg timeout
  FLUSH_SLAVE_TIMEOUT = 12,
  // broker received an illegal message
  MESSAGE_ILLEGAL = 13,
  // service not available due to broker or namesrv in shutdown status
  SERVICE_NOT_AVAILABLE = 14,
  // this version is not supported on broker or namesrv
  VERSION_NOT_SUPPORTED = 15,
  // broker or namesrv has no permission to do this operation
  NO_PERMISSION = 16,
  // topic does not exist on broker
  TOPIC_NOT_EXIST = 17,
  // broker already created this topic
  TOPIC_EXIST_ALREADY = 18,
  // pulled msg was not found
  PULL_NOT_FOUND = 19,
  // pull should be retried. NOTE(review): the original note said "retry
  // later" while the name says "immediately" - confirm the intended timing.
  PULL_RETRY_IMMEDIATELY = 20,
  // pull msg with wrong offset
  PULL_OFFSET_MOVED = 21,
  // could not find the queried msg
  QUERY_NOT_FOUND = 22,

  SUBSCRIPTION_PARSE_FAILED = 23,
  SUBSCRIPTION_NOT_EXIST = 24,
  SUBSCRIPTION_NOT_LATEST = 25,
  SUBSCRIPTION_GROUP_NOT_EXIST = 26,

  TRANSACTION_SHOULD_COMMIT = 200,
  TRANSACTION_SHOULD_ROLLBACK = 201,
  TRANSACTION_STATE_UNKNOW = 202,
  TRANSACTION_STATE_GROUP_WRONG = 203,

  CONSUMER_NOT_ONLINE = 206,
  CONSUME_MSG_TIMEOUT = 207
};
//<!************************************************************************
}  //<!end namespace;

#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/MessageQueue.cpp b/rocketmq-cpp/src/protocol/MessageQueue.cpp new file mode 100755 index 0000000..f1b3f8f --- /dev/null +++ b/rocketmq-cpp/src/protocol/MessageQueue.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "MessageQueue.h" +#include "UtilAll.h" + +namespace rocketmq { +//<!************************************************************************ +MessageQueue::MessageQueue() { + m_queueId = -1; // invalide mq + m_topic.clear(); + m_brokerName.clear(); +} + +MessageQueue::MessageQueue(const string& topic, const string& brokerName, + int queueId) + : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId) {} + +MessageQueue::MessageQueue(const MessageQueue& other) + : m_topic(other.m_topic), + m_brokerName(other.m_brokerName), + m_queueId(other.m_queueId) {} + +MessageQueue& MessageQueue::operator=(const MessageQueue& other) { + if (this != &other) { + m_brokerName = other.m_brokerName; + m_topic = other.m_topic; + m_queueId = other.m_queueId; + } + return *this; +} + +string MessageQueue::getTopic() const { return m_topic; } + +void MessageQueue::setTopic(const string& topic) { m_topic = topic; } + +string MessageQueue::getBrokerName() const { return m_brokerName; } + +void MessageQueue::setBrokerName(const string& brokerName) { + m_brokerName = brokerName; +} + +int MessageQueue::getQueueId() const { return m_queueId; } + +void MessageQueue::setQueueId(int queueId) { m_queueId = queueId; } + +bool MessageQueue::operator==(const MessageQueue& mq) const { + if (this == &mq) { + return true; + } + + if (m_brokerName != mq.m_brokerName) { + return false; + } + + if (m_queueId != mq.m_queueId) { + return false; + } + + if (m_topic != mq.m_topic) { + return false; + } + + return true; +} + +int MessageQueue::compareTo(const MessageQueue& mq) const { + int result = m_topic.compare(mq.m_topic); + if (result != 0) { + return result; + } + + result = m_brokerName.compare(mq.m_brokerName); + if (result != 0) { + return result; + } + + return m_queueId - mq.m_queueId; +} + +bool MessageQueue::operator<(const MessageQueue& mq) const { + return compareTo(mq) < 0; +} + +Json::Value MessageQueue::toJson() const { + Json::Value outJson; + outJson["topic"] = 
m_topic; + outJson["brokerName"] = m_brokerName; + outJson["queueId"] = m_queueId; + return outJson; +} + +//<!*************************************************************************** +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/MessageQueue.h b/rocketmq-cpp/src/protocol/MessageQueue.h new file mode 100755 index 0000000..0d47bf8 --- /dev/null +++ b/rocketmq-cpp/src/protocol/MessageQueue.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __MESSAGEQUEUE_H__ +#define __MESSAGEQUEUE_H__ + +#include <string> +#include "json/json.h" + +namespace rocketmq { +//<!************************************************************************/ +//<!* MQ(T,B,ID); +//<!************************************************************************/ +class MessageQueue { + public: + MessageQueue(); + MessageQueue(const std::string& topic, const std::string& brokerName, + int queueId); + MessageQueue(const MessageQueue& other); + MessageQueue& operator=(const MessageQueue& other); + + std::string getTopic() const; + void setTopic(const std::string& topic); + + std::string getBrokerName() const; + void setBrokerName(const std::string& brokerName); + + int getQueueId() const; + void setQueueId(int queueId); + + bool operator==(const MessageQueue& mq) const; + bool operator<(const MessageQueue& mq) const; + int compareTo(const MessageQueue& mq) const; + Json::Value toJson() const; + + private: + std::string m_topic; + std::string m_brokerName; + int m_queueId; +}; +//<!*************************************************************************** +} //<!end namespace; +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ProcessQueueInfo.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/ProcessQueueInfo.h b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h new file mode 100644 index 0000000..d7493a5 --- /dev/null +++ b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h @@ -0,0 +1,86 @@ +#ifndef __PROCESSQUEUEINFO_H__ +#define __PROCESSQUEUEINFO_H__ + +#include "UtilAll.h" +#include "json/json.h" + +namespace rocketmq { +class ProcessQueueInfo { + public: + ProcessQueueInfo() { + commitOffset = 0; + cachedMsgMinOffset = 0; + cachedMsgMaxOffset = 0; + cachedMsgCount = 0; + transactionMsgMinOffset = 0; + transactionMsgMaxOffset = 0; + transactionMsgCount = 0; + locked = false; + tryUnlockTimes = 0; + 
lastLockTimestamp = 123; + droped = false; + lastPullTimestamp = 0; + lastConsumeTimestamp = 0; + } + virtual ~ProcessQueueInfo() {} + + public: + const uint64 getCommitOffset() const { return commitOffset; } + + void setCommitOffset(uint64 input_commitOffset) { + commitOffset = input_commitOffset; + } + + void setLocked(bool in_locked) { locked = in_locked; } + + const bool isLocked() const { return locked; } + + void setDroped(bool in_dropped) { droped = in_dropped; } + + const bool isDroped() const { return droped; } + + Json::Value toJson() const { + Json::Value outJson; + outJson["commitOffset"] = (UtilAll::to_string(commitOffset)).c_str(); + outJson["cachedMsgMinOffset"] = + (UtilAll::to_string(cachedMsgMinOffset)).c_str(); + outJson["cachedMsgMaxOffset"] = + (UtilAll::to_string(cachedMsgMaxOffset)).c_str(); + outJson["cachedMsgCount"] = (int)(cachedMsgCount); + outJson["transactionMsgMinOffset"] = + (UtilAll::to_string(transactionMsgMinOffset)).c_str(); + outJson["transactionMsgMaxOffset"] = + (UtilAll::to_string(transactionMsgMaxOffset)).c_str(); + outJson["transactionMsgCount"] = (int)(transactionMsgCount); + outJson["locked"] = (locked); + outJson["tryUnlockTimes"] = (int)(tryUnlockTimes); + outJson["lastLockTimestamp"] = + (UtilAll::to_string(lastLockTimestamp)).c_str(); + outJson["droped"] = (droped); + outJson["lastPullTimestamp"] = + (UtilAll::to_string(lastPullTimestamp)).c_str(); + outJson["lastConsumeTimestamp"] = + (UtilAll::to_string(lastConsumeTimestamp)).c_str(); + + return outJson; + } + + public: + uint64 commitOffset; + uint64 cachedMsgMinOffset; + uint64 cachedMsgMaxOffset; + int cachedMsgCount; + uint64 transactionMsgMinOffset; + uint64 transactionMsgMaxOffset; + int transactionMsgCount; + bool locked; + int tryUnlockTimes; + uint64 lastLockTimestamp; + + bool droped; + uint64 lastPullTimestamp; + uint64 lastConsumeTimestamp; +}; +} + +#endif 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.cpp b/rocketmq-cpp/src/protocol/RemotingCommand.cpp new file mode 100644 index 0000000..ff6e53e --- /dev/null +++ b/rocketmq-cpp/src/protocol/RemotingCommand.cpp @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "RemotingCommand.h" +#include "ByteOrder.h" +#include "Logging.h" +#include "MQProtos.h" +#include "MQVersion.h" +#include "SessionCredentials.h" + +namespace rocketmq { +boost::atomic<int> RemotingCommand::s_seqNumber; +boost::mutex RemotingCommand::m_clock; +//<!************************************************************************ +RemotingCommand::RemotingCommand(int code, + CommandHeader* pExtHeader /* = NULL */) + : m_code(code), + m_language("CPP"), + m_version(MQVersion::s_CurrentVersion), + m_flag(0), + m_remark(""), + m_pExtHeader(pExtHeader) { + boost::lock_guard<boost::mutex> lock(m_clock); + m_opaque = (s_seqNumber.load(boost::memory_order_acquire)) % + (numeric_limits<int>::max()); + s_seqNumber.store(m_opaque, boost::memory_order_release); + ++s_seqNumber; +} + +RemotingCommand::RemotingCommand(int code, string language, int version, + int opaque, int flag, string remark, + CommandHeader* pExtHeader) + : m_code(code), + m_language(language), + m_version(version), + m_opaque(opaque), + m_flag(flag), + m_remark(remark), + m_pExtHeader(pExtHeader) {} + +RemotingCommand::~RemotingCommand() { m_pExtHeader = NULL; } + +void RemotingCommand::Encode() { + Json::Value root; + root["code"] = m_code; + root["language"] = "CPP"; + root["version"] = m_version; + root["opaque"] = m_opaque; + root["flag"] = m_flag; + root["remark"] = m_remark; + + if (m_pExtHeader) { + Json::Value extJson; + m_pExtHeader->Encode(extJson); + + extJson[SessionCredentials::Signature] = + m_extFields[SessionCredentials::Signature]; + extJson[SessionCredentials::AccessKey] = + m_extFields[SessionCredentials::AccessKey]; + extJson[SessionCredentials::ONSChannelKey] = + m_extFields[SessionCredentials::ONSChannelKey]; + + root["extFields"] = extJson; + } else { // for heartbeat + Json::Value extJson; + extJson[SessionCredentials::Signature] = + m_extFields[SessionCredentials::Signature]; + extJson[SessionCredentials::AccessKey] = + 
m_extFields[SessionCredentials::AccessKey]; + extJson[SessionCredentials::ONSChannelKey] = + m_extFields[SessionCredentials::ONSChannelKey]; + root["extFields"] = extJson; + } + + Json::FastWriter fastwrite; + string data = fastwrite.write(root); + + uint32 headLen = data.size(); + uint32 totalLen = 4 + headLen + m_body.getSize(); + + uint32 messageHeader[2]; + messageHeader[0] = ByteOrder::swapIfLittleEndian(totalLen); + messageHeader[1] = ByteOrder::swapIfLittleEndian(headLen); + + //<!include self 4 bytes, see : doc/protocol.txt; + m_head.setSize(4 + 4 + headLen); + m_head.copyFrom(messageHeader, 0, sizeof(messageHeader)); + m_head.copyFrom(data.c_str(), sizeof(messageHeader), headLen); +} + +const MemoryBlock* RemotingCommand::GetHead() const { return &m_head; } + +const MemoryBlock* RemotingCommand::GetBody() const { return &m_body; } + +void RemotingCommand::SetBody(const char* pData, int len) { + m_body.reset(); + m_body.setSize(len); + m_body.copyFrom(pData, 0, len); +} + +RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) { + //<!decode 1 bytes,4+head+body + uint32 messageHeader[1]; + mem.copyTo(messageHeader, 0, sizeof(messageHeader)); + int totalLen = mem.getSize(); + int headLen = ByteOrder::swapIfLittleEndian(messageHeader[0]); + int bodyLen = totalLen - 4 - headLen; + + //<!decode header; + const char* const pData = static_cast<const char*>(mem.getData()); + Json::Reader reader; + Json::Value object; + const char* begin = pData + 4; + const char* end = pData + 4 + headLen; + + if (!reader.parse(begin, end, object)) { + THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1); + } + + int code = object["code"].asInt(); + + string language = object["language"].asString(); + int version = object["version"].asInt(); + int opaque = object["opaque"].asInt(); + int flag = object["flag"].asInt(); + Json::Value v = object["remark"]; + string remark = ""; + if (!v.isNull()) { + remark = object["remark"].asString(); + } + LOG_DEBUG("code:%d, 
remark:%s, version:%d, opaque:%d, flag:%d, remark:%s, headLen:%d, bodyLen:%d ", + code, language.c_str(), version, opaque, flag, remark.c_str(), headLen, bodyLen); + RemotingCommand* cmd = + new RemotingCommand(code, language, version, opaque, flag, remark, NULL); + cmd->setParsedJson(object); + if (bodyLen > 0) { + cmd->SetBody(pData + 4 + headLen, bodyLen); + } + return cmd; +} + +void RemotingCommand::markResponseType() { + int bits = 1 << RPC_TYPE; + m_flag |= bits; +} + +bool RemotingCommand::isResponseType() { + int bits = 1 << RPC_TYPE; + return (m_flag & bits) == bits; +} + +void RemotingCommand::markOnewayRPC() { + int bits = 1 << RPC_ONEWAY; + m_flag |= bits; +} + +bool RemotingCommand::isOnewayRPC() { + int bits = 1 << RPC_ONEWAY; + return (m_flag & bits) == bits; +} + +void RemotingCommand::setOpaque(const int opa) { m_opaque = opa; } + +void RemotingCommand::SetExtHeader(int code) { + try { + Json::Value ext = m_parsedJson["extFields"]; + if (!ext.isNull()) { + m_pExtHeader = NULL; + switch (code) { + case SEND_MESSAGE: + m_pExtHeader.reset(SendMessageResponseHeader::Decode(ext)); + break; + case PULL_MESSAGE: + m_pExtHeader.reset(PullMessageResponseHeader::Decode(ext)); + break; + case GET_MIN_OFFSET: + m_pExtHeader.reset(GetMinOffsetResponseHeader::Decode(ext)); + break; + case GET_MAX_OFFSET: + m_pExtHeader.reset(GetMaxOffsetResponseHeader::Decode(ext)); + break; + case SEARCH_OFFSET_BY_TIMESTAMP: + m_pExtHeader.reset(SearchOffsetResponseHeader::Decode(ext)); + break; + case GET_EARLIEST_MSG_STORETIME: + m_pExtHeader.reset( + GetEarliestMsgStoretimeResponseHeader::Decode(ext)); + break; + case QUERY_CONSUMER_OFFSET: + m_pExtHeader.reset(QueryConsumerOffsetResponseHeader::Decode(ext)); + break; + case RESET_CONSUMER_CLIENT_OFFSET: + m_pExtHeader.reset(ResetOffsetRequestHeader::Decode(ext)); + break; + case GET_CONSUMER_RUNNING_INFO: + m_pExtHeader.reset(GetConsumerRunningInfoRequestHeader::Decode(ext)); + break; + case NOTIFY_CONSUMER_IDS_CHANGED: + 
m_pExtHeader.reset( + NotifyConsumerIdsChangedRequestHeader::Decode(ext)); + default: + break; + } + } + } catch (MQException& e) { + LOG_ERROR("set response head error"); + } +} + +void RemotingCommand::setCode(int code) { m_code = code; } + +int RemotingCommand::getCode() const { return m_code; } + +int RemotingCommand::getOpaque() const { return m_opaque; } + +string RemotingCommand::getRemark() const { return m_remark; } + +void RemotingCommand::setRemark(string mark) { m_remark = mark; } + +CommandHeader* RemotingCommand::getCommandHeader() const { + return m_pExtHeader.get(); +} + +void RemotingCommand::setParsedJson(Json::Value json) { + m_parsedJson = json; +} + +const int RemotingCommand::getFlag() const { return m_flag; } + +const int RemotingCommand::getVersion() const { return m_version; } + +void RemotingCommand::setMsgBody(const string& body) { m_msgBody = body; } + +string RemotingCommand::getMsgBody() const { return m_msgBody; } + +void RemotingCommand::addExtField(const string& key, const string& value) { + m_extFields[key] = value; +} + +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.h b/rocketmq-cpp/src/protocol/RemotingCommand.h new file mode 100755 index 0000000..633a511 --- /dev/null +++ b/rocketmq-cpp/src/protocol/RemotingCommand.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REMOTINGCOMMAND_H__ +#define __REMOTINGCOMMAND_H__ +#include <boost/atomic.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <memory> +#include <sstream> +#include "CommandHeader.h" +#include "dataBlock.h" + +namespace rocketmq { +//<!*************************************************************************** +const int RPC_TYPE = 0; // 0, REQUEST_COMMAND // 1, RESPONSE_COMMAND; +const int RPC_ONEWAY = 1; // 0, RPC // 1, Oneway; +//<!*************************************************************************** +class RemotingCommand { + public: + RemotingCommand(int code, CommandHeader* pCustomHeader = NULL); + RemotingCommand(int code, string language, int version, int opaque, int flag, + string remark, CommandHeader* pCustomHeader); + virtual ~RemotingCommand(); + + const MemoryBlock* GetHead() const; + const MemoryBlock* GetBody() const; + + void SetBody(const char* pData, int len); + void setOpaque(const int opa); + void SetExtHeader(int code); + + void setCode(int code); + int getCode() const; + int getOpaque() const; + void setRemark(string mark); + string getRemark() const; + void markResponseType(); + bool isResponseType(); + void markOnewayRPC(); + bool isOnewayRPC(); + void setParsedJson(Json::Value json); + + CommandHeader* getCommandHeader() const; + const int getFlag() const; + const int getVersion() const; + + void addExtField(const string& key, const string& value); + string getMsgBody() const; + void setMsgBody(const string& body); + + public: + void Encode(); + static RemotingCommand* 
Decode(const MemoryBlock& mem); + + private: + int m_code; + string m_language; + int m_version; + int m_opaque; + int m_flag; + string m_remark; + string m_msgBody; + map<string, string> m_extFields; + + static boost::mutex m_clock; + MemoryBlock m_head; + MemoryBlock m_body; + //<!save here + Json::Value m_parsedJson; + static boost::atomic<int> s_seqNumber; + unique_ptr<CommandHeader> m_pExtHeader; +}; + +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingSerializable.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/RemotingSerializable.h b/rocketmq-cpp/src/protocol/RemotingSerializable.h new file mode 100755 index 0000000..812a892 --- /dev/null +++ b/rocketmq-cpp/src/protocol/RemotingSerializable.h @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __REMOTINGSERIALIZABLE_H__ +#define __REMOTINGSERIALIZABLE_H__ +#include "json/json.h" + +namespace rocketmq { +//<!*************************************************************************** +class RemotingSerializable { + public: + virtual ~RemotingSerializable(){}; + virtual void Encode(std::string& outData) = 0; +}; + +//<!************************************************************************ +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicList.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/TopicList.h b/rocketmq-cpp/src/protocol/TopicList.h new file mode 100755 index 0000000..d8d14a7 --- /dev/null +++ b/rocketmq-cpp/src/protocol/TopicList.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __TOPICLIST_H__ +#define __TOPICLIST_H__ +#include <string> +#include <vector> +#include "dataBlock.h" + +namespace rocketmq { +//<!*************************************************************************** +class TopicList { + public: + static TopicList* Decode(const MemoryBlock* mem) { return new TopicList(); } + + private: + vector<string> m_topicList; +}; +//<!************************************************************************ +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicRouteData.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/protocol/TopicRouteData.h b/rocketmq-cpp/src/protocol/TopicRouteData.h new file mode 100755 index 0000000..ec8f842 --- /dev/null +++ b/rocketmq-cpp/src/protocol/TopicRouteData.h @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __TOPICROUTEDATA_H__ +#define __TOPICROUTEDATA_H__ +#include <algorithm> +#include "Logging.h" +#include "UtilAll.h" +#include "dataBlock.h" +#include "json/json.h" + +namespace rocketmq { +//<!*************************************************************************** +struct QueueData { + string brokerName; + int readQueueNums; + int writeQueueNums; + int perm; + + bool operator<(const QueueData& other) const { + return brokerName < other.brokerName; + } + + bool operator==(const QueueData& other) const { + if (brokerName == other.brokerName && + readQueueNums == other.readQueueNums && + writeQueueNums == other.writeQueueNums && perm == other.perm) { + return true; + } + return false; + } +}; + +//<!*************************************************************************** +struct BrokerData { + string brokerName; + map<int, string> brokerAddrs; //<!0:master,1,2.. slave + + bool operator<(const BrokerData& other) const { + return brokerName < other.brokerName; + } + + bool operator==(const BrokerData& other) const { + if (brokerName == other.brokerName && brokerAddrs == other.brokerAddrs) { + return true; + } + return false; + } +}; + +//<!************************************************************************/ +class TopicRouteData { + public: + virtual ~TopicRouteData() { + m_brokerDatas.clear(); + m_queueDatas.clear(); + } + + static TopicRouteData* Decode(const MemoryBlock* mem) { + //<!see doc/TopicRouteData.json; + const char* const pData = static_cast<const char*>(mem->getData()); + string data(pData, mem->getSize()); + + Json::Value root; + Json::CharReaderBuilder charReaderBuilder; + charReaderBuilder.settings_["allowNumericKeys"] = true; + unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader()); + const char* begin = pData; + const char* end = pData + mem->getSize(); + string errs; + if (!pCharReaderPtr->parse(begin, end, &root, &errs)) { + LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", 
errs.c_str(), root.isArray(), root.isObject()); + return NULL; + } + + TopicRouteData* trd = new TopicRouteData(); + trd->setOrderTopicConf(root["orderTopicConf"].asString()); + + Json::Value qds = root["queueDatas"]; + for (unsigned int i = 0; i < qds.size(); i++) { + QueueData d; + Json::Value qd = qds[i]; + d.brokerName = qd["brokerName"].asString(); + d.readQueueNums = qd["readQueueNums"].asInt(); + d.writeQueueNums = qd["writeQueueNums"].asInt(); + d.perm = qd["perm"].asInt(); + + trd->getQueueDatas().push_back(d); + } + + sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end()); + + Json::Value bds = root["brokerDatas"]; + for (unsigned int i = 0; i < bds.size(); i++) { + BrokerData d; + Json::Value bd = bds[i]; + d.brokerName = bd["brokerName"].asString(); + + LOG_DEBUG("brokerName:%s", d.brokerName.c_str()); + + Json::Value bas = bd["brokerAddrs"]; + Json::Value::Members mbs = bas.getMemberNames(); + for (size_t i = 0; i < mbs.size(); i++) { + string key = mbs.at(i); + LOG_DEBUG("brokerid:%s,brokerAddr:%s", key.c_str(), + bas[key].asString().c_str()); + d.brokerAddrs[atoi(key.c_str())] = bas[key].asString(); + } + + trd->getBrokerDatas().push_back(d); + } + + sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end()); + + return trd; + } + + string selectBrokerAddr() { + vector<BrokerData>::iterator it = m_brokerDatas.begin(); + for (; it != m_brokerDatas.end(); ++it) { + map<int, string>::iterator it1 = (*it).brokerAddrs.find(MASTER_ID); + if (it1 != (*it).brokerAddrs.end()) { + return it1->second; + } + } + return ""; + } + + + vector<QueueData>& getQueueDatas() { return m_queueDatas; } + + vector<BrokerData>& getBrokerDatas() { return m_brokerDatas; } + + const string& getOrderTopicConf() const { return m_orderTopicConf; } + + void setOrderTopicConf(const string& orderTopicConf) { + m_orderTopicConf = orderTopicConf; + } + + bool operator==(const TopicRouteData& other) const { + if (m_brokerDatas != other.m_brokerDatas) { + return false; + } 
+ + if (m_orderTopicConf != other.m_orderTopicConf) { + return false; + } + + if (m_queueDatas != other.m_queueDatas) { + return false; + } + return true; + } + + public: + private: + string m_orderTopicConf; + vector<QueueData> m_queueDatas; + vector<BrokerData> m_brokerDatas; +}; + +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h new file mode 100755 index 0000000..ba1a035 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h @@ -0,0 +1,70 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL FRANÃOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_BATCH_DESCRIPTOR_H_ // NOLINT +#define DISRUPTOR_BATCH_DESCRIPTOR_H_ // NOLINT + +#include "sequence.h" + +namespace rocketmq { + +// Used to record the batch of sequences claimed via {@link Sequencer}. +class BatchDescriptor { + public: + // Create a holder for tracking a batch of claimed sequences in a + // {@link Sequencer} + // + // @param size of the batch to claim. + BatchDescriptor(int size) : + size_(size), + end_(kInitialCursorValue) {} + + // Get the size of the batch + int size() const { return size_; } + + // Get the end sequence of a batch. + // + // @return the end sequence in the batch. + int64_t end() const { return end_; } + + // Set the end sequence of a batch. + // + // @param end sequence in the batch. + void set_end(int64_t end) { end_ = end; } + + + // Get the starting sequence of the batch. + // + // @return starting sequence in the batch. 
+ int64_t Start() const { return end_ - size_ + 1L; } + + private: + int size_; + int64_t end_; +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_SEQUENCE_BATCH_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/claim_strategy.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/claim_strategy.h b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h new file mode 100755 index 0000000..0f3263a --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h @@ -0,0 +1,231 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL FRANÃOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT +#define DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT + +#include <boost/thread.hpp> +#include <boost/noncopyable.hpp> +#include <vector> + +#include "interface.h" + +namespace rocketmq { + +enum ClaimStrategyOption { + kSingleThreadedStrategy, + kMultiThreadedStrategy +}; + +// Optimised strategy can be used when there is a single publisher thread +// claiming {@link AbstractEvent}s. +class SingleThreadedStrategy :public noncopyable, public ClaimStrategyInterface { + public: + SingleThreadedStrategy(const int& buffer_size) : + buffer_size_(buffer_size), + sequence_(kInitialCursorValue), + min_gating_sequence_(kInitialCursorValue) {} + + virtual int64_t IncrementAndGet( + const std::vector<Sequence*>& dependent_sequences) { + int64_t next_sequence = sequence_.IncrementAndGet(1L); + WaitForFreeSlotAt(next_sequence, dependent_sequences); + return next_sequence; + } + + virtual int64_t IncrementAndGet(const int& delta, + const std::vector<Sequence*>& dependent_sequences) { + int64_t next_sequence = sequence_.IncrementAndGet(delta); + WaitForFreeSlotAt(next_sequence, dependent_sequences); + return next_sequence; + } + + virtual bool HasAvalaibleCapacity( + const std::vector<Sequence*>& dependent_sequences) { + int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_; + if (wrap_point > min_gating_sequence_.sequence()) { + int64_t min_sequence = GetMinimumSequence(dependent_sequences); + 
min_gating_sequence_.set_sequence(min_sequence); + if (wrap_point > min_sequence) + return false; + } + return true; + } + + virtual void SetSequence(const int64_t& sequence, + const std::vector<Sequence*>& dependent_sequences) { + sequence_.set_sequence(sequence); + WaitForFreeSlotAt(sequence, dependent_sequences); + } + + virtual void SerialisePublishing(const int64_t& sequence, + const Sequence& cursor, + const int64_t& batch_size) {} + + private: + SingleThreadedStrategy(); + + void WaitForFreeSlotAt(const int64_t& sequence, + const std::vector<Sequence*>& dependent_sequences) { + int64_t wrap_point = sequence - buffer_size_; + if (wrap_point > min_gating_sequence_.sequence()) { + int64_t min_sequence; + while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) { + boost::this_thread::yield(); + } + } + } + + const int buffer_size_; + PaddedLong sequence_; + PaddedLong min_gating_sequence_; + +}; + +// Strategy to be used when there are multiple publisher threads claiming +// {@link AbstractEvent}s. 
+/* +class MultiThreadedStrategy : public ClaimStrategyInterface { + public: + MultiThreadedStrategy(const int& buffer_size) : + buffer_size_(buffer_size), + sequence_(kInitialCursorValue), + min_processor_sequence_(kInitialCursorValue) {} + + virtual int64_t IncrementAndGet( + const std::vector<Sequence*>& dependent_sequences) { + WaitForCapacity(dependent_sequences, min_gating_sequence_local_); + int64_t next_sequence = sequence_.IncrementAndGet(); + WaitForFreeSlotAt(next_sequence, + dependent_sequences, + min_gating_sequence_local_); + return next_sequence; + } + + virtual int64_t IncrementAndGet(const int& delta, + const std::vector<Sequence*>& dependent_sequences) { + int64_t next_sequence = sequence_.IncrementAndGet(delta); + WaitForFreeSlotAt(next_sequence, + dependent_sequences, + min_gating_sequence_local_); + return next_sequence; + } + virtual void SetSequence(const int64_t& sequence, + const std::vector<Sequence*>& dependent_sequences) { + sequence_.set_sequence(sequence); + WaitForFreeSlotAt(sequence, + dependent_sequences, + min_gating_sequence_local_); + } + + virtual bool HasAvalaibleCapacity( + const std::vector<Sequence*>& dependent_sequences) { + const int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_; + if (wrap_point > min_gating_sequence_local_.sequence()) { + int64_t min_sequence = GetMinimumSequence(dependent_sequences); + min_gating_sequence_local_.set_sequence(min_sequence); + if (wrap_point > min_sequence) + return false; + } + return true; + } + + virtual void SerialisePublishing(const Sequence& cursor, + const int64_t& sequence, + const int64_t& batch_size) { + int64_t expected_sequence = sequence - batch_size; + int counter = retries; + + while (expected_sequence != cursor.sequence()) { + if (0 == --counter) { + counter = retries; + std::this_thread::yield(); + } + } + } + + private: + // Methods + void WaitForCapacity(const std::vector<Sequence*>& dependent_sequences, + const MutableLong& min_gating_sequence) { + const 
int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_; + if (wrap_point > min_gating_sequence.sequence()) { + int counter = retries; + int64_t min_sequence; + while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) { + counter = ApplyBackPressure(counter); + } + min_gating_sequence.set_sequence(min_sequence); + } + } + + void WaitForFreeSlotAt(const int64_t& sequence, + const std::vector<Sequence*>& dependent_sequences, + const MutableLong& min_gating_sequence) { + const int64_t wrap_point = sequence - buffer_size_; + if (wrap_point > min_gating_sequence.sequence()) { + int64_t min_sequence; + while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) { + std::this_thread::yield(); + } + min_gating_sequence.set_sequence(min_sequence); + } + } + + int ApplyBackPressure(int counter) { + if (0 != counter) { + --counter; + std::this_thread::yield(); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + return counter; + } + + const int buffer_size_; + PaddedSequence sequence_; + thread_local PaddedLong min_gating_sequence_local_; + + const int retries = 100; + +}; +*/ + +ClaimStrategyInterface* CreateClaimStrategy(ClaimStrategyOption option, + const int& buffer_size) { + switch (option) { + case kSingleThreadedStrategy: + return new SingleThreadedStrategy(buffer_size); + // case kMultiThreadedStrategy: + // return new MultiThreadedStrategy(buffer_size); + default: + return NULL; + } +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_CLAIM_STRATEGY_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_processor.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/event_processor.h b/rocketmq-cpp/src/thread/disruptor/event_processor.h new file mode 100755 index 0000000..fb62812 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/event_processor.h @@ 
-0,0 +1,130 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ +#ifndef DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT +#define DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT + +#include <stdexcept> +#include "ring_buffer.h" + +namespace rocketmq { + +template <typename T> +class NoOpEventProcessor : public EventProcessorInterface<T> { + public: + NoOpEventProcessor(RingBuffer<T>* ring_buffer) : + ring_buffer_(ring_buffer) { } + + virtual Sequence* GetSequence() { + return ring_buffer_->GetSequencePtr(); + } + + virtual void Halt() {} + + virtual void Run() {} + + private: + RingBuffer<T>* ring_buffer_; +}; + +template <typename T> +class BatchEventProcessor : public boost::noncopyable, public EventProcessorInterface<T> { + public: + BatchEventProcessor(RingBuffer<T>* ring_buffer, + SequenceBarrierInterface* sequence_barrier, + EventHandlerInterface<T>* event_handler, + ExceptionHandlerInterface<T>* exception_handler) : + running_(false), + ring_buffer_(ring_buffer), + sequence_barrier_(sequence_barrier), + event_handler_(event_handler), + exception_handler_(exception_handler) {} + + + virtual Sequence* GetSequence() { return &sequence_; } + + virtual void Halt() { + running_.store(false); + sequence_barrier_->Alert(); + } + + virtual void Run() { + if (running_.load()) + { + printf("Thread is already running\r\n"); + } + running_.store(true); + sequence_barrier_->ClearAlert(); + event_handler_->OnStart(); + + T* event = NULL; + int64_t next_sequence = sequence_.sequence() + 1L; + + while (true) { + try { + int64_t avalaible_sequence = \ + sequence_barrier_->WaitFor(next_sequence, 300*1000);//wait 300 milliseconds to avoid taskThread blocking on BlockingStrategy::WaitFor when shutdown + //metaq::LOG_INFO("avalaible_sequence:%d, next_sequence:%d", avalaible_sequence,next_sequence); + while (next_sequence <= avalaible_sequence) { + event = ring_buffer_->Get(next_sequence); + event_handler_->OnEvent(next_sequence, + next_sequence == avalaible_sequence, event); + next_sequence++; + } + + sequence_.set_sequence(next_sequence - 1L); + } 
catch(const AlertException& e) { + //metaq::LOG_INFO("catch alertException"); + if (!running_.load()) + break; + } catch(const std::exception& e) { + //metaq::LOG_ERROR("catch stdException"); + exception_handler_->Handle(e, next_sequence, event); + sequence_.set_sequence(next_sequence); + next_sequence++; + } + } + //metaq::LOG_INFO("BatchEventProcessor shutdown"); + event_handler_->OnShutdown(); + running_.store(false); + } + + void operator()() { Run(); } + + private: + boost::atomic<bool> running_; + Sequence sequence_; + + RingBuffer<T>* ring_buffer_; + SequenceBarrierInterface* sequence_barrier_; + EventHandlerInterface<T>* event_handler_; + ExceptionHandlerInterface<T>* exception_handler_; + +}; + + +}; // namespace rocketmq + +#endif // DISRUPTOR_EVENT_PROCESSOR_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_publisher.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/event_publisher.h b/rocketmq-cpp/src/thread/disruptor/event_publisher.h new file mode 100755 index 0000000..ae0efd9 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/event_publisher.h @@ -0,0 +1,50 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. 
+// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÃOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT +#define DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT + +#include "ring_buffer.h" + +namespace rocketmq { + +template<typename T> +class EventPublisher { + public: + EventPublisher(RingBuffer<T>* ring_buffer) : ring_buffer_(ring_buffer) {} + + void PublishEvent(EventTranslatorInterface<T>* translator) { + int64_t sequence = ring_buffer_->Next(); + translator->TranslateTo(sequence, ring_buffer_->Get(sequence)); + ring_buffer_->Publish(sequence); + } + + private: + RingBuffer<T>* ring_buffer_; +}; + +}; // namespace rocketmq + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exception_handler.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/exception_handler.h b/rocketmq-cpp/src/thread/disruptor/exception_handler.h new file mode 100755 index 0000000..e7979a0 --- /dev/null +++ 
b/rocketmq-cpp/src/thread/disruptor/exception_handler.h @@ -0,0 +1,59 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ +#ifndef DISRUPTOR_EXCEPTION_HANDLER_H_ // NOLINT +#define DISRUPTOR_EXCEPTION_HANDLER_H_ // NOLINT + +#include <exception> + +#include "interface.h" + +namespace rocketmq { + +template<typename T> +class IgnoreExceptionHandler: public ExceptionHandlerInterface<T> { + public: + virtual void Handle(const std::exception& exception, + const int64_t& sequence, + T* event) { + // do nothing with the exception. + ; + } +}; + +template<typename T> +class FatalExceptionHandler: public ExceptionHandlerInterface<T> { + public: + virtual void Handle(const std::exception& exception, + const int64_t& sequence, + T* event) { + // rethrow the exception + throw exception; + } +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_EXCEPTION_HANDLER_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exceptions.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/exceptions.h b/rocketmq-cpp/src/thread/disruptor/exceptions.h new file mode 100755 index 0000000..f968043 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/exceptions.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. 
namespace rocketmq {

// Exception type used to signal an alert on a sequence barrier.  It adds
// nothing to std::exception; only its type carries meaning.
// BatchEventProcessor::Run() catches it and leaves its loop once the
// processor is no longer marked as running.
class AlertException : public std::exception {
};

};  // namespace rocketmq
// Strategies employed for claiming the sequence of events in the
// {@link Sequencer} by publishers.
class ClaimStrategyInterface {
 public:
    virtual ~ClaimStrategyInterface() {}

    // Is there available capacity in the buffer for the requested sequence.
    //
    // @param dependent_sequences to be checked for range.
    // @return true if the buffer has capacity for the requested sequence.
    virtual bool HasAvalaibleCapacity(
        const std::vector<Sequence*>& dependent_sequences) = 0;

    // Claim the next sequence in the {@link Sequencer}.
    //
    // @param dependent_sequences to be checked for range.
    // @return the index to be used for the publishing.
    virtual int64_t IncrementAndGet(
            const std::vector<Sequence*>& dependent_sequences) = 0;

    // Claim the next sequence in the {@link Sequencer}.
    //
    // @param delta to increment by.
    // @param dependent_sequences to be checked for range.
    // @return the index to be used for the publishing.
    virtual int64_t IncrementAndGet(const int& delta,
            const std::vector<Sequence*>& dependent_sequences) = 0;

    // Set the current sequence value for claiming an event in the
    // {@link Sequencer}.
    //
    // @param sequence to be set as the current value.
    // @param dependent_sequences to be checked for range.
    virtual void SetSequence(const int64_t& sequence,
            const std::vector<Sequence*>& dependent_sequences) = 0;

    // Serialise publishing in sequence.
    //
    // @param sequence to be applied.
    // @param cursor to serialise against.
    // @param batch_size of the sequence.
    virtual void SerialisePublishing(const int64_t& sequence,
                                     const Sequence& cursor,
                                     const int64_t& batch_size) = 0;
};

// Coordination barrier for tracking the cursor for publishers and sequence of
// dependent {@link EventProcessor}s for processing a data structure.
class SequenceBarrierInterface {
 public:
    virtual ~SequenceBarrierInterface(){}

    // Wait for the given sequence to be available for consumption.
    //
    // @param sequence to wait for.
    // @return the sequence up to which is available.
    //
    // @throws AlertException if a status change has occurred for the
    // Disruptor.
    virtual int64_t WaitFor(const int64_t& sequence) = 0;

    // Wait for the given sequence to be available for consumption with a
    // time out.
    //
    // @param sequence to wait for.
    // @param timeout_micro timeout in microseconds.
    // @return the sequence up to which is available.
    //
    // @throws AlertException if a status change has occurred for the
    // Disruptor.
    virtual int64_t WaitFor(const int64_t& sequence,
                            const int64_t& timeout_micro) = 0;

    // Delegate a call to the {@link Sequencer#getCursor()}
    //
    // @return value of the cursor for entries that have been published.
    virtual int64_t GetCursor() const = 0;

    // The current alert status for the barrier.
    //
    // @return true if in alert otherwise false.
    virtual bool IsAlerted() const = 0;

    // Alert the {@link EventProcessor}s of a status change and stay in this
    // status until cleared.
    virtual void Alert() = 0;

    // Clear the current alert status.
    virtual void ClearAlert() = 0;

    // Check if barrier is alerted, if so throws an AlertException
    //
    // @throws AlertException if barrier is alerted
    virtual void CheckAlert() const = 0;
};

// Called by the {@link RingBuffer} to pre-populate all the events to fill the
// RingBuffer.
//
// @param <T> event implementation storing the data for sharing during exchange
// or parallel coordination of an event.
template<typename T>
class EventFactoryInterface {
 public:
     virtual ~EventFactoryInterface(){}

     // Create the events used to fill the ring buffer.
     //
     // @param size presumably the number of events to allocate -- TODO confirm
     //        against the RingBuffer constructor.
     // @return pointer to the newly created event storage.
     virtual T* NewInstance(const int& size) const = 0;
};

// Callback interface to be implemented for processing events as they become
// available in the {@link RingBuffer}.
//
// @param <T> event implementation storing the data for sharing during exchange
// or parallel coordination of an event.
template<typename T>
class EventHandlerInterface {
 public:
    virtual ~EventHandlerInterface(){}

    // Called when a publisher has published an event to the {@link RingBuffer}
    //
    // @param sequence of the event being processed
    // @param end_of_batch flag to indicate if this is the last event in a batch
    // from the {@link RingBuffer}
    // @param event published to the {@link RingBuffer}
    //
    // @throws Exception if the EventHandler would like the exception handled
    // further up the chain.
    virtual void OnEvent(const int64_t& sequence,
                         const bool& end_of_batch,
                         T* event) = 0;

    // Called once on thread start before processing the first event.
    virtual void OnStart() = 0;

    // Called once on thread stop just before shutdown.
    virtual void OnShutdown() = 0;
};

// Implementations translate another data representations into events claimed
// for the {@link RingBuffer}.
//
// @param <T> event implementation storing the data for sharing during exchange
// or parallel coordination of an event.
template<typename T>
class EventTranslatorInterface {
 public:
     virtual ~EventTranslatorInterface(){}

     // Translate a data representation into fields set in given event
     //
     // @param sequence that is assigned to events.
     // @param event into which the data should be translated.
     // @return the resulting event after it has been translated.  This
     //         default implementation translates nothing and returns NULL.
     virtual T* TranslateTo(const int64_t& sequence, T* event) { return NULL;}
};

// EventProcessors wait for events to become available for consumption from
// the {@link RingBuffer}. An event processor should be associated with a
// thread.
//
// NOTE(review): the processors in event_processor.h also define a virtual
// Run(), which this interface does not declare -- confirm whether it belongs
// here.
//
// @param <T> event implementation storing the data for sharing during exchange
// or parallel coordination of an event.
template<typename T>
class EventProcessorInterface {
 public:
     virtual ~EventProcessorInterface(){}

     // Get a pointer to the {@link Sequence} being used by this
     // {@link EventProcessor}.
     //
     // @return pointer to the {@link Sequence} for this
     // {@link EventProcessor}
    virtual Sequence* GetSequence() = 0;

    // Signal that this EventProcessor should stop when it has finished
    // consuming at the next clean break.
    // It will call {@link DependencyBarrier#alert()} to notify the thread to
    // check status.
    virtual void Halt() = 0;
};

// Callback handler for uncaught exception in the event processing cycle
// of the {@link BatchEventProcessor}.
+// +// @param <T> event type stored in the {@link RingBuffer}. +template<typename T> +class ExceptionHandlerInterface { + public: + // Strategy for handling uncaught exceptions when processing an event. + // If the strategy wishes to suspend further processing by the + // {@link BatchEventProcessor} then it should throw a std::runtime_error. + // + // @param exception that propagated from the {@link EventHandler}. + // @param sequence of the event which caused the exception. + // @param event being processed when the exception occured. + virtual ~ExceptionHandlerInterface(){} + virtual void Handle(const std::exception& exception, + const int64_t& sequence, + T* event) = 0; +}; + +// Strategy employed for making {@link EventProcessor}s wait on a cursor +// {@link Sequence}. +class WaitStrategyInterface: public boost::noncopyable { + public: + // Wait for the given sequence to be available for consumption. + // + // @param dependents further back the chain that must advance first. + // @param cursor on which to wait. + // @param barrier the consumer is waiting on. + // @param sequence to be waited on. + // @return the sequence that is available which may be greater than the + // requested sequence. + // + // @throws AlertException if the status of the Disruptor has changed. + virtual ~WaitStrategyInterface(){} + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t& sequence) = 0; + + // Wait for the given sequence to be available for consumption in a + // {@link RingBuffer} with a timeout specified. + // + // @param dependents further back the chain that must advance first + // @param cursor on which to wait. + // @param barrier the consumer is waiting on. + // @param sequence to be waited on. + // @param timeout value in micro seconds to abort after. + // @return the sequence that is available which may be greater than the + // requested sequence. 
+ // + // @throws AlertException if the status of the Disruptor has changed. + // @throws InterruptedException if the thread is interrupted. + virtual int64_t WaitFor(const std::vector<Sequence*>& dependents, + const Sequence& cursor, + const SequenceBarrierInterface& barrier, + const int64_t & sequence, + const int64_t & timeout_micros) = 0; + + // Signal those waiting that the cursor has advanced. + virtual void SignalAllWhenBlocking() = 0; +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_INTERFACE_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/ring_buffer.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/ring_buffer.h b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h new file mode 100755 index 0000000..c7150f1 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h @@ -0,0 +1,90 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_RING_BUFFER_H_ // NOLINT +#define DISRUPTOR_RING_BUFFER_H_ // NOLINT + +#include <boost/array.hpp> +#include <vector> + +#include "interface.h" +#include "claim_strategy.h" +#include "wait_strategy.h" +#include "sequencer.h" +#include "sequence_barrier.h" + +namespace rocketmq { + +// Ring based store of reusable entries containing the data representing an +// event being exchanged between publisher and {@link EventProcessor}s. +// +// @param <T> implementation storing the data for sharing during exchange +// or parallel coordination of an event. +template<typename T> +class RingBuffer : public Sequencer { + public: + // Construct a RingBuffer with the full option set. + // + // @param event_factory to instance new entries for filling the RingBuffer. + // @param buffer_size of the RingBuffer, must be a power of 2. + // @param claim_strategy_option threading strategy for publishers claiming + // entries in the ring. + // @param wait_strategy_option waiting strategy employed by + // processors_to_track waiting in entries becoming available. 
+ RingBuffer(EventFactoryInterface<T>* event_factory, + int buffer_size, + ClaimStrategyOption claim_strategy_option, + WaitStrategyOption wait_strategy_option) : + Sequencer(buffer_size, + claim_strategy_option, + wait_strategy_option), + buffer_size_(buffer_size), + mask_(buffer_size - 1), + events_(event_factory->NewInstance(buffer_size)) { + } + + ~RingBuffer() { + delete[] events_; + } + + // Get the event for a given sequence in the RingBuffer. + // + // @param sequence for the event; it is reduced to a slot with (sequence & mask_). + // @return pointer to the event stored in slot (sequence & mask_) of the array. + T* Get(const int64_t& sequence) { + return &events_[sequence & mask_]; + } + + private: + // Members: capacity, index mask (buffer_size - 1), and the event array released with delete[] in the destructor. + int buffer_size_; + int mask_; + T* events_; + +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/sequence.h b/rocketmq-cpp/src/thread/disruptor/sequence.h new file mode 100755 index 0000000..f1396f3 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/sequence.h @@ -0,0 +1,139 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. 
+// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef CACHE_LINE_SIZE_IN_BYTES // NOLINT +#define CACHE_LINE_SIZE_IN_BYTES 64 // NOLINT +#endif // NOLINT +#define ATOMIC_SEQUENCE_PADDING_LENGTH \ + (CACHE_LINE_SIZE_IN_BYTES - sizeof(boost::atomic<int64_t>))/8 +#define SEQUENCE_PADDING_LENGTH \ + (CACHE_LINE_SIZE_IN_BYTES - sizeof(int64_t))/8 + +#ifndef DISRUPTOR_SEQUENCE_H_ // NOLINT +#define DISRUPTOR_SEQUENCE_H_ // NOLINT + +#include <boost/atomic.hpp> +#include <boost/memory_order.hpp> +#include <boost/noncopyable.hpp> +#include <vector> +#include <limits> +using namespace boost; +namespace rocketmq { + +const int64_t kInitialCursorValue = -1L; + +// Sequence counter. +class Sequence:public noncopyable { + public: + // Construct a sequence counter that can be tracked across threads. + // + // @param initial_value for the counter. + Sequence(int64_t initial_value = kInitialCursorValue) : + value_(initial_value) {} + + // Get the current value of the {@link Sequence} (acquire load). + // + // @return the current value. + int64_t sequence() const { return value_.load(boost::memory_order_acquire); } + + // Set the current value of the {@link Sequence} (release store). 
+ // + // @param value to which the {@link Sequence} will be set. 
+ void set_sequence(int64_t value) { value_.store(value, boost::memory_order_release); } + + // Increment and return the value of the {@link Sequence}. + // + // @param increment amount to add to the {@link Sequence}. + // @return the value after the increment has been applied. + int64_t IncrementAndGet(const int64_t& increment) { + return value_.fetch_add(increment, boost::memory_order_release) + increment; + } + + private: + // members + boost::atomic<int64_t> value_; + +}; + +// Cache line padded sequence counter. +// +// Can be used across threads without worrying about false sharing if +// located adjacent to another counter in memory. +class PaddedSequence : public Sequence { + public: + PaddedSequence(int64_t initial_value = kInitialCursorValue) : + Sequence(initial_value) {} + + private: + // padding + int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH]; + +}; + +// Non-atomic sequence counter. +// +// This counter is not thread safe. +class MutableLong { + public: + MutableLong(int64_t initial_value = kInitialCursorValue) : + sequence_(initial_value) {} + + int64_t sequence() const { return sequence_; } + + void set_sequence(const int64_t& sequence) { sequence_ = sequence; }; + + int64_t IncrementAndGet(const int64_t& delta) { sequence_ += delta; return sequence_; } + + private: + volatile int64_t sequence_; +}; + +// Cache line padded non-atomic sequence counter. +// +// This counter is not thread safe. +class PaddedLong : public MutableLong { + public: + PaddedLong(int64_t initial_value = kInitialCursorValue) : + MutableLong(initial_value) {} + private: + int64_t padding_[SEQUENCE_PADDING_LENGTH]; +}; + +int64_t GetMinimumSequence( + const std::vector<Sequence*>& sequences) { + int64_t minimum = std::numeric_limits<int64_t>::max(); + + std::vector<Sequence*>::const_iterator it= sequences.begin(); + for (;it!=sequences.end();it++) { + int64_t sequence = (*it)->sequence(); + minimum = minimum < sequence ? 
minimum : sequence; + } + + return minimum; +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_SEQUENCE_H_ NOLINT http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h new file mode 100755 index 0000000..c156388 --- /dev/null +++ b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h @@ -0,0 +1,92 @@ +// Copyright (c) 2011, François Saint-Jacques +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the disruptor-- nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#ifndef DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT +#define DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT + +#include <memory> +#include <vector> + +#include "exceptions.h" +#include "interface.h" +namespace rocketmq { + +class ProcessingSequenceBarrier : SequenceBarrierInterface { + public: + ProcessingSequenceBarrier(WaitStrategyInterface* wait_strategy, + Sequence* sequence, + const std::vector<Sequence*>& sequences) : + wait_strategy_(wait_strategy), + cursor_(sequence), + dependent_sequences_(sequences), + alerted_(false) { + } + + virtual int64_t WaitFor(const int64_t& sequence) { + return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this, + sequence); + } + + virtual int64_t WaitFor(const int64_t& sequence, + const int64_t& timeout_micros) { + return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this, + sequence, timeout_micros); + } + + virtual int64_t GetCursor() const { + return cursor_->sequence(); + } + + virtual bool IsAlerted() const { + return alerted_.load(boost::memory_order_acquire); + } + + virtual void Alert() { + //metaq::LOG_INFO("set alert to true"); + alerted_.store(true, boost::memory_order_release); + } + + virtual void ClearAlert() { + alerted_.store(false, boost::memory_order_release); + } + + virtual void CheckAlert() const { + if (IsAlerted()) + { + //metaq::LOG_INFO("throw alert exception\r\n"); + throw AlertException(); + } + } + + private: + WaitStrategyInterface* wait_strategy_; + Sequence* cursor_; 
+ std::vector<Sequence*> dependent_sequences_; + boost::atomic<bool> alerted_; +}; + +}; // namespace rocketmq + +#endif // DISRUPTOR_SEQUENCE_BARRIER_H_ NOLINT