GitHub user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/950#discussion_r141742537
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -65,108 +66,70 @@ struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user
                return static_cast<exec::user::RpcType>(i);
        }
     };
    -}
    -connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
    -    std::string pathToDrill, protocol, hostPortStr;
    -    std::string host;
    -    std::string port;
    +} // anonymous
     
    -    if (this->m_bIsConnected) {
    -        if(std::strcmp(connStr, m_connectStr.c_str())){ // trying to connect to a different address is not allowed if already connected
    +connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
    +    if (this->m_bIsConnected || (this->m_pChannelContext!=NULL && this->m_pChannel!=NULL)) {
    +        if(!std::strcmp(connStr, m_connectStr.c_str())){
    +            // trying to connect to a different address is not allowed if already connected
                 return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
             }
             return CONN_SUCCESS;
         }
    +    std::string val;
    +    channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
    +            props->getProp(USERPROP_USESSL, val) =="true") ?
    +        CHANNEL_TYPE_SSLSTREAM :
    +        CHANNEL_TYPE_SOCKET;
     
    -    m_connectStr=connStr;
    -    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    -    if(protocol == "zk"){
    -        ZookeeperClient zook(pathToDrill);
    -        std::vector<std::string> drillbits;
    -        int err = zook.getAllDrillbits(hostPortStr, drillbits);
    -        if(!err){
    -            if (drillbits.empty()){
    -                return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
    -            }
    -            Utils::shuffle(drillbits);
    -            exec::DrillbitEndpoint endpoint;
    -            err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
    -            if(!err){
    -                host=boost::lexical_cast<std::string>(endpoint.address());
    -                port=boost::lexical_cast<std::string>(endpoint.user_port());
    -            }
    -            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << (drillbits.size() - 1)  << ">. Selected " << endpoint.DebugString() << std::endl;)
    -
    -        }
    -        if(err){
    -            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    -        }
    -        zook.close();
    -        m_bIsDirectConnection=true;
    -    }else if(protocol == "local"){
    -        boost::lock_guard<boost::mutex> lock(m_dcMutex);//strtok is not reentrant
    -        char tempStr[MAX_CONNECT_STR+1];
    -        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
    -        host=strtok(tempStr, ":");
    -        port=strtok(NULL, "");
    -        m_bIsDirectConnection=false;
    -    }else{
    -        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    -    }
    -    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: " << host << ":" << port << std::endl;)
    -    std::string serviceHost;
    -    for (size_t i = 0; i < props->size(); i++) {
    -        if (props->keyAt(i) == USERPROP_SERVICE_HOST) {
    -            serviceHost = props->valueAt(i);
    -        }
    +    connectionStatus_t ret = CONN_SUCCESS;
    +    m_pChannelContext = ChannelContextFactory::getChannelContext(type, props);
    +    m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr);
    +    ret=m_pChannel->init(m_pChannelContext);
    +    if(ret!=CONN_SUCCESS){
    +        handleConnError(m_pChannel->getError());
    +        return ret;
         }
    -    if (serviceHost.empty()) {
    -        props->setProperty(USERPROP_SERVICE_HOST, host);
    +    ret= m_pChannel->connect();
    +    if(ret!=CONN_SUCCESS){
    +        handleConnError(m_pChannel->getError());
    +        return ret;
         }
    -    connectionStatus_t ret = this->connect(host.c_str(), port.c_str());
    +    props->setProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
         return ret;
     }
     
    -connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
    -    using boost::asio::ip::tcp;
    -    tcp::endpoint endpoint;
    -    try{
    -        tcp::resolver resolver(m_io_service);
    -        tcp::resolver::query query(tcp::v4(), host, port);
    -        tcp::resolver::iterator iter = resolver.resolve(query);
    -        tcp::resolver::iterator end;
    -        while (iter != end){
    -            endpoint = *iter++;
    -            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << endpoint << std::endl;)
    -        }
    -        boost::system::error_code ec;
    -        m_socket.connect(endpoint, ec);
    -        if(ec){
    -            return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_FAILURE, host, port, ec.message().c_str()));
    -        }
    -
    -    }catch(const std::exception & e){
    -        // Handle case when the hostname cannot be resolved. "resolve" is hard-coded in boost asio resolver.resolve
    -        if (!strcmp(e.what(), "resolve")) {
    -            return handleConnError(CONN_HOSTNAME_RESOLUTION_ERROR, getMessage(ERR_CONN_EXCEPT, e.what()));
    +connectionStatus_t DrillClientImpl::connect(const char* host, const char* port, DrillUserProperties* props){
    +    if (this->m_bIsConnected || (this->m_pChannelContext!=NULL && this->m_pChannel!=NULL)) {
    +        std::string connStr = std::string(host)+":"+std::string(port);
    +        if(!std::strcmp(connStr.c_str(), m_connectStr.c_str())){
    +            // trying to connect to a different address is not allowed if already connected
    +            return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
             }
    -        return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_EXCEPT, e.what()));
    +        return CONN_SUCCESS;
         }
    -
    -    m_bIsConnected=true;
    -    // set socket keep alive
    -    boost::asio::socket_base::keep_alive keepAlive(true);
    -    m_socket.set_option(keepAlive);
    -    // set no_delay
    -    boost::asio::ip::tcp::no_delay noDelay(true);
    -    m_socket.set_option(noDelay);
    -
    -    std::ostringstream connectedHost;
    -    connectedHost << "id: " << m_socket.native_handle() << " address: " << host << ":" << port;
    -    m_connectedHost = connectedHost.str();
    -    DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connected to endpoint: " << m_connectedHost << std::endl;)
    -
    -    return CONN_SUCCESS;
    +    std::string val;
    +    channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
    +            props->getProp(USERPROP_USESSL, val) =="true") ?
    +        CHANNEL_TYPE_SSLSTREAM :
    +        CHANNEL_TYPE_SOCKET;
    +
    +    connectionStatus_t ret = CONN_SUCCESS;
    +    m_pChannelContext = ChannelContextFactory::getChannelContext(type, props);
    +    m_pChannel= ChannelFactory::getChannel(type, m_io_service, host, port);
    +    m_pChannel->init(m_pChannelContext);
    +    ret=m_pChannel->init(m_pChannelContext);
    --- End diff --
    
    Yeah. I wonder why I did that.


---

Reply via email to