Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master 343ab198e -> 70ce5c770


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpRemotingClient.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/TcpRemotingClient.cpp b/rocketmq-cpp/src/transport/TcpRemotingClient.cpp
new file mode 100755
index 0000000..f74d529
--- /dev/null
+++ b/rocketmq-cpp/src/transport/TcpRemotingClient.cpp
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TcpRemotingClient.h"
+#include <stddef.h>
+#include <sys/prctl.h>
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "TopAddressing.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+TcpRemotingClient::TcpRemotingClient(int pullThreadNum,
+                                     uint64_t tcpConnectTimeout,
+                                     uint64_t tcpTransportTryLockTimeout)
+    : m_pullThreadNum(pullThreadNum),
+      m_tcpConnectTimeout(tcpConnectTimeout),
+      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
+      m_namesrvIndex(0),
+      m_ioServiceWork(m_ioService) {
+  string taskName = UtilAll::getProcessName();
+  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
+  for (int i = 0; i != pullThreadNum; ++i) {
+    m_threadpool.create_thread(
+        boost::bind(&boost::asio::io_service::run, &m_ioService));
+  }
+  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+
+  LOG_INFO(
+      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
+      "m_pullThreadNum:%d",
+      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+  m_async_service_thread.reset(new boost::thread(
+      boost::bind(&TcpRemotingClient::boost_asio_work, this)));
+}
+
+void TcpRemotingClient::boost_asio_work() {
+  LOG_INFO("TcpRemotingClient::boost asio async service runing");
+  boost::asio::io_service::work work(
+      m_async_ioService);  // keep the async io_service running after the
+                           // first timer timeout callback completes
+  m_async_ioService.run();
+}
+
+TcpRemotingClient::~TcpRemotingClient() {
+  m_tcpTable.clear();
+  m_futureTable.clear();
+  m_asyncFutureTable.clear();
+  m_namesrvAddrList.clear();
+  removeAllTimerCallback();
+}
+
+void TcpRemotingClient::stopAllTcpTransportThread() {
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+  m_async_ioService.stop();
+  m_async_service_thread->interrupt();
+  m_async_service_thread->join();
+  removeAllTimerCallback();
+
+  {
+    TcpMap::iterator it = m_tcpTable.begin();
+    for (; it != m_tcpTable.end(); ++it) {
+      it->second->disconnect(it->first);
+    }
+    m_tcpTable.clear();
+  }
+
+  m_ioService.stop();
+  m_threadpool.join_all();
+
+  {
+    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+    for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end();
+         ++it) {
+      if (it->second) it->second->releaseThreadCondition();
+    }
+  }
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
+}
+
+void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
+  if (!addrs.empty()) {
+    boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
+                                                boost::try_to_lock);
+    if (!lock.owns_lock()) {
+      if (!lock.timed_lock(boost::get_system_time() +
+                           boost::posix_time::seconds(10))) {
+        LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+        return;
+      }
+    }
+    // clear first;
+    m_namesrvAddrList.clear();
+
+    vector<string> out;
+    UtilAll::Split(out, addrs, ";");
+    for (size_t i = 0; i < out.size(); i++) {
+      string addr = out[i];
+      UtilAll::Trim(addr);
+
+      string hostName;
+      short portNumber;
+      if (UtilAll::SplitURL(addr, hostName, portNumber)) {
+        LOG_INFO("update Namesrv:%s", addr.c_str());
+        m_namesrvAddrList.push_back(addr);
+      }
+    }
+    out.clear();
+  }
+}
+
+bool TcpRemotingClient::invokeHeartBeat(const string& addr,
+                                        RemotingCommand& request) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, 3000, false, NULL));
+    addResponseFuture(opaque, responseFuture);
+    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
+    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
+
+    if (SendCommand(pTcp, request)) {
+      responseFuture->setSendRequestOK(true);
+      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
+      if (pRsp == NULL) {
+        LOG_ERROR(
+            "wait response timeout of heartbeat, so closeTransport of addr:%s",
+            addr.c_str());
+        CloseTransport(addr, pTcp);
+        return false;
+      } else if (pRsp->getCode() == SUCCESS_VALUE) {
+        return true;
+      } else {
+        LOG_WARN("get error response:%d of heartbeat to addr:%s",
+                 pRsp->getCode(), addr.c_str());
+        return false;
+      }
+    } else {
+      CloseTransport(addr, pTcp);
+    }
+  }
+  return false;
+}
+
+RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
+                                               RemotingCommand& request,
+                                               int timeoutMillis /* = 3000 */) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
+    addResponseFuture(opaque, responseFuture);
+
+    if (SendCommand(pTcp, request)) {
+      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
+      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
+      responseFuture->setSendRequestOK(true);
+      RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
+      if (pRsp == NULL) {
+        if (code != GET_CONSUMER_LIST_BY_GROUP) {
+          LOG_WARN(
+              "wait response timeout or get NULL response of code:%d, so "
+              "closeTransport of addr:%s",
+              code, addr.c_str());
+          CloseTransport(addr, pTcp);
+        }
+        // avoid responseFuture leak;
+        findAndDeleteResponseFuture(opaque);
+        return NULL;
+      } else {
+        return pRsp;
+      }
+    } else {
+      // avoid responseFuture leak;
+      findAndDeleteResponseFuture(opaque);
+      CloseTransport(addr, pTcp);
+    }
+  }
+  return NULL;
+}
+
+bool TcpRemotingClient::invokeAsync(const string& addr,
+                                    RemotingCommand& request,
+                                    AsyncCallbackWrap* cbw,
+                                    int64 timeoutMilliseconds) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    //<!not delete, for callback to delete;
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+    addAsyncResponseFuture(opaque, responseFuture);
+    if (cbw) {
+      boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
+          m_async_ioService,
+          boost::posix_time::milliseconds(timeoutMilliseconds));
+      addTimerCallback(t, opaque);
+      boost::system::error_code e;
+      t->async_wait(
+          boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout,
+                      this, e, opaque));
+    }
+
+    if (SendCommand(pTcp, request))  // Even if the send fails, asyncTimerThread
+                                     // will trigger the next pull request or
+                                     // report the send failure
+    {
+      LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d",
+                addr.c_str(), code, opaque);
+      responseFuture->setSendRequestOK(true);
+    }
+    return true;
+  }
+  LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
+  return false;
+}
+
+void TcpRemotingClient::invokeOneway(const string& addr,
+                                     RemotingCommand& request) {
+  //<!not need callback;
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    request.markOnewayRPC();
+    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(),
+              request.getCode());
+    SendCommand(pTcp, request);
+  }
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(
+    const string& addr, bool needRespons) {
+  if (addr.empty()) return CreateNameserverTransport(needRespons);
+
+  return CreateTransport(addr, needRespons);
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(
+    const string& addr, bool needRespons) {
+  boost::shared_ptr<TcpTransport> tts;
+  {
+    // Try to acquire m_tcpLock within m_tcpTransportTryLockTimeout to avoid
+    // blocking for a long time; if the lock cannot be acquired, return NULL.
+    bool bGetMutex = false;
+    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+    if (!lock.owns_lock()) {
+      if (!lock.timed_lock(
+              boost::get_system_time() +
+              boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
+        boost::shared_ptr<TcpTransport> pTcp;
+        return pTcp;
+      } else {
+        bGetMutex = true;
+      }
+    } else {
+      bGetMutex = true;
+    }
+    if (bGetMutex) {
+      if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+        boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
+        boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
+        if (tcp) {
+          tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
+          if (connectStatus == e_connectWaitResponse) {
+            boost::shared_ptr<TcpTransport> pTcp;
+            return pTcp;
+          } else if (connectStatus == e_connectFail) {
+            LOG_ERROR("tcpTransport with server disconnected, erase server:%s",
+                      addr.c_str());
+            tcp->disconnect(
+                addr);  // avoid a core dump when the connection to the broker was broken
+            m_tcpTable.erase(addr);
+          } else if (connectStatus == e_connectSuccess) {
+            return tcp;
+          } else {
+            LOG_ERROR(
+                "go to fault state, erase:%s from tcpMap, and reconnect "
+                "it",
+                addr.c_str());
+            m_tcpTable.erase(addr);
+          }
+        }
+      }
+
+      //<!callback;
+      READ_CALLBACK callback =
+          needRespons ? &TcpRemotingClient::static_messageReceived : NULL;
+
+      tts.reset(new TcpTransport(this, callback));
+      tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
+      if (connectStatus != e_connectWaitResponse) {
+        LOG_WARN("can not connect to :%s", addr.c_str());
+        tts->disconnect(addr);
+        boost::shared_ptr<TcpTransport> pTcp;
+        return pTcp;
+      } else {
+        m_tcpTable[addr] = tts;  // even if the connect finally fails, this
+                                 // server transport will be erased by the
+                                 // next CreateTransport
+      }
+    } else {
+      LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    }
+  }
+
+  tcpConnectStatus connectStatus =
+      tts->waitTcpConnectEvent(m_tcpConnectTimeout);
+  if (connectStatus != e_connectSuccess) {
+    LOG_WARN("can not connect to server:%s", addr.c_str());
+    tts->disconnect(addr);
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  } else {
+    LOG_INFO("connect server with addr:%s success", addr.c_str());
+    return tts;
+  }
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(
+    bool needRespons) {
+  // m_namesrvlock was added so that name server operations are not blocked by
+  // m_tcpLock; it is mostly used by a single thread, so there is no
+  // performance impact. Try to acquire it within m_tcpTransportTryLockTimeout
+  // to avoid blocking for a long time; if it cannot be acquired, return NULL.
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
+                                              boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(
+            boost::get_system_time() +
+            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+
+  if (bGetMutex) {
+    if (!m_namesrvAddrChoosed.empty()) {
+      boost::shared_ptr<TcpTransport> pTcp =
+          GetTransport(m_namesrvAddrChoosed, true);
+      if (pTcp)
+        return pTcp;
+      else
+        m_namesrvAddrChoosed.clear();
+    }
+
+    vector<string>::iterator itp = m_namesrvAddrList.begin();
+    for (; itp != m_namesrvAddrList.end(); ++itp) {
+      unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
+      if (m_namesrvIndex == numeric_limits<unsigned int>::max())
+        m_namesrvIndex = 0;
+      m_namesrvIndex++;
+      LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:%zu",
+               m_namesrvIndex, index, m_namesrvAddrList.size());
+      boost::shared_ptr<TcpTransport> pTcp =
+          GetTransport(m_namesrvAddrList[index], true);
+      if (pTcp) {
+        m_namesrvAddrChoosed = m_namesrvAddrList[index];
+        return pTcp;
+      }
+    }
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  } else {
+    LOG_WARN("get nameServer tcpTransport mutex failed");
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  }
+}
+
+void TcpRemotingClient::CloseTransport(const string& addr,
+                                       boost::shared_ptr<TcpTransport> pTcp) {
+  if (addr.empty()) {
+    return CloseNameServerTransport(pTcp);
+  }
+
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(
+            boost::get_system_time() +
+            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
+      return;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+  LOG_ERROR("CloseTransport of:%s", addr.c_str());
+  if (bGetMutex) {
+    bool removeItemFromTable = true;
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+        LOG_INFO(
+            "tcpTransport with addr:%s has been closed before, and has been "
+            "created again, nothing to do",
+            addr.c_str());
+        removeItemFromTable = false;
+      }
+    } else {
+      LOG_INFO(
+          "tcpTransport with addr:%s had been removed from tcpTable before",
+          addr.c_str());
+      removeItemFromTable = false;
+    }
+
+    if (removeItemFromTable == true) {
+      LOG_WARN("closeTransport: disconnect broker:%s with state:%d",
+               addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus());
+      if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
+        m_tcpTable[addr]->disconnect(
+            addr);  // avoid coredump when connection with server was broken
+      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+      m_tcpTable.erase(addr);
+    }
+  } else {
+    LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
+    return;
+  }
+  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+}
+
+void TcpRemotingClient::CloseNameServerTransport(
+    boost::shared_ptr<TcpTransport> pTcp) {
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
+                                              boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(
+            boost::get_system_time() +
+            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+      return;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+  if (bGetMutex) {
+    string addr = m_namesrvAddrChoosed;
+    bool removeItemFromTable = true;
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+        LOG_INFO(
+            "tcpTransport with addr:%s has been closed before, and has been "
+            "created again, nothing to do",
+            addr.c_str());
+        removeItemFromTable = false;
+      }
+    } else {
+      LOG_INFO(
+          "tcpTransport with addr:%s had been removed from tcpTable before",
+          addr.c_str());
+      removeItemFromTable = false;
+    }
+
+    if (removeItemFromTable == true) {
+      m_tcpTable[addr]->disconnect(
+          addr);  // avoid coredump when connection with server was broken
+      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+      m_tcpTable.erase(addr);
+      m_namesrvAddrChoosed.clear();
+    }
+  } else {
+    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s",
+             m_namesrvAddrChoosed.c_str());
+    return;
+  }
+}
+
+bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts,
+                                    RemotingCommand& msg) {
+  const MemoryBlock* phead = msg.GetHead();
+  const MemoryBlock* pbody = msg.GetBody();
+
+  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
+  if (phead->getData()) {
+    result->write(phead->getData(), phead->getSize());
+  }
+  if (pbody->getData()) {
+    result->write(pbody->getData(), pbody->getSize());
+  }
+  const char* pData = static_cast<const char*>(result->getData());
+  int len = result->getDataSize();
+  return pTts->sendMessage(pData, len);
+}
+
+void TcpRemotingClient::static_messageReceived(void* context,
+                                               const MemoryBlock& mem,
+                                               const string& addr) {
+  TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
+  if (pTcpRemotingClient) pTcpRemotingClient->messageReceived(mem, addr);
+}
+
+void TcpRemotingClient::messageReceived(const MemoryBlock& mem,
+                                        const string& addr) {
+  m_ioService.post(
+      boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
+}
+
+void TcpRemotingClient::ProcessData(const MemoryBlock& mem,
+                                    const string& addr) {
+  RemotingCommand* pRespondCmd = NULL;
+  try {
+    pRespondCmd = RemotingCommand::Decode(mem);
+  } catch (...) {
+    LOG_ERROR("processData_error");
+    return;
+  }
+
+  int opaque = pRespondCmd->getOpaque();
+
+  //<!process self;
+  if (pRespondCmd->isResponseType()) {
+    boost::shared_ptr<ResponseFuture> pFuture(
+        findAndDeleteAsyncResponseFuture(opaque));
+    if (!pFuture) {
+      pFuture = findAndDeleteResponseFuture(opaque);
+      if (pFuture) {
+        if (pFuture->getSyncResponseFlag()) {
+          LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
+          deleteAndZero(pRespondCmd);
+          return;
+        }
+        LOG_DEBUG("find_response opaque:%d", opaque);
+      } else {
+        LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", 
opaque);
+        deleteAndZero(pRespondCmd);
+        return;
+      }
+    }
+    processResponseCommand(pRespondCmd, pFuture);
+  } else {
+    processRequestCommand(pRespondCmd, addr);
+  }
+}
+
+void TcpRemotingClient::processResponseCommand(
+    RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
+  int code = pfuture->getRequestCode();
+  int opaque = pCmd->getOpaque();
+  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d", code, opaque);
+  pCmd->SetExtHeader(code);  // set the ext header, for response use
+
+  pfuture->setResponse(pCmd);
+
+  if (pfuture->getASyncFlag()) {
+    if (!pfuture->getAsyncResponseFlag()) {
+      pfuture->setAsyncResponseFlag();
+      pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
+      pfuture->executeInvokeCallback();
+      cancelTimerCallback(opaque);
+    }
+  }
+}
+
+void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd,
+                                              const string& addr) {
+  unique_ptr<RemotingCommand> pRequestCommand(pCmd);
+  int requestCode = pRequestCommand->getCode();
+  if (m_requestTable.find(requestCode) == m_requestTable.end()) {
+    LOG_ERROR("can_not_find request:%d processor", requestCode);
+  } else {
+    unique_ptr<RemotingCommand> pResponse(
+        m_requestTable[requestCode]->processRequest(addr,
+                                                    pRequestCommand.get()));
+    if (!pRequestCommand->isOnewayRPC()) {
+      if (pResponse) {
+        pResponse->setOpaque(pRequestCommand->getOpaque());
+        pResponse->markResponseType();
+        pResponse->Encode();
+
+        invokeOneway(addr, *pResponse);
+      }
+    }
+  }
+}
+
+void TcpRemotingClient::addResponseFuture(
+    int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+  m_futureTable[opaque] = pfuture;
+}
+
+// Note: after calling this function, the shared_ptr stored in
+// m_futureTable[opaque] is erased, so the caller must manage the life cycle
+// of the returned shared_ptr;
+boost::shared_ptr<ResponseFuture>
+TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+  boost::shared_ptr<ResponseFuture> pResponseFuture;
+  if (m_futureTable.find(opaque) != m_futureTable.end()) {
+    pResponseFuture = m_futureTable[opaque];
+    m_futureTable.erase(opaque);
+  }
+  return pResponseFuture;
+}
+
+void TcpRemotingClient::handleAsyncPullForResponseTimeout(
+    const boost::system::error_code& e, int opaque) {
+  if (e == boost::asio::error::operation_aborted) {
+    return;
+  }
+
+  boost::shared_ptr<ResponseFuture> pFuture(
+      findAndDeleteAsyncResponseFuture(opaque));
+  if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
+    if (!pFuture->getAsyncResponseFlag())  // no response received yet, so
+                                           // the request has timed out
+    {
+      LOG_ERROR("no response got for opaque:%d", opaque);
+      pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
+      pFuture->executeInvokeCallbackException();
+    }
+  }
+
+  eraseTimerCallback(opaque);
+}
+
+void TcpRemotingClient::addAsyncResponseFuture(
+    int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
+  m_asyncFutureTable[opaque] = pfuture;
+}
+
+// Note: after calling this function, the shared_ptr stored in
+// m_asyncFutureTable[opaque] is erased, so the caller must manage the life
+// cycle of the returned shared_ptr;
+boost::shared_ptr<ResponseFuture>
+TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
+  boost::shared_ptr<ResponseFuture> pResponseFuture;
+  if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
+    pResponseFuture = m_asyncFutureTable[opaque];
+    m_asyncFutureTable.erase(opaque);
+  }
+
+  return pResponseFuture;
+}
+
+void TcpRemotingClient::registerProcessor(
+    MQRequestCode requestCode,
+    ClientRemotingProcessor* clientRemotingProcessor) {
+  if (m_requestTable.find(requestCode) != m_requestTable.end())
+    m_requestTable.erase(requestCode);
+  m_requestTable[requestCode] = clientRemotingProcessor;
+}
+
+void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
+                                         int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+    // AGENT_INFO("addTimerCallback:erase timerCallback opaque:%lld", opaque);
+    boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
+    old_t->cancel();
+    delete old_t;
+    old_t = NULL;
+    m_async_timer_map.erase(opaque);
+  }
+  m_async_timer_map[opaque] = t;
+}
+
+void TcpRemotingClient::eraseTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
+    delete t;
+    t = NULL;
+    m_async_timer_map.erase(opaque);
+  }
+}
+
+void TcpRemotingClient::cancelTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+    // AGENT_INFO("cancel timerCallback opaque:%lld", opaque);
+    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
+    t->cancel();
+    delete t;
+    t = NULL;
+    m_async_timer_map.erase(opaque);
+  }
+}
+
+void TcpRemotingClient::removeAllTimerCallback() {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  for (asyncTimerMap::iterator it = m_async_timer_map.begin();
+       it != m_async_timer_map.end(); ++it) {
+    boost::asio::deadline_timer* t = it->second;
+    t->cancel();
+    delete t;
+    t = NULL;
+  }
+  m_async_timer_map.clear();
+}
+
+//<!************************************************************************
+}  //<!end namespace;
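
The sync and async invoke paths above correlate requests with responses
through the command's opaque id: the sender registers a ResponseFuture keyed
by opaque, and the IO thread that decodes a response completes the future
registered under the same opaque. A minimal, self-contained sketch of that
pattern, using std:: primitives instead of boost:: and illustrative names
(Response, Future and FutureTable are not from the source):

    #include <chrono>
    #include <condition_variable>
    #include <map>
    #include <memory>
    #include <mutex>

    struct Response { int code; };  // stand-in for RemotingCommand

    class Future {
     public:
      // called by the IO thread when the matching response arrives
      void complete(const Response& r) {
        std::lock_guard<std::mutex> lk(m_);
        resp_.reset(new Response(r));
        cv_.notify_all();
      }
      // called by the sender; returns nullptr on timeout
      std::unique_ptr<Response> wait(int timeoutMillis) {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait_for(lk, std::chrono::milliseconds(timeoutMillis),
                     [this] { return resp_ != nullptr; });
        return std::move(resp_);
      }

     private:
      std::mutex m_;
      std::condition_variable cv_;
      std::unique_ptr<Response> resp_;
    };

    class FutureTable {
     public:
      void add(int opaque, std::shared_ptr<Future> f) {
        std::lock_guard<std::mutex> lk(m_);
        table_[opaque] = f;
      }
      // erase-and-return, mirroring findAndDeleteResponseFuture: whichever
      // of the timeout path and the response path gets here first owns the
      // future, so it is completed (or abandoned) exactly once
      std::shared_ptr<Future> findAndDelete(int opaque) {
        std::lock_guard<std::mutex> lk(m_);
        auto it = table_.find(opaque);
        if (it == table_.end()) return nullptr;
        auto f = it->second;
        table_.erase(it);
        return f;
      }

     private:
      std::mutex m_;
      std::map<int, std::shared_ptr<Future>> table_;
    };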

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpRemotingClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/TcpRemotingClient.h b/rocketmq-cpp/src/transport/TcpRemotingClient.h
new file mode 100755
index 0000000..832b49a
--- /dev/null
+++ b/rocketmq-cpp/src/transport/TcpRemotingClient.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TCPREMOTINGCLIENT_H__
+#define __TCPREMOTINGCLIENT_H__
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/weak_ptr.hpp>
+#include <map>
+#include "ClientRemotingProcessor.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "SocketUtil.h"
+#include "TcpTransport.h"
+
+namespace rocketmq {
+//<!************************************************************************
+
+class TcpRemotingClient {
+ public:
+  TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout,
+                    uint64_t tcpTransportTryLockTimeout);
+  virtual ~TcpRemotingClient();
+  void stopAllTcpTransportThread();
+  void updateNameServerAddressList(const string& addrs);
+
+  //<!delete outsite;
+  RemotingCommand* invokeSync(const string& addr, RemotingCommand& request,
+                              int timeoutMillis = 3000);
+
+  bool invokeHeartBeat(const string& addr, RemotingCommand& request);
+
+  bool invokeAsync(const string& addr, RemotingCommand& request,
+                   AsyncCallbackWrap* cbw, int64 timeoutMilliseconds);
+
+  void invokeOneway(const string& addr, RemotingCommand& request);
+
+  void ProcessData(const MemoryBlock& mem, const string& addr);
+
+  void registerProcessor(MQRequestCode requestCode,
+                         ClientRemotingProcessor* clientRemotingProcessor);
+
+  void boost_asio_work();
+  void handleAsyncPullForResponseTimeout(const boost::system::error_code& e,
+                                         int opaque);
+
+ private:
+  static void static_messageReceived(void* context, const MemoryBlock& mem,
+                                     const string& addr);
+  void messageReceived(const MemoryBlock& mem, const string& addr);
+  boost::shared_ptr<TcpTransport> GetTransport(const string& addr,
+                                               bool needRespons);
+  boost::shared_ptr<TcpTransport> CreateTransport(const string& addr,
+                                                  bool needRespons);
+  boost::shared_ptr<TcpTransport> CreateNameserverTransport(bool needRespons);
+  void CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp);
+  void CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp);
+  bool SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
+  void processRequestCommand(RemotingCommand* pCmd, const string& addr);
+  void processResponseCommand(RemotingCommand* pCmd,
+                              boost::shared_ptr<ResponseFuture> pfuture);
+
+  void addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
+  boost::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
+
+  void addAsyncResponseFuture(int opaque,
+                              boost::shared_ptr<ResponseFuture> pfuture);
+  boost::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(
+      int opaque);
+
+  void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
+  void eraseTimerCallback(int opaque);
+  void cancelTimerCallback(int opaque);
+  void removeAllTimerCallback();
+
+ private:
+  typedef map<string, boost::shared_ptr<TcpTransport>> TcpMap;
+  typedef map<int, boost::shared_ptr<ResponseFuture>> ResMap;
+
+  typedef map<int, ClientRemotingProcessor*> RequestMap;
+  RequestMap m_requestTable;
+
+  boost::mutex m_futureTableMutex;
+  ResMap m_futureTable;  //<! id->future;
+
+  ResMap m_asyncFutureTable;
+  boost::mutex m_asyncFutureLock;
+
+  TcpMap m_tcpTable;  //<! ip->tcp;
+  boost::timed_mutex m_tcpLock;
+
+  // ThreadPool        m_threadpool;
+  int m_pullThreadNum;
+  uint64_t m_tcpConnectTimeout;           // ms
+  uint64_t m_tcpTransportTryLockTimeout;  // s
+
+  //<! Nameserver
+  boost::timed_mutex m_namesrvlock;
+  vector<string> m_namesrvAddrList;
+  string m_namesrvAddrChoosed;
+  unsigned int m_namesrvIndex;
+  boost::asio::io_service m_ioService;
+  boost::thread_group m_threadpool;
+  boost::asio::io_service::work m_ioServiceWork;
+
+  boost::asio::io_service m_async_ioService;
+  unique_ptr<boost::thread> m_async_service_thread;
+
+  typedef map<int, boost::asio::deadline_timer*> asyncTimerMap;
+  boost::mutex m_timerMapMutex;
+  asyncTimerMap m_async_timer_map;
+};
+
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif
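
Both timed_mutex members declared above (m_tcpLock and m_namesrvlock) are
taken in the .cpp with the same two-step idiom: a non-blocking try_lock
first, then a bounded timed_lock as the fallback, so a caller is never
blocked indefinitely. Condensed into a sketch with an illustrative helper
name (withTimedLock is not part of the source):

    #include <boost/date_time/posix_time/posix_time.hpp>
    #include <boost/thread/locks.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/thread_time.hpp>

    // Returns false if the mutex could not be acquired within
    // timeoutSeconds; callers in TcpRemotingClient then log an error and
    // return a null transport instead of blocking.
    bool withTimedLock(boost::timed_mutex& mtx, int timeoutSeconds) {
      boost::unique_lock<boost::timed_mutex> lock(mtx, boost::try_to_lock);
      if (!lock.owns_lock() &&
          !lock.timed_lock(boost::get_system_time() +
                           boost::posix_time::seconds(timeoutSeconds))) {
        return false;  // bounded wait also failed; give up
      }
      // ... critical section on the table guarded by mtx ...
      return true;  // lock released when `lock` goes out of scope
    }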

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpTransport.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/TcpTransport.cpp b/rocketmq-cpp/src/transport/TcpTransport.cpp
new file mode 100755
index 0000000..82e9526
--- /dev/null
+++ b/rocketmq-cpp/src/transport/TcpTransport.cpp
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TcpTransport.h"
+#include <arpa/inet.h>  // for sockaddr_in and inet_ntoa...
+#include <netinet/tcp.h>
+#include <sys/socket.h>  // for socket(), bind(), and connect()...
+#include "Logging.h"
+#include "TcpRemotingClient.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+TcpTransport::TcpTransport(TcpRemotingClient *pTcpRemotingClient,
+                           READ_CALLBACK handle /* = NULL */)
+    : m_tcpConnectStatus(e_connectInit),
+      m_ReadDatathread(NULL),
+      m_readcallback(handle),
+      m_tcpRemotingClient(pTcpRemotingClient) {
+  m_startTime = UtilAll::currentTimeMillis();
+  evthread_use_pthreads();
+  m_eventBase = NULL;
+  m_bufferEvent = NULL;
+}
+TcpTransport::~TcpTransport() {
+  m_readcallback = NULL;
+  m_bufferEvent = NULL;
+  m_eventBase = NULL;
+}
+
+tcpConnectStatus TcpTransport::connect(const string &strServerURL,
+                                       int timeOutMillisecs /* = 3000 */) {
+  string hostName;
+  short portNumber;
+  if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) {
+    return e_connectFail;
+  }
+
+  boost::lock_guard<boost::mutex> lock(m_socketLock);
+
+  struct sockaddr_in sin;
+  memset(&sin, 0, sizeof(sin));
+  sin.sin_family = AF_INET;
+  sin.sin_addr.s_addr = inet_addr(hostName.c_str());
+  sin.sin_port = htons(portNumber);
+
+  m_eventBase = event_base_new();
+  m_bufferEvent = bufferevent_socket_new(
+      m_eventBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
+  bufferevent_setcb(m_bufferEvent, readNextMessageIntCallback, NULL, eventcb,
+                    this);
+  bufferevent_enable(m_bufferEvent, EV_READ | EV_WRITE);
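+  // read low-watermark of 4 bytes: do not fire the read callback until at
+  // least the 4-byte length prefix of a frame has arrived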
+  bufferevent_setwatermark(m_bufferEvent, EV_READ, 4, 0);
+
+  setTcpConnectStatus(e_connectWaitResponse);
+  if (bufferevent_socket_connect(m_bufferEvent, (struct sockaddr *)&sin,
+                                 sizeof(sin)) < 0) {
+    LOG_INFO("connect to fd:%d failed", bufferevent_getfd(m_bufferEvent));
+    setTcpConnectStatus(e_connectFail);
+    freeBufferEvent();
+    return e_connectFail;
+  } else {
+    int fd = bufferevent_getfd(m_bufferEvent);
+    LOG_INFO("try to connect to fd:%d, addr:%s", fd, (hostName.c_str()));
+    /*struct timeval timeout;
+    timeout.tv_sec = timeOutMillisecs/1000;
+    timeout.tv_usec = 0;
+    struct event* evtimeout = evtimer_new(m_eventBase, timeoutcb, this);
+    evtimer_add(evtimeout, &timeout);*/
+    evthread_make_base_notifiable(m_eventBase);
+    m_ReadDatathread =
+        new boost::thread(boost::bind(&TcpTransport::runThread, this));
+    return e_connectWaitResponse;
+  }
+}
+
+void TcpTransport::setTcpConnectStatus(tcpConnectStatus connectStatus) {
+  m_tcpConnectStatus = connectStatus;
+}
+
+tcpConnectStatus TcpTransport::getTcpConnectStatus() {
+  return m_tcpConnectStatus;
+}
+
+tcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillisecs) {
+  boost::unique_lock<boost::mutex> lk(m_connectEventLock);
+  if (!m_connectEvent.timed_wait(
+          lk, boost::posix_time::milliseconds(timeoutMillisecs))) {
+    LOG_INFO("connect timeout");
+  }
+  return getTcpConnectStatus();
+}
+
+void TcpTransport::setTcpConnectEvent(tcpConnectStatus connectStatus) {
+  tcpConnectStatus baseStatus(getTcpConnectStatus());
+  setTcpConnectStatus(connectStatus);
+  if (baseStatus == e_connectWaitResponse) {
+    LOG_INFO("received libevent callback event");
+    m_connectEvent.notify_all();
+  }
+}
+
+void TcpTransport::disconnect(const string &addr) {
+  boost::lock_guard<boost::mutex> lock(m_socketLock);
+  if (getTcpConnectStatus() != e_connectInit) {
+    clearBufferEventCallback();
+    LOG_INFO("disconnect:%s start", addr.c_str());
+    m_connectEvent.notify_all();
+    setTcpConnectStatus(e_connectInit);
+    if (m_ReadDatathread) {
+      m_ReadDatathread->interrupt();
+      exitBaseDispatch();
+      while (m_ReadDatathread->timed_join(boost::posix_time::seconds(1)) ==
+             false) {
+        LOG_WARN("join readDataThread fail, retry");
+        m_ReadDatathread->interrupt();
+        exitBaseDispatch();
+      }
+      delete m_ReadDatathread;
+      m_ReadDatathread = NULL;
+    }
+    freeBufferEvent();
+    LOG_INFO("disconnect:%s completely", addr.c_str());
+  }
+}
+
+void TcpTransport::clearBufferEventCallback() {
+  if (m_bufferEvent) {
+    // Bufferevents are internally reference-counted, so if the bufferevent has
+    // pending deferred callbacks when you free it, it won't be deleted until
+    // the callbacks are done.
+    // so just empty callback to avoid future callback by libevent
+    bufferevent_setcb(m_bufferEvent, NULL, NULL, NULL, NULL);
+  }
+}
+
+void TcpTransport::freeBufferEvent() {
+  if (m_bufferEvent) {
+    bufferevent_free(m_bufferEvent);
+  }
+  if (m_eventBase) {
+    event_base_free(m_eventBase);
+  }
+}
+void TcpTransport::exitBaseDispatch() {
+  if (m_eventBase) {
+    event_base_loopbreak(m_eventBase);
+    // event_base_loopexit(m_eventBase, NULL);  // Note: a memory leak can
+    // occur if a timer callback has not completed yet;
+  }
+}
+
+void TcpTransport::runThread() {
+  while (m_ReadDatathread) {
+    if (m_eventBase != NULL) {
+      event_base_dispatch(m_eventBase);
+      // event_base_loop(m_eventBase, EVLOOP_ONCE);  // EVLOOP_NONBLOCK should
+      // not be used, as events could not be called back immediately
+    }
+    LOG_INFO("event_base_dispatch exit once");
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1));
+    if (getTcpConnectStatus() != e_connectSuccess) return;
+  }
+}
+
+void TcpTransport::timeoutcb(evutil_socket_t fd, short what, void *arg) {
+  LOG_INFO("timeoutcb: received  event:%d on fd:%d", what, fd);
+  TcpTransport *tcpTrans = (TcpTransport *)arg;
+  if (tcpTrans->getTcpConnectStatus() != e_connectSuccess) {
+    LOG_INFO("timeoutcb: after connect time, tcp was not established on fd:%d",
+             fd);
+    tcpTrans->setTcpConnectStatus(e_connectFail);
+  } else {
+    LOG_INFO("timeoutcb: after connect time, tcp was established on fd:%d", 
fd);
+  }
+}
+
+void TcpTransport::eventcb(struct bufferevent *bev, short what, void *ctx) {
+  evutil_socket_t fd = bufferevent_getfd(bev);
+  TcpTransport *tcpTrans = (TcpTransport *)ctx;
+  LOG_INFO("eventcb: received event:%x on fd:%d", what, fd);
+  if (what & BEV_EVENT_CONNECTED) {
+    int val = 1;
+    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&val, sizeof(val));
+    LOG_INFO("eventcb:connect to fd:%d successfully", fd);
+    tcpTrans->setTcpConnectEvent(e_connectSuccess);
+  } else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING |
+                     BEV_EVENT_WRITING)) {
+    LOG_INFO("eventcb:rcv error event cb:%x on fd:%d", what, fd);
+    tcpTrans->setTcpConnectEvent(e_connectFail);
+    bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
+    // bufferevent_disable(bev, EV_READ|EV_WRITE);
+    // bufferevent_free(bev);
+  } else {
+    LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd);
+  }
+}
+
+void TcpTransport::readNextMessageIntCallback(struct bufferevent *bev,
+                                              void *ctx) {
+  /* This callback is invoked when there is data to read on bev. */
+
+  // protocol:  <length> <header length> <header data> <body data>
+  //                1           2              3            4
+  // The RocketMQ wire protocol contains 4 parts:
+  //     1. big-endian 4-byte int whose value is the total length of parts 2, 3 and 4
+  //     2. big-endian 4-byte int whose value is the length of part 3
+  //     3. header data serialized as JSON
+  //     4. application-defined binary body data
+
+  struct evbuffer *input = bufferevent_get_input(bev);
+  while (1) {
+    struct evbuffer_iovec v[4];
+    int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0]));
+
+    int idx = 0;
+    char hdr[4];
+    char *p = hdr;
+    unsigned int needed = 4;
+
+    for (idx = 0; idx < n; idx++) {
+      if (needed) {
+        unsigned int tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
+        memcpy(p, v[idx].iov_base, tmp);
+        p += tmp;
+        needed -= tmp;
+      } else {
+        break;
+      }
+    }
+
+    if (needed) {
+      LOG_DEBUG(" too little data received with sum = %d ", 4 - needed);
+      return;
+    }
+    uint32 totalLenOfOneMsg =
+        *(uint32 *)hdr;  // first 4 bytes, which indicates 1st part of protocol
+    uint32 bytesInMessage = ntohl(totalLenOfOneMsg);
+    LOG_DEBUG("fd:%d, totalLen:%zu, bytesInMessage:%d", bufferevent_getfd(bev),
+              v[0].iov_len, bytesInMessage);
+
+    uint32 len = evbuffer_get_length(input);
+    if (len >= bytesInMessage + 4) {
+      LOG_DEBUG("had received all data with len:%d from fd:%d", len,
+                bufferevent_getfd(bev));
+    } else {
+      LOG_DEBUG(
+          "didn't received whole bytesInMessage:%d, from fd:%d, totalLen:%d",
+          bytesInMessage, bufferevent_getfd(bev), len);
+      return;  // consider large data which was not received completely by now
+    }
+
+    if (bytesInMessage > 0) {
+      MemoryBlock messageData(bytesInMessage, true);
+      uint32 bytesRead = 0;
+      void *data = (void *)(messageData.getData() + bytesRead);
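+      // the first read drains the 4-byte total-length prefix into the
+      // buffer; the second read then overwrites it with the remaining
+      // <header length><header data><body data> bytes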
+      bufferevent_read(bev, data, 4);
+      bytesRead = bufferevent_read(bev, data, bytesInMessage);
+
+      TcpTransport *tcpTrans = (TcpTransport *)ctx;
+      tcpTrans->messageReceived(messageData);
+    }
+  }
+}
+
+bool TcpTransport::sendMessage(const char *pData, int len) {
+  boost::lock_guard<boost::mutex> lock(m_socketLock);
+  if (getTcpConnectStatus() != e_connectSuccess) {
+    return false;
+  }
+
+  int bytes_left = len;
+  int bytes_written = 0;
+  const char *ptr = pData;
+
+  /* NOTE:
+      1. there is no need to handle large payloads that cannot be sent in a
+     single call, as bufferevent buffers and flushes them internally;
+  */
+  if (m_bufferEvent) {
+    bytes_written = bufferevent_write(m_bufferEvent, ptr, bytes_left);
+    if (bytes_written == 0)
+      return true;
+    else
+      return false;
+  }
+  return false;
+}
+
+void TcpTransport::messageReceived(const MemoryBlock &mem) {
+  if (m_readcallback) {
+    m_readcallback(m_tcpRemotingClient, mem, getPeerAddrAndPort());
+  }
+}
+
+const string TcpTransport::getPeerAddrAndPort() {
+  struct sockaddr_in broker;
+  socklen_t cLen = sizeof(broker);
+
+  // getsockname(m_socket->getRawSocketHandle(), (struct sockaddr*) &s, &sLen);
+  // // ! use connectSock here.
+  getpeername(bufferevent_getfd(m_bufferEvent), (struct sockaddr *)&broker,
+              &cLen);  // ! use connectSock here.
+  LOG_DEBUG("broker addr: %s, broker port: %d", inet_ntoa(broker.sin_addr),
+            ntohs(broker.sin_port));
+  string brokerAddr(inet_ntoa(broker.sin_addr));
+  brokerAddr.append(":");
+  string brokerPort(UtilAll::to_string(ntohs(broker.sin_port)));
+  brokerAddr.append(brokerPort);
+  LOG_DEBUG("brokerAddr:%s", brokerAddr.c_str());
+  return brokerAddr;
+}
+
+const uint64_t TcpTransport::getStartTime() const { return m_startTime; }
+
+}  //<!end namespace;
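
readNextMessageIntCallback above peeks at the evbuffer until a complete
frame has arrived and only then drains it. The same framing, decoded from a
plain byte buffer rather than an evbuffer, can be sketched as follows
(Frame and decodeOne are illustrative names, not from the source):

    #include <arpa/inet.h>  // ntohl
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <vector>

    struct Frame {
      std::string headerJson;     // part 3: JSON-serialized header
      std::vector<uint8_t> body;  // part 4: application-defined bytes
    };

    // Returns true and sets `consumed` if buf starts with a complete frame.
    bool decodeOne(const std::vector<uint8_t>& buf, Frame& out,
                   size_t& consumed) {
      if (buf.size() < 4) return false;  // part 1 not complete yet
      uint32_t total;                    // length of parts 2 + 3 + 4
      std::memcpy(&total, &buf[0], 4);
      total = ntohl(total);
      if (buf.size() < size_t(total) + 4) return false;  // frame incomplete
      uint32_t headerLen;                // part 2: length of part 3
      std::memcpy(&headerLen, &buf[4], 4);
      headerLen = ntohl(headerLen);
      if (size_t(headerLen) + 4 > total) return false;   // malformed frame
      out.headerJson.assign(buf.begin() + 8, buf.begin() + 8 + headerLen);
      out.body.assign(buf.begin() + 8 + headerLen, buf.begin() + 4 + total);
      consumed = size_t(total) + 4;
      return true;
    }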

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/TcpTransport.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/TcpTransport.h b/rocketmq-cpp/src/transport/TcpTransport.h
new file mode 100755
index 0000000..7ba32dc
--- /dev/null
+++ b/rocketmq-cpp/src/transport/TcpTransport.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TCPTRANSPORT_H__
+#define __TCPTRANSPORT_H__
+
+#include <boost/atomic.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "dataBlock.h"
+
+extern "C" {
+#include "event2/buffer.h"
+#include "event2/bufferevent.h"
+#include "event2/event.h"
+#include "event2/thread.h"
+}
+
+namespace rocketmq {
+//<!***************************************************************************
+typedef enum {
+  e_connectInit = 0,
+  e_connectWaitResponse = 1,
+  e_connectSuccess = 2,
+  e_connectFail = 3
+} tcpConnectStatus;
+
+typedef void (*READ_CALLBACK)(void *context, const MemoryBlock &,
+                              const std::string &);
+class TcpRemotingClient;
+class TcpTransport {
+ public:
+  TcpTransport(TcpRemotingClient *pTcpRemotingClient,
+               READ_CALLBACK handle = NULL);
+  virtual ~TcpTransport();
+
+  tcpConnectStatus connect(const std::string &strServerURL,
+                           int timeOutMillisecs = 3000);
+  void disconnect(const std::string &addr);
+  tcpConnectStatus waitTcpConnectEvent(int timeoutMillisecs = 3000);
+  void setTcpConnectStatus(tcpConnectStatus connectStatus);
+  tcpConnectStatus getTcpConnectStatus();
+  bool sendMessage(const char *pData, int len);
+  const std::string getPeerAddrAndPort();
+  const uint64_t getStartTime() const;
+
+ private:
+  void messageReceived(const MemoryBlock &mem);
+  static void readNextMessageIntCallback(struct bufferevent *bev, void *ctx);
+  static void eventcb(struct bufferevent *bev, short what, void *ctx);
+  static void timeoutcb(evutil_socket_t fd, short what, void *arg);
+  void runThread();
+  void clearBufferEventCallback();
+  void freeBufferEvent();
+  void exitBaseDispatch();
+  void setTcpConnectEvent(tcpConnectStatus connectStatus);
+
+ private:
+  uint64_t m_startTime;
+  boost::mutex m_socketLock;
+  struct event_base *m_eventBase;
+  struct bufferevent *m_bufferEvent;
+  boost::atomic<tcpConnectStatus> m_tcpConnectStatus;
+  boost::mutex m_connectEventLock;
+  boost::condition_variable_any m_connectEvent;
+  //<!read data thread
+  boost::thread *m_ReadDatathread;
+
+  //<! read data callback
+  READ_CALLBACK m_readcallback;
+  TcpRemotingClient *m_tcpRemotingClient;
+};
+
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif
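
The connect API above is two-phase: connect() returns e_connectWaitResponse
as soon as the libevent connect is in flight, and eventcb on the libevent
thread later flips the status and wakes waitTcpConnectEvent(). A usage
sketch, roughly what TcpRemotingClient::CreateTransport does minus its table
bookkeeping and locking (connectOrNull is an illustrative name; assumes this
header and <boost/shared_ptr.hpp> are included):

    boost::shared_ptr<rocketmq::TcpTransport> connectOrNull(
        rocketmq::TcpRemotingClient* client, const std::string& addr,
        int timeoutMillis) {
      boost::shared_ptr<rocketmq::TcpTransport> tts(
          new rocketmq::TcpTransport(client));  // no read callback here
      // phase 1: kick off the non-blocking connect
      if (tts->connect(addr, timeoutMillis) != rocketmq::e_connectWaitResponse)
        return boost::shared_ptr<rocketmq::TcpTransport>();  // failed at once
      // phase 2: wait for eventcb to report success or failure
      if (tts->waitTcpConnectEvent(timeoutMillis) != rocketmq::e_connectSuccess) {
        tts->disconnect(addr);
        return boost::shared_ptr<rocketmq::TcpTransport>();
      }
      return tts;  // established
    }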

