Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master 343ab198e -> 70ce5c770
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpRemotingClient.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/TcpRemotingClient.cpp b/rocketmq-cpp/src/transport/TcpRemotingClient.cpp new file mode 100755 index 0000000..f74d529 --- /dev/null +++ b/rocketmq-cpp/src/transport/TcpRemotingClient.cpp @@ -0,0 +1,728 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TcpRemotingClient.h" +#include <stddef.h> +#include <sys/prctl.h> +#include "Logging.h" +#include "MemoryOutputStream.h" +#include "TopAddressing.h" +#include "UtilAll.h" + +namespace rocketmq { + +//<!************************************************************************ +TcpRemotingClient::TcpRemotingClient(int pullThreadNum, + uint64_t tcpConnectTimeout, + uint64_t tcpTransportTryLockTimeout) + : m_pullThreadNum(pullThreadNum), + m_tcpConnectTimeout(tcpConnectTimeout), + m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout), + m_namesrvIndex(0), + m_ioServiceWork(m_ioService) { + string taskName = UtilAll::getProcessName(); + prctl(PR_SET_NAME, "networkTP", 0, 0, 0); + for (int i = 0; i != pullThreadNum; ++i) { + m_threadpool.create_thread( + boost::bind(&boost::asio::io_service::run, &m_ioService)); + } + prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0); + + LOG_INFO( + "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, " + "m_pullThreadNum:%d", + m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum); + m_async_service_thread.reset(new boost::thread( + boost::bind(&TcpRemotingClient::boost_asio_work, this))); +} + +void TcpRemotingClient::boost_asio_work() { + LOG_INFO("TcpRemotingClient::boost asio async service runing"); + boost::asio::io_service::work work(m_async_ioService); // avoid async io + // service stops after + // first timer timeout + // callback + m_async_ioService.run(); +} + +TcpRemotingClient::~TcpRemotingClient() { + m_tcpTable.clear(); + m_futureTable.clear(); + m_asyncFutureTable.clear(); + m_namesrvAddrList.clear(); + removeAllTimerCallback(); +} + +void TcpRemotingClient::stopAllTcpTransportThread() { + LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin"); + m_async_ioService.stop(); + m_async_service_thread->interrupt(); + m_async_service_thread->join(); + removeAllTimerCallback(); + + { + TcpMap::iterator it = m_tcpTable.begin(); + for (; it != m_tcpTable.end(); ++it) { + it->second->disconnect(it->first); + } + m_tcpTable.clear(); + } + + m_ioService.stop(); + m_threadpool.join_all(); + + { + boost::lock_guard<boost::mutex> lock(m_futureTableMutex); + for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end(); + ++it) { + if (it->second) it->second->releaseThreadCondition(); + } + } + LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End"); +} + +void TcpRemotingClient::updateNameServerAddressList(const string& addrs) { + if (!addrs.empty()) { + boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, + boost::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.timed_lock(boost::get_system_time() + + boost::posix_time::seconds(10))) { + LOG_ERROR("updateNameServerAddressList get timed_mutex timeout"); + return; + } + } + // clear first; + m_namesrvAddrList.clear(); + + vector<string> out; + UtilAll::Split(out, addrs, ";"); + for (size_t i = 0; i < out.size(); i++) { + string addr = out[i]; + UtilAll::Trim(addr); + + string hostName; + short portNumber; + if (UtilAll::SplitURL(addr, hostName, portNumber)) { + LOG_INFO("update Namesrv:%s", addr.c_str()); + m_namesrvAddrList.push_back(addr); + } + } + out.clear(); + } +} + +bool TcpRemotingClient::invokeHeartBeat(const string& addr, + RemotingCommand& request) { + boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); + if (pTcp != NULL) { + int code = request.getCode(); + int opaque = request.getOpaque(); + boost::shared_ptr<ResponseFuture> responseFuture( + new ResponseFuture(code, opaque, this, 3000, false, NULL)); + addResponseFuture(opaque, responseFuture); + // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d, + // timeoutms:%d", addr.c_str(), code, opaque, 3000); + + if (SendCommand(pTcp, request)) { + responseFuture->setSendRequestOK(true); + unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000)); + if (pRsp == NULL) { + LOG_ERROR( + "wait response timeout of heartbeat, so closeTransport of addr:%s", + addr.c_str()); + CloseTransport(addr, pTcp); + return false; + } else if (pRsp->getCode() == SUCCESS_VALUE) { + return true; + } else { + LOG_WARN("get error response:%d of heartbeat to addr:%s", + pRsp->getCode(), addr.c_str()); + return false; + } + } else { + CloseTransport(addr, pTcp); + } + } + return false; +} + +RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, + RemotingCommand& request, + int timeoutMillis /* = 3000 */) { + boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); + if (pTcp != NULL) { + int code = request.getCode(); + int opaque = request.getOpaque(); + boost::shared_ptr<ResponseFuture> responseFuture( + new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL)); + addResponseFuture(opaque, responseFuture); + + if (SendCommand(pTcp, request)) { + // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d, + // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis); + responseFuture->setSendRequestOK(true); + RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis); + if (pRsp == NULL) { + if (code != GET_CONSUMER_LIST_BY_GROUP) { + LOG_WARN( + "wait response timeout or get NULL response of code:%d, so " + "closeTransport of addr:%s", + code, addr.c_str()); + CloseTransport(addr, pTcp); + } + // avoid responseFuture leak; + findAndDeleteResponseFuture(opaque); + return NULL; + } else { + return pRsp; + } + } else { + // avoid responseFuture leak; + findAndDeleteResponseFuture(opaque); + CloseTransport(addr, pTcp); + } + } + return NULL; +} + +bool TcpRemotingClient::invokeAsync(const string& addr, + RemotingCommand& request, + AsyncCallbackWrap* cbw, + int64 timeoutMilliseconds) { + boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); + if (pTcp != NULL) { + //<!not delete, for callback to delete; + int code = request.getCode(); + int opaque = request.getOpaque(); + boost::shared_ptr<ResponseFuture> responseFuture( + new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw)); + addAsyncResponseFuture(opaque, responseFuture); + if (cbw) { + boost::asio::deadline_timer* t = new boost::asio::deadline_timer( + m_async_ioService, + boost::posix_time::milliseconds(timeoutMilliseconds)); + addTimerCallback(t, opaque); + boost::system::error_code e; + t->async_wait( + boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, + this, e, opaque)); + } + + if (SendCommand(pTcp, request)) // Even if send failed, asyncTimerThread + // will trigger next pull request or report + // send msg failed + { + LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", + addr.c_str(), code, opaque); + responseFuture->setSendRequestOK(true); + } + return true; + } + LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str()); + return false; +} + +void TcpRemotingClient::invokeOneway(const string& addr, + RemotingCommand& request) { + //<!not need callback; + boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true); + if (pTcp != NULL) { + request.markOnewayRPC(); + LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), + request.getCode()); + SendCommand(pTcp, request); + } +} + +boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport( + const string& addr, bool needRespons) { + if (addr.empty()) return CreateNameserverTransport(needRespons); + + return CreateTransport(addr, needRespons); +} + +boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport( + const string& addr, bool needRespons) { + boost::shared_ptr<TcpTransport> tts; + { + // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking + // long + // time, if could not get m_tcpLock, return NULL + bool bGetMutex = false; + boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.timed_lock( + boost::get_system_time() + + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str()); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else { + bGetMutex = true; + } + } else { + bGetMutex = true; + } + if (bGetMutex) { + if (m_tcpTable.find(addr) != m_tcpTable.end()) { + boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]); + boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock(); + if (tcp) { + tcpConnectStatus connectStatus = tcp->getTcpConnectStatus(); + if (connectStatus == e_connectWaitResponse) { + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else if (connectStatus == e_connectFail) { + LOG_ERROR("tcpTransport with server disconnected, erase server:%s", + addr.c_str()); + tcp->disconnect( + addr); // avoid coredump when connection with broker was broken + m_tcpTable.erase(addr); + } else if (connectStatus == e_connectSuccess) { + return tcp; + } else { + LOG_ERROR( + "go to fault state, erase:%s from tcpMap, and reconnect " + "it", + addr.c_str()); + m_tcpTable.erase(addr); + } + } + } + + //<!callback; + READ_CALLBACK callback = + needRespons ? &TcpRemotingClient::static_messageReceived : NULL; + + tts.reset(new TcpTransport(this, callback)); + tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout); + if (connectStatus != e_connectWaitResponse) { + LOG_WARN("can not connect to :%s", addr.c_str()); + tts->disconnect(addr); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else { + m_tcpTable[addr] = tts; // even if connecting failed finally, this + // server transport will be erased by next + // CreateTransport + } + } else { + LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str()); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } + } + + tcpConnectStatus connectStatus = + tts->waitTcpConnectEvent(m_tcpConnectTimeout); + if (connectStatus != e_connectSuccess) { + LOG_WARN("can not connect to server:%s", addr.c_str()); + tts->disconnect(addr); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else { + LOG_INFO("connect server with addr:%s success", addr.c_str()); + return tts; + } +} + +boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport( + bool needRespons) { + // m_namesrvLock was added to avoid operation of nameServer was blocked by + // m_tcpLock, it was used by single Thread mostly, so no performance impact + // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long + // time, if could not get m_namesrvlock, return NULL + bool bGetMutex = false; + boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, + boost::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.timed_lock( + boost::get_system_time() + + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else { + bGetMutex = true; + } + } else { + bGetMutex = true; + } + + if (bGetMutex) { + if (!m_namesrvAddrChoosed.empty()) { + boost::shared_ptr<TcpTransport> pTcp = + GetTransport(m_namesrvAddrChoosed, true); + if (pTcp) + return pTcp; + else + m_namesrvAddrChoosed.clear(); + } + + vector<string>::iterator itp = m_namesrvAddrList.begin(); + for (; itp != m_namesrvAddrList.end(); ++itp) { + unsigned int index = m_namesrvIndex % m_namesrvAddrList.size(); + if (m_namesrvIndex == numeric_limits<unsigned int>::max()) + m_namesrvIndex = 0; + m_namesrvIndex++; + LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:%zu", + m_namesrvIndex, index, m_namesrvAddrList.size()); + boost::shared_ptr<TcpTransport> pTcp = + GetTransport(m_namesrvAddrList[index], true); + if (pTcp) { + m_namesrvAddrChoosed = m_namesrvAddrList[index]; + return pTcp; + } + } + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } else { + LOG_WARN("get nameServer tcpTransport mutex failed"); + boost::shared_ptr<TcpTransport> pTcp; + return pTcp; + } +} + +void TcpRemotingClient::CloseTransport(const string& addr, + boost::shared_ptr<TcpTransport> pTcp) { + if (addr.empty()) { + return CloseNameServerTransport(pTcp); + } + + bool bGetMutex = false; + boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.timed_lock( + boost::get_system_time() + + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str()); + return; + } else { + bGetMutex = true; + } + } else { + bGetMutex = true; + } + LOG_ERROR("CloseTransport of:%s", addr.c_str()); + if (bGetMutex) { + bool removeItemFromTable = true; + if (m_tcpTable.find(addr) != m_tcpTable.end()) { + if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { + LOG_INFO( + "tcpTransport with addr:%s has been closed before, and has been " + "created again, nothing to do", + addr.c_str()); + removeItemFromTable = false; + } + } else { + LOG_INFO( + "tcpTransport with addr:%s had been removed from tcpTable before", + addr.c_str()); + removeItemFromTable = false; + } + + if (removeItemFromTable == true) { + LOG_WARN("closeTransport: disconnect broker:%s with state:%d", + addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus()); + if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess) + m_tcpTable[addr]->disconnect( + addr); // avoid coredump when connection with server was broken + LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); + m_tcpTable.erase(addr); + } + } else { + LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str()); + return; + } + LOG_ERROR("CloseTransport of:%s end", addr.c_str()); +} + +void TcpRemotingClient::CloseNameServerTransport( + boost::shared_ptr<TcpTransport> pTcp) { + bool bGetMutex = false; + boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, + boost::try_to_lock); + if (!lock.owns_lock()) { + if (!lock.timed_lock( + boost::get_system_time() + + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) { + LOG_ERROR("CreateNameserverTransport get timed_mutex timeout"); + return; + } else { + bGetMutex = true; + } + } else { + bGetMutex = true; + } + if (bGetMutex) { + string addr = m_namesrvAddrChoosed; + bool removeItemFromTable = true; + if (m_tcpTable.find(addr) != m_tcpTable.end()) { + if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) { + LOG_INFO( + "tcpTransport with addr:%s has been closed before, and has been " + "created again, nothing to do", + addr.c_str()); + removeItemFromTable = false; + } + } else { + LOG_INFO( + "tcpTransport with addr:%s had been removed from tcpTable before", + addr.c_str()); + removeItemFromTable = false; + } + + if (removeItemFromTable == true) { + m_tcpTable[addr]->disconnect( + addr); // avoid coredump when connection with server was broken + LOG_WARN("closeTransport: erase broker: %s", addr.c_str()); + m_tcpTable.erase(addr); + m_namesrvAddrChoosed.clear(); + } + } else { + LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", + m_namesrvAddrChoosed.c_str()); + return; + } +} + +bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, + RemotingCommand& msg) { + const MemoryBlock* phead = msg.GetHead(); + const MemoryBlock* pbody = msg.GetBody(); + + unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024)); + if (phead->getData()) { + result->write(phead->getData(), phead->getSize()); + } + if (pbody->getData()) { + result->write(pbody->getData(), pbody->getSize()); + } + const char* pData = static_cast<const char*>(result->getData()); + int len = result->getDataSize(); + return pTts->sendMessage(pData, len); +} + +void TcpRemotingClient::static_messageReceived(void* context, + const MemoryBlock& mem, + const string& addr) { + TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context; + if (pTcpRemotingClient) pTcpRemotingClient->messageReceived(mem, addr); +} + +void TcpRemotingClient::messageReceived(const MemoryBlock& mem, + const string& addr) { + m_ioService.post( + boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr)); +} + +void TcpRemotingClient::ProcessData(const MemoryBlock& mem, + const string& addr) { + RemotingCommand* pRespondCmd = NULL; + try { + pRespondCmd = RemotingCommand::Decode(mem); + } catch (...) { + LOG_ERROR("processData_error"); + return; + } + + int opaque = pRespondCmd->getOpaque(); + + //<!process self; + if (pRespondCmd->isResponseType()) { + boost::shared_ptr<ResponseFuture> pFuture( + findAndDeleteAsyncResponseFuture(opaque)); + if (!pFuture) { + pFuture = findAndDeleteResponseFuture(opaque); + if (pFuture) { + if (pFuture->getSyncResponseFlag()) { + LOG_WARN("waitResponse already timeout of opaque:%d", opaque); + deleteAndZero(pRespondCmd); + return; + } + LOG_DEBUG("find_response opaque:%d", opaque); + } else { + LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque); + deleteAndZero(pRespondCmd); + return; + } + } + processResponseCommand(pRespondCmd, pFuture); + } else { + processRequestCommand(pRespondCmd, addr); + } +} + +void TcpRemotingClient::processResponseCommand( + RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) { + int code = pfuture->getRequestCode(); + int opaque = pCmd->getOpaque(); + LOG_DEBUG("processResponseCommand, code:%d,opaque:%d", code, opaque); + pCmd->SetExtHeader(code); // set head , for response use + + pfuture->setResponse(pCmd); + + if (pfuture->getASyncFlag()) { + if (!pfuture->getAsyncResponseFlag()) { + pfuture->setAsyncResponseFlag(); + pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response); + pfuture->executeInvokeCallback(); + cancelTimerCallback(opaque); + } + } +} + +void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, + const string& addr) { + unique_ptr<RemotingCommand> pRequestCommand(pCmd); + int requestCode = pRequestCommand->getCode(); + if (m_requestTable.find(requestCode) == m_requestTable.end()) { + LOG_ERROR("can_not_find request:%d processor", requestCode); + } else { + unique_ptr<RemotingCommand> pResponse( + m_requestTable[requestCode]->processRequest(addr, + pRequestCommand.get())); + if (!pRequestCommand->isOnewayRPC()) { + if (pResponse) { + pResponse->setOpaque(pRequestCommand->getOpaque()); + pResponse->markResponseType(); + pResponse->Encode(); + + invokeOneway(addr, *pResponse); + } + } + } +} + +void TcpRemotingClient::addResponseFuture( + int opaque, boost::shared_ptr<ResponseFuture> pfuture) { + boost::lock_guard<boost::mutex> lock(m_futureTableMutex); + m_futureTable[opaque] = pfuture; +} + +// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will +// be erased, so caller must ensure the life cycle of returned shared_ptr; +boost::shared_ptr<ResponseFuture> +TcpRemotingClient::findAndDeleteResponseFuture(int opaque) { + boost::lock_guard<boost::mutex> lock(m_futureTableMutex); + boost::shared_ptr<ResponseFuture> pResponseFuture; + if (m_futureTable.find(opaque) != m_futureTable.end()) { + pResponseFuture = m_futureTable[opaque]; + m_futureTable.erase(opaque); + } + return pResponseFuture; +} + +void TcpRemotingClient::handleAsyncPullForResponseTimeout( + const boost::system::error_code& e, int opaque) { + if (e == boost::asio::error::operation_aborted) { + return; + } + + boost::shared_ptr<ResponseFuture> pFuture( + findAndDeleteAsyncResponseFuture(opaque)); + if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) { + if ((pFuture->getAsyncResponseFlag() != + true)) // if no response received, then check timeout or not + { + LOG_ERROR("no response got for opaque:%d", opaque); + pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout); + pFuture->executeInvokeCallbackException(); + } + } + + eraseTimerCallback(opaque); +} + +void TcpRemotingClient::addAsyncResponseFuture( + int opaque, boost::shared_ptr<ResponseFuture> pfuture) { + boost::lock_guard<boost::mutex> lock(m_asyncFutureLock); + m_asyncFutureTable[opaque] = pfuture; +} + +// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will +// be erased, so caller must ensure the life cycle of returned shared_ptr; +boost::shared_ptr<ResponseFuture> +TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) { + boost::lock_guard<boost::mutex> lock(m_asyncFutureLock); + boost::shared_ptr<ResponseFuture> pResponseFuture; + if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) { + pResponseFuture = m_asyncFutureTable[opaque]; + m_asyncFutureTable.erase(opaque); + } + + return pResponseFuture; +} + +void TcpRemotingClient::registerProcessor( + MQRequestCode requestCode, + ClientRemotingProcessor* clientRemotingProcessor) { + if (m_requestTable.find(requestCode) != m_requestTable.end()) + m_requestTable.erase(requestCode); + m_requestTable[requestCode] = clientRemotingProcessor; +} + +void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, + int opaque) { + boost::lock_guard<boost::mutex> lock(m_timerMapMutex); + if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + // AGENT_INFO("addTimerCallback:erase timerCallback opaque:%lld", opaque); + boost::asio::deadline_timer* old_t = m_async_timer_map[opaque]; + old_t->cancel(); + delete old_t; + old_t = NULL; + m_async_timer_map.erase(opaque); + } + m_async_timer_map[opaque] = t; +} + +void TcpRemotingClient::eraseTimerCallback(int opaque) { + boost::lock_guard<boost::mutex> lock(m_timerMapMutex); + if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + boost::asio::deadline_timer* t = m_async_timer_map[opaque]; + delete t; + t = NULL; + m_async_timer_map.erase(opaque); + } +} + +void TcpRemotingClient::cancelTimerCallback(int opaque) { + boost::lock_guard<boost::mutex> lock(m_timerMapMutex); + if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) { + // AGENT_INFO("cancel timerCallback opaque:%lld", opaque); + boost::asio::deadline_timer* t = m_async_timer_map[opaque]; + t->cancel(); + delete t; + t = NULL; + m_async_timer_map.erase(opaque); + } +} + +void TcpRemotingClient::removeAllTimerCallback() { + boost::lock_guard<boost::mutex> lock(m_timerMapMutex); + for (asyncTimerMap::iterator it = m_async_timer_map.begin(); + it != m_async_timer_map.end(); ++it) { + boost::asio::deadline_timer* t = it->second; + t->cancel(); + delete t; + t = NULL; + } + m_async_timer_map.clear(); +} + +//<!************************************************************************ +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpRemotingClient.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/TcpRemotingClient.h b/rocketmq-cpp/src/transport/TcpRemotingClient.h new file mode 100755 index 0000000..832b49a --- /dev/null +++ b/rocketmq-cpp/src/transport/TcpRemotingClient.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TCPREMOTINGCLIENT_H__ +#define __TCPREMOTINGCLIENT_H__ + +#include <boost/asio.hpp> +#include <boost/asio/io_service.hpp> +#include <boost/bind.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/weak_ptr.hpp> +#include <map> +#include "ClientRemotingProcessor.h" +#include "RemotingCommand.h" +#include "ResponseFuture.h" +#include "SocketUtil.h" +#include "TcpTransport.h" + +namespace rocketmq { +//<!************************************************************************ + +class TcpRemotingClient { + public: + TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, + uint64_t tcpTransportTryLockTimeout); + virtual ~TcpRemotingClient(); + void stopAllTcpTransportThread(); + void updateNameServerAddressList(const string& addrs); + + //<!delete outsite; + RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, + int timeoutMillis = 3000); + + bool invokeHeartBeat(const string& addr, RemotingCommand& request); + + bool invokeAsync(const string& addr, RemotingCommand& request, + AsyncCallbackWrap* cbw, int64 timeoutMilliseconds); + + void invokeOneway(const string& addr, RemotingCommand& request); + + void ProcessData(const MemoryBlock& mem, const string& addr); + + void registerProcessor(MQRequestCode requestCode, + ClientRemotingProcessor* clientRemotingProcessor); + + void boost_asio_work(); + void handleAsyncPullForResponseTimeout(const boost::system::error_code& e, + int opaque); + + private: + static void static_messageReceived(void* context, const MemoryBlock& mem, + const string& addr); + void messageReceived(const MemoryBlock& mem, const string& addr); + boost::shared_ptr<TcpTransport> GetTransport(const string& addr, + bool needRespons); + boost::shared_ptr<TcpTransport> CreateTransport(const string& addr, + bool needRespons); + boost::shared_ptr<TcpTransport> CreateNameserverTransport(bool needRespons); + void CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp); + void CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp); + bool SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg); + void processRequestCommand(RemotingCommand* pCmd, const string& addr); + void processResponseCommand(RemotingCommand* pCmd, + boost::shared_ptr<ResponseFuture> pfuture); + + void addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture); + boost::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque); + + void addAsyncResponseFuture(int opaque, + boost::shared_ptr<ResponseFuture> pfuture); + boost::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture( + int opaque); + + void addTimerCallback(boost::asio::deadline_timer* t, int opaque); + void eraseTimerCallback(int opaque); + void cancelTimerCallback(int opaque); + void removeAllTimerCallback(); + + private: + typedef map<string, boost::shared_ptr<TcpTransport>> TcpMap; + typedef map<int, boost::shared_ptr<ResponseFuture>> ResMap; + + typedef map<int, ClientRemotingProcessor*> RequestMap; + RequestMap m_requestTable; + + boost::mutex m_futureTableMutex; + ResMap m_futureTable; //<! id->future; + + ResMap m_asyncFutureTable; + boost::mutex m_asyncFutureLock; + + TcpMap m_tcpTable; //<! ip->tcp; + boost::timed_mutex m_tcpLock; + + // ThreadPool m_threadpool; + int m_pullThreadNum; + uint64_t m_tcpConnectTimeout; // ms + uint64_t m_tcpTransportTryLockTimeout; // s + + //<! Nameserver + boost::timed_mutex m_namesrvlock; + vector<string> m_namesrvAddrList; + string m_namesrvAddrChoosed; + unsigned int m_namesrvIndex; + boost::asio::io_service m_ioService; + boost::thread_group m_threadpool; + boost::asio::io_service::work m_ioServiceWork; + + boost::asio::io_service m_async_ioService; + unique_ptr<boost::thread> m_async_service_thread; + + typedef map<int, boost::asio::deadline_timer*> asyncTimerMap; + boost::mutex m_timerMapMutex; + asyncTimerMap m_async_timer_map; +}; + +//<!************************************************************************ +} //<!end namespace; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpTransport.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/TcpTransport.cpp b/rocketmq-cpp/src/transport/TcpTransport.cpp new file mode 100755 index 0000000..82e9526 --- /dev/null +++ b/rocketmq-cpp/src/transport/TcpTransport.cpp @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TcpTransport.h" +#include <arpa/inet.h> // for sockaddr_in and inet_ntoa... +#include <netinet/tcp.h> +#include <sys/socket.h> // for socket(), bind(), and connect()... +#include "Logging.h" +#include "TcpRemotingClient.h" +#include "UtilAll.h" + +namespace rocketmq { + +//<!************************************************************************ +TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemointClient, + READ_CALLBACK handle /* = NULL */) + : m_tcpConnectStatus(e_connectInit), + m_ReadDatathread(NULL), + m_readcallback(handle), + m_tcpRemotingClient(pTcpRemointClient) { + m_startTime = UtilAll::currentTimeMillis(); + evthread_use_pthreads(); + m_eventBase = NULL; + m_bufferEvent = NULL; +} +TcpTransport::~TcpTransport() { + m_readcallback = NULL; + m_bufferEvent = NULL; + m_eventBase = NULL; +} + +tcpConnectStatus TcpTransport::connect(const string &strServerURL, + int timeOutMillisecs /* = 3000 */) { + string hostName; + short portNumber; + if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) { + return e_connectFail; + } + + boost::lock_guard<boost::mutex> lock(m_socketLock); + + struct sockaddr_in sin; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = inet_addr(hostName.c_str()); + sin.sin_port = htons(portNumber); + + m_eventBase = event_base_new(); + m_bufferEvent = bufferevent_socket_new( + m_eventBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); + bufferevent_setcb(m_bufferEvent, readNextMessageIntCallback, NULL, eventcb, + this); + bufferevent_enable(m_bufferEvent, EV_READ | EV_WRITE); + bufferevent_setwatermark(m_bufferEvent, EV_READ, 4, 0); + + setTcpConnectStatus(e_connectWaitResponse); + if (bufferevent_socket_connect(m_bufferEvent, (struct sockaddr *)&sin, + sizeof(sin)) < 0) { + LOG_INFO("connect to fd:%d failed", bufferevent_getfd(m_bufferEvent)); + setTcpConnectStatus(e_connectFail); + freeBufferEvent(); + return e_connectFail; + } else { + int fd = bufferevent_getfd(m_bufferEvent); + LOG_INFO("try to connect to fd:%d, addr:%s", fd, (hostName.c_str())); + /*struct timeval timeout; + timeout.tv_sec = timeOutMillisecs/1000; + timeout.tv_usec = 0; + struct event* evtimeout = evtimer_new(m_eventBase, timeoutcb, this); + evtimer_add(evtimeout, &timeout);*/ + evthread_make_base_notifiable(m_eventBase); + m_ReadDatathread = + new boost::thread(boost::bind(&TcpTransport::runThread, this)); + return e_connectWaitResponse; + } +} + +void TcpTransport::setTcpConnectStatus(tcpConnectStatus connectStatus) { + m_tcpConnectStatus = connectStatus; +} + +tcpConnectStatus TcpTransport::getTcpConnectStatus() { + return m_tcpConnectStatus; +} + +tcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillisecs) { + boost::unique_lock<boost::mutex> lk(m_connectEventLock); + if (!m_connectEvent.timed_wait( + lk, boost::posix_time::milliseconds(timeoutMillisecs))) { + LOG_INFO("connect timeout"); + } + return getTcpConnectStatus(); +} + +void TcpTransport::setTcpConnectEvent(tcpConnectStatus connectStatus) { + tcpConnectStatus baseStatus(getTcpConnectStatus()); + setTcpConnectStatus(connectStatus); + if (baseStatus == e_connectWaitResponse) { + LOG_INFO("received libevent callback event"); + m_connectEvent.notify_all(); + } +} + +void TcpTransport::disconnect(const string &addr) { + boost::lock_guard<boost::mutex> lock(m_socketLock); + if (getTcpConnectStatus() != e_connectInit) { + clearBufferEventCallback(); + LOG_INFO("disconnect:%s start", addr.c_str()); + m_connectEvent.notify_all(); + setTcpConnectStatus(e_connectInit); + if (m_ReadDatathread) { + m_ReadDatathread->interrupt(); + exitBaseDispatch(); + while (m_ReadDatathread->timed_join(boost::posix_time::seconds(1)) == + false) { + LOG_WARN("join readDataThread fail, retry"); + m_ReadDatathread->interrupt(); + exitBaseDispatch(); + } + delete m_ReadDatathread; + m_ReadDatathread = NULL; + } + freeBufferEvent(); + LOG_INFO("disconnect:%s completely", addr.c_str()); + } +} + +void TcpTransport::clearBufferEventCallback() { + if (m_bufferEvent) { + // Bufferevents are internally reference-counted, so if the bufferevent has + // pending deferred callbacks when you free it, it won't be deleted until + // the callbacks are done. + // so just empty callback to avoid future callback by libevent + bufferevent_setcb(m_bufferEvent, NULL, NULL, NULL, NULL); + } +} + +void TcpTransport::freeBufferEvent() { + if (m_bufferEvent) { + bufferevent_free(m_bufferEvent); + } + if (m_eventBase) { + event_base_free(m_eventBase); + } +} +void TcpTransport::exitBaseDispatch() { + if (m_eventBase) { + event_base_loopbreak(m_eventBase); + // event_base_loopexit(m_eventBase, NULL); //Note: memory leak will be + // occured when timer callback was not done; + } +} + +void TcpTransport::runThread() { + while (m_ReadDatathread) { + if (m_eventBase != NULL) { + event_base_dispatch(m_eventBase); + // event_base_loop(m_eventBase, EVLOOP_ONCE);//EVLOOP_NONBLOCK should not + // be used, as could not callback event immediatly + } + LOG_INFO("event_base_dispatch exit once"); + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + if (getTcpConnectStatus() != e_connectSuccess) return; + } +} + +void TcpTransport::timeoutcb(evutil_socket_t fd, short what, void *arg) { + LOG_INFO("timeoutcb: received event:%d on fd:%d", what, fd); + TcpTransport *tcpTrans = (TcpTransport *)arg; + if (tcpTrans->getTcpConnectStatus() != e_connectSuccess) { + LOG_INFO("timeoutcb: after connect time, tcp was not established on fd:%d", + fd); + tcpTrans->setTcpConnectStatus(e_connectFail); + } else { + LOG_INFO("timeoutcb: after connect time, tcp was established on fd:%d", fd); + } +} + +void TcpTransport::eventcb(struct bufferevent *bev, short what, void *ctx) { + evutil_socket_t fd = bufferevent_getfd(bev); + TcpTransport *tcpTrans = (TcpTransport *)ctx; + LOG_INFO("eventcb: received event:%x on fd:%d", what, fd); + if (what & BEV_EVENT_CONNECTED) { + int val = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, sizeof(val)); + LOG_INFO("eventcb:connect to fd:%d successfully", fd); + tcpTrans->setTcpConnectEvent(e_connectSuccess); + } else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING | + BEV_EVENT_WRITING)) { + LOG_INFO("eventcb:rcv error event cb:%x on fd:%d", what, fd); + tcpTrans->setTcpConnectEvent(e_connectFail); + bufferevent_setcb(bev, NULL, NULL, NULL, NULL); + // bufferevent_disable(bev, EV_READ|EV_WRITE); + // bufferevent_free(bev); + } else { + LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd); + } +} + +void TcpTransport::readNextMessageIntCallback(struct bufferevent *bev, + void *ctx) { + /* This callback is invoked when there is data to read on bev. */ + + // protocol: <length> <header length> <header data> <body data> + // 1 2 3 4 + // rocketmq protocol contains 4 parts as following: + // 1��big endian 4 bytes int, its length is sum of 2,3 and 4 + // 2��big endian 4 bytes int, its length is 3 + // 3��use json to serialization data + // 4��application could self-defination binary data + + struct evbuffer *input = bufferevent_get_input(bev); + while (1) { + struct evbuffer_iovec v[4]; + int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0])); + + int idx = 0; + char hdr[4]; + char *p = hdr; + unsigned int needed = 4; + + for (idx = 0; idx < n; idx++) { + if (needed) { + unsigned int tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len; + memcpy(p, v[idx].iov_base, tmp); + p += tmp; + needed -= tmp; + } else { + break; + } + } + + if (needed) { + LOG_DEBUG(" too little data received with sum = %d ", 4 - needed); + return; + } + uint32 totalLenOfOneMsg = + *(uint32 *)hdr; // first 4 bytes, which indicates 1st part of protocol + uint32 bytesInMessage = ntohl(totalLenOfOneMsg); + LOG_DEBUG("fd:%d, totalLen:%zu, bytesInMessage:%d", bufferevent_getfd(bev), + v[0].iov_len, bytesInMessage); + + uint32 len = evbuffer_get_length(input); + if (len >= bytesInMessage + 4) { + LOG_DEBUG("had received all data with len:%d from fd:%d", len, + bufferevent_getfd(bev)); + } else { + LOG_DEBUG( + "didn't received whole bytesInMessage:%d, from fd:%d, totalLen:%d", + bytesInMessage, bufferevent_getfd(bev), len); + return; // consider large data which was not received completely by now + } + + if (bytesInMessage > 0) { + MemoryBlock messageData(bytesInMessage, true); + uint32 bytesRead = 0; + void *data = (void *)(messageData.getData() + bytesRead); + bufferevent_read(bev, data, 4); + bytesRead = bufferevent_read(bev, data, bytesInMessage); + + TcpTransport *tcpTrans = (TcpTransport *)ctx; + tcpTrans->messageReceived(messageData); + } + } +} + +bool TcpTransport::sendMessage(const char *pData, int len) { + boost::lock_guard<boost::mutex> lock(m_socketLock); + if (getTcpConnectStatus() != e_connectSuccess) { + return false; + } + + int bytes_left = len; + int bytes_written = 0; + const char *ptr = pData; + + /*NOTE: + 1. do not need to consider large data which could not send by once, as + bufferevent could handle this case; + */ + if (m_bufferEvent) { + bytes_written = bufferevent_write(m_bufferEvent, ptr, bytes_left); + if (bytes_written == 0) + return true; + else + return false; + } + return false; +} + +void TcpTransport::messageReceived(const MemoryBlock &mem) { + if (m_readcallback) { + m_readcallback(m_tcpRemotingClient, mem, getPeerAddrAndPort()); + } +} + +const string TcpTransport::getPeerAddrAndPort() { + struct sockaddr_in broker; + socklen_t cLen = sizeof(broker); + + // getsockname(m_socket->getRawSocketHandle(), (struct sockaddr*) &s, &sLen); + // // ! use connectSock here. + getpeername(bufferevent_getfd(m_bufferEvent), (struct sockaddr *)&broker, + &cLen); // ! use connectSock here. + LOG_DEBUG("broker addr: %s, broker port: %d", inet_ntoa(broker.sin_addr), + ntohs(broker.sin_port)); + string brokerAddr(inet_ntoa(broker.sin_addr)); + brokerAddr.append(":"); + string brokerPort(UtilAll::to_string(ntohs(broker.sin_port))); + brokerAddr.append(brokerPort); + LOG_DEBUG("brokerAddr:%s", brokerAddr.c_str()); + return brokerAddr; +} + +const uint64_t TcpTransport::getStartTime() const { return m_startTime; } + +} //<!end namespace; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpTransport.h ---------------------------------------------------------------------- diff --git a/rocketmq-cpp/src/transport/TcpTransport.h b/rocketmq-cpp/src/transport/TcpTransport.h new file mode 100755 index 0000000..7ba32dc --- /dev/null +++ b/rocketmq-cpp/src/transport/TcpTransport.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TCPTRANSPORT_H__ +#define __TCPTRANSPORT_H__ + +#include <boost/atomic.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include "dataBlock.h" + +extern "C" { +#include "event2/buffer.h" +#include "event2/bufferevent.h" +#include "event2/event.h" +#include "event2/thread.h" +} + +namespace rocketmq { +//<!*************************************************************************** +typedef enum { + e_connectInit = 0, + e_connectWaitResponse = 1, + e_connectSuccess = 2, + e_connectFail = 3 +} tcpConnectStatus; + +typedef void (*READ_CALLBACK)(void *context, const MemoryBlock &, + const std::string &); +class TcpRemotingClient; +class TcpTransport { + public: + TcpTransport(TcpRemotingClient *pTcpRemointClient, + READ_CALLBACK handle = NULL); + virtual ~TcpTransport(); + + tcpConnectStatus connect(const std::string &strServerURL, + int timeOutMillisecs = 3000); + void disconnect(const std::string &addr); + tcpConnectStatus waitTcpConnectEvent(int timeoutMillisecs = 3000); + void setTcpConnectStatus(tcpConnectStatus connectStatus); + tcpConnectStatus getTcpConnectStatus(); + bool sendMessage(const char *pData, int len); + const std::string getPeerAddrAndPort(); + const uint64_t getStartTime() const; + + private: + void messageReceived(const MemoryBlock &mem); + static void readNextMessageIntCallback(struct bufferevent *bev, void *ctx); + static void eventcb(struct bufferevent *bev, short what, void *ctx); + static void timeoutcb(evutil_socket_t fd, short what, void *arg); + void runThread(); + void clearBufferEventCallback(); + void freeBufferEvent(); + void exitBaseDispatch(); + void setTcpConnectEvent(tcpConnectStatus connectStatus); + + private: + uint64_t m_startTime; + boost::mutex m_socketLock; + struct event_base *m_eventBase; + struct bufferevent *m_bufferEvent; + boost::atomic<tcpConnectStatus> m_tcpConnectStatus; + boost::mutex m_connectEventLock; + boost::condition_variable_any m_connectEvent; + //<!read data thread + boost::thread *m_ReadDatathread; + + //<! read data callback + READ_CALLBACK m_readcallback; + TcpRemotingClient *m_tcpRemotingClient; +}; + +//<!************************************************************************ +} //<!end namespace; + +#endif