[ https://issues.apache.org/jira/browse/DRILL-4313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181410#comment-15181410 ]
ASF GitHub Bot commented on DRILL-4313: --------------------------------------- Github user sudheeshkatkam commented on a diff in the pull request: https://github.com/apache/drill/pull/396#discussion_r55112213 --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp --- @@ -1392,6 +1387,198 @@ void DrillClientQueryResult::clearAndDestroy(){ } } + +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){ + connectionStatus_t stat = CONN_SUCCESS; + std::string pathToDrill, protocol, hostPortStr; + std::string host; + std::string port; + m_connectStr=connStr; + Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr); + if(!strcmp(protocol.c_str(), "zk")){ + // Get a list of drillbits + ZookeeperImpl zook; + std::vector<std::string> drillbits; + int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits); + if(!err){ + Utils::shuffle(drillbits); + // The original shuffled order is maintained if we shuffle first and then add any missing elements + Utils::add(m_drillbits, drillbits); + exec::DrillbitEndpoint e; + size_t nextIndex=0; + boost::lock_guard<boost::mutex> cLock(m_cMutex); + m_lastConnection++; + nextIndex = (m_lastConnection)%(getDrillbitCount()); + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection" + << "(" << (void*)this << ")" + << ": Current counter is: " + << m_lastConnection << std::endl;) + err=zook.getEndPoint(m_drillbits, nextIndex, e); + if(!err){ + host=boost::lexical_cast<std::string>(e.address()); + port=boost::lexical_cast<std::string>(e.user_port()); + } + } + if(err){ + return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str())); + } + zook.close(); + m_bIsDirectConnection=false; + }else if(!strcmp(protocol.c_str(), "local")){ + char tempStr[MAX_CONNECT_STR+1]; + strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0; + host=strtok(tempStr, ":"); + port=strtok(NULL, ""); + m_bIsDirectConnection=true; + }else{ + return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str())); + } + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;) + DrillClientImpl* pDrillClientImpl = new DrillClientImpl(); + stat = pDrillClientImpl->connect(host.c_str(), port.c_str()); + if(stat == CONN_SUCCESS){ + m_clientConnections.push_back(pDrillClientImpl); + }else{ + DrillClientError* pErr = pDrillClientImpl->getError(); + handleConnError((connectionStatus_t)pErr->status, pErr->msg); + delete pDrillClientImpl; + } + return stat; +} + +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){ + // Assume there is one valid connection to at least one drillbit + connectionStatus_t stat=CONN_FAILURE; + // Keep a copy of the user properties + if(props!=NULL){ + m_pUserProperties = new DrillUserProperties; + for(size_t i=0; i<props->size(); i++){ + m_pUserProperties->setProperty( + props->keyAt(i), + props->valueAt(i) + ); + } + } + DrillClientImpl* pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;) + stat=pDrillClientImpl->validateHandshake(m_pUserProperties); + } + else{ + stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN)); + } + return stat; +} + +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){ + DrillClientQueryResult* pDrillClientQueryResult = NULL; + DrillClientImpl* pDrillClientImpl = NULL; + pDrillClientImpl = getOneConnection(); + if(pDrillClientImpl != NULL){ + pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx); + m_queriesExecuted++; + } + return pDrillClientQueryResult; +} + +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){ + // Nothing to do. If this class ever keeps track of executing queries then it will need + // to implement this call to free any query specific resources the pool might have + // allocated + return; +} + +bool PooledDrillClientImpl::Active(){ + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + if((*it)->Active()){ + return true; + } + } + return false; +} + +void PooledDrillClientImpl::Close() { + for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){ + (*it)->Close(); --- End diff -- Can this throw an exception? > C++ client - Improve method of drillbit selection from cluster > -------------------------------------------------------------- > > Key: DRILL-4313 > URL: https://issues.apache.org/jira/browse/DRILL-4313 > Project: Apache Drill > Issue Type: Improvement > Reporter: Parth Chandra > Assignee: Parth Chandra > Fix For: 1.6.0 > > > The current C++ client handles multiple parallel queries over the same > connection, but that creates a bottleneck as the queries get sent to the same > drillbit. > The client can manage this more effectively by choosing from a configurable > pool of connections and round robin queries to them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)