Github user sohami commented on a diff in the pull request: https://github.com/apache/drill/pull/950#discussion_r141245863 --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp --- @@ -65,108 +66,70 @@ struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user return static_cast<exec::user::RpcType>(i); } }; -} -connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){ - std::string pathToDrill, protocol, hostPortStr; - std::string host; - std::string port; +} // anonymous - if (this->m_bIsConnected) { - if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected +connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){ + if (this->m_bIsConnected || (this->m_pChannelContext!=NULL && this->m_pChannel!=NULL)) { + if(!std::strcmp(connStr, m_connectStr.c_str())){ + // trying to connect to a different address is not allowed if already connected return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN)); } return CONN_SUCCESS; } + std::string val; + channelType_t type = ( props->isPropSet(USERPROP_USESSL) && + props->getProp(USERPROP_USESSL, val) =="true") ? 
+ CHANNEL_TYPE_SSLSTREAM : + CHANNEL_TYPE_SOCKET; - m_connectStr=connStr; - Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); - if(protocol == "zk"){ - ZookeeperClient zook(pathToDrill); - std::vector<std::string> drillbits; - int err = zook.getAllDrillbits(hostPortStr, drillbits); - if(!err){ - if (drillbits.empty()){ - return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT)); - } - Utils::shuffle(drillbits); - exec::DrillbitEndpoint endpoint; - err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list - if(!err){ - host=boost::lexical_cast<std::string>(endpoint.address()); - port=boost::lexical_cast<std::string>(endpoint.user_port()); - } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1) << ">. Selected " << endpoint.DebugString() << std::endl;) - - } - if(err){ - return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); - } - zook.close(); - m_bIsDirectConnection=true; - }else if(protocol == "local"){ - boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant - char tempStr[MAX_CONNECT_STR+1]; - strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; - host=strtok(tempStr, ":"); - port=strtok(NULL, ""); - m_bIsDirectConnection=false; - }else{ - return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); - } - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;) - std::string serviceHost; - for (size_t i = 0; i < props->size(); i++) { - if (props->keyAt(i) == USERPROP_SERVICE_HOST) { - serviceHost = props->valueAt(i); - } + connectionStatus_t ret = CONN_SUCCESS; + m_pChannelContext = ChannelContextFactory::getChannelContext(type, props); + m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr); + ret=m_pChannel->init(m_pChannelContext); + if(ret!=CONN_SUCCESS){ + 
handleConnError(m_pChannel->getError()); + return ret; } - if (serviceHost.empty()) { - props->setProperty(USERPROP_SERVICE_HOST, host); + ret= m_pChannel->connect(); + if(ret!=CONN_SUCCESS){ + handleConnError(m_pChannel->getError()); + return ret; } - connectionStatus_t ret = this->connect(host.c_str(), port.c_str()); + props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost()); return ret; } -connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){ - using boost::asio::ip::tcp; - tcp::endpoint endpoint; - try{ - tcp::resolver resolver(m_io_service); - tcp::resolver::query query(tcp::v4(), host, port); - tcp::resolver::iterator iter = resolver.resolve(query); - tcp::resolver::iterator end; - while (iter != end){ - endpoint = *iter++; - DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;) - } - boost::system::error_code ec; - m_socket.connect(endpoint, ec); - if(ec){ - return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str())); - } - - }catch(const std::exception & e){ - // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve - if (!strcmp(e.what(), "resolve")) { - return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what())); +connectionStatus_t DrillClientImpl::connect(const char* host, const char* port, DrillUserProperties* props){ + if (this->m_bIsConnected || (this->m_pChannelContext!=NULL && this->m_pChannel!=NULL)) { --- End diff -- I think the second condition is not sufficient to tell whether there is already a valid connection. Instead, there should be an `isConnected()` method in the channel which will `return m_state == CHANNEL_CONNECTED`. Also, why do we need both `this->m_bIsConnected` and the second condition?
---