DRILL-1498 Drill Client to handle spurious results and handshake messages
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4304b25e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4304b25e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4304b25e Branch: refs/heads/master Commit: 4304b25e8246970163966e5790231d7ffdbd308f Parents: 621527d Author: norrislee <norrisle...@hotmail.com> Authored: Tue Jan 6 14:44:00 2015 -0800 Committer: Parth Chandra <pchan...@maprtech.com> Committed: Tue Jan 6 15:31:26 2015 -0800 ---------------------------------------------------------------------- .../client/src/clientlib/drillClientImpl.cpp | 41 ++++++++++++++++++-- .../native/client/src/clientlib/rpcDecoder.cpp | 2 + .../native/client/src/clientlib/rpcMessage.hpp | 4 ++ 3 files changed, 44 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/drillClientImpl.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp index 5b390ef..6f4a2ca 100644 --- a/contrib/native/client/src/clientlib/drillClientImpl.cpp +++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp @@ -478,6 +478,17 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer qid.CopyFrom(qr->query_id()); std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it; + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl; + if(m_queryResults.size() != 0){ + for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_QueryResult ids: [" << it->first->part1() << ":" + << it->first->part2() << "]\n"; + } + } + if(qid.part1()==0){ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: QID=0. Ignore and return QRY_SUCCESS." << std::endl; + return QRY_SUCCESS; + } it=this->m_queryResults.find(&qid); if(it!=this->m_queryResults.end()){ pDrillClientQueryResult=(*it).second; @@ -610,6 +621,13 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB boost::lock_guard<boost::mutex> lock(m_dcMutex); std::map<int,DrillClientQueryResult*>::iterator it; + for(it=this->m_queryIds.begin();it!=this->m_queryIds.end();it++){ + DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: " << it->first << std::endl; + } + if(msg.m_coord_id==0){ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl; + return QRY_SUCCESS; + } it=this->m_queryIds.find(msg.m_coord_id); if(it!=this->m_queryIds.end()){ pDrillClientQueryResult=(*it).second; @@ -755,10 +773,27 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf, handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL); return; }else{ - //If not QUERY_RESULT, then we think something serious has gone wrong? - assert(0); - DRILL_LOG(LOG_TRACE) << "QueryResult returned " << msg.m_rpc_type << std::endl; + // If not QUERY_RESULT, then we think something serious has gone wrong? + // In one case when the client hung, we observed that the server was sending a handshake request to the client + // We should properly handle these handshake requests/responses + if(msg.has_rpc_type() && msg.m_rpc_type==exec::user::HANDSHAKE){ + if(msg.has_mode() && msg.m_mode==exec::rpc::REQUEST){ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n"; + exec::user::UserToBitHandshake u2b; + u2b.set_channel(exec::shared::USER); + u2b.set_rpc_version(DRILL_RPC_VERSION); + u2b.set_support_listening(true); + OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b); + sendSync(out_msg); + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n"; + }else{ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n"; + } + }else{ + DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. " + << "QueryResult returned " << msg.m_rpc_type << std::endl; handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL); + } return; } } http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcDecoder.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp index c1001fd..d3cf50c 100644 --- a/contrib/native/client/src/clientlib/rpcDecoder.cpp +++ b/contrib/native/client/src/clientlib/rpcDecoder.cpp @@ -84,8 +84,10 @@ int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) { int header_limit = cis->PushLimit(header_length); header.ParseFromCodedStream(cis); cis->PopLimit(header_limit); + msg.m_has_mode = header.has_mode(); msg.m_mode = header.mode(); msg.m_coord_id = header.coordination_id(); + msg.m_has_rpc_type = header.has_rpc_type(); msg.m_rpc_type = header.rpc_type(); //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex()); http://git-wip-us.apache.org/repos/asf/drill/blob/4304b25e/contrib/native/client/src/clientlib/rpcMessage.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp index fa92c42..6696971 100644 --- a/contrib/native/client/src/clientlib/rpcMessage.hpp +++ b/contrib/native/client/src/clientlib/rpcMessage.hpp @@ -33,6 +33,10 @@ class InBoundRpcMessage { int m_coord_id; DataBuf m_pbody; ByteBuf_t m_dbody; + bool m_has_mode; + bool m_has_rpc_type; + bool has_mode() { return m_has_mode; }; + bool has_rpc_type() { return m_has_rpc_type; }; }; class OutBoundRpcMessage {