[ 
https://issues.apache.org/jira/browse/DRILL-4313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181410#comment-15181410
 ] 

ASF GitHub Bot commented on DRILL-4313:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55112213
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), 
pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle 
first and then add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +            m_lastConnection++;
    +            nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, 
getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); 
tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, 
getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) 
" << host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t 
PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: 
(Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, 
getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* 
PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const 
std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* 
pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries 
then it will need 
    +    // to implement this call to free any query specific resources the 
pool might have 
    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    for(std::vector<DrillClientImpl*>::iterator it = 
m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
    +        (*it)->Close();
    --- End diff --
    
    Can this throw an exception?


> C++ client - Improve method of drillbit selection from cluster
> --------------------------------------------------------------
>
>                 Key: DRILL-4313
>                 URL: https://issues.apache.org/jira/browse/DRILL-4313
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>             Fix For: 1.6.0
>
>
> The current C++ client handles multiple parallel queries over the same 
> connection, but that creates a bottleneck as the queries get sent to the same 
> drillbit.
> The client can manage this more effectively by choosing from a configurable 
> pool of connections and round robin queries to them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to